From f8757b7e02226594655230ccbeae3543c7dc49c6 Mon Sep 17 00:00:00 2001
From: tandrii
Date: Thu, 25 Aug 2016 04:02:19 -0700
Subject: [PATCH] gclient: kill git fetch operation that hangs.

This provides env variable GCLIENT_KILL_GIT_FETCH_AFTER that kills git fetch
if it produces no output for that many seconds.

Note that this is not final patch, but an experiment.
See http://crbug.com/635641#c24 for the deployment plan.

BUG=635641
R=hinoka@chromium.org

Review-Url: https://codereview.chromium.org/2241843002
---
 gclient_scm.py              |  4 ++-
 gclient_utils.py            | 66 ++++++++++++++++++++++++++++++++++++-
 tests/gclient_scm_test.py   | 62 ++++++++++++++++++++++++++++++++++
 tests/gclient_utils_test.py | 63 +++++++++++++++++++++++++++++++++++
 4 files changed, 193 insertions(+), 2 deletions(-)

diff --git a/gclient_scm.py b/gclient_scm.py
index fc177567e..07ca6b8f5 100644
--- a/gclient_scm.py
+++ b/gclient_scm.py
@@ -1199,6 +1199,7 @@ class GitWrapper(SCMWrapper):
     return self._Capture(checkout_args)
 
   def _Fetch(self, options, remote=None, prune=False, quiet=False):
+    kill_timeout = float(os.getenv('GCLIENT_KILL_GIT_FETCH_AFTER', 0))
     cfg = gclient_utils.DefaultIndexPackConfig(self.url)
     fetch_cmd = cfg + [
         'fetch',
@@ -1211,7 +1212,8 @@ class GitWrapper(SCMWrapper):
       fetch_cmd.append('--verbose')
     elif quiet:
       fetch_cmd.append('--quiet')
-    self._Run(fetch_cmd, options, show_header=options.verbose, retry=True)
+    self._Run(fetch_cmd, options, show_header=options.verbose, retry=True,
+              kill_timeout=kill_timeout)
 
     # Return the revision that was fetched; this will be stored in 'FETCH_HEAD'
     return self._Capture(['rev-parse', '--verify', 'FETCH_HEAD'])
diff --git a/gclient_utils.py b/gclient_utils.py
index 602322b2c..794fc023b 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -459,9 +459,64 @@ class GClientChildren(object):
         print >> sys.stderr, ' ', zombie.pid
 
 
+class _KillTimer(object):
+  """Timer that kills child process after certain interval since last poke or
+  creation.
+  """
+  # TODO(tandrii): we really want to make use of subprocess42 here, and not
+  # re-invent the wheel, but it's too much work :(
+
+  def __init__(self, timeout, child):
+    self._timeout = timeout
+    self._child = child
+
+    self._cv = threading.Condition()
+    # All items below are protected by condition above.
+    self._kill_at = None
+    self._working = True
+    self._thread = None
+
+    # Start the timer immediately.
+    if self._timeout:
+      self._kill_at = time.time() + self._timeout
+      self._thread = threading.Thread(name='_KillTimer', target=self._work)
+      self._thread.daemon = True
+      self._thread.start()
+
+  def poke(self):
+    if not self._timeout:
+      return
+    with self._cv:
+      self._kill_at = time.time() + self._timeout
+
+  def cancel(self):
+    with self._cv:
+      self._working = False
+      self._cv.notifyAll()
+
+  def _work(self):
+    if not self._timeout:
+      return
+    while True:
+      with self._cv:
+        if not self._working:
+          return
+        left = self._kill_at - time.time()
+        if left > 0:
+          self._cv.wait(timeout=left)
+          continue
+        try:
+          logging.warn('killing child %s because of no output for %fs',
+                       self._child, self._timeout)
+          self._child.kill()
+        except OSError:
+          logging.exception('failed to kill child %s', self._child)
+        return
+
+
 def CheckCallAndFilter(args, stdout=None, filter_fn=None,
                        print_stdout=None, call_filter_on_first_line=False,
-                       retry=False, **kwargs):
+                       retry=False, kill_timeout=None, **kwargs):
   """Runs a command and calls back a filter function if needed.
 
   Accepts all subprocess2.Popen() parameters plus:
@@ -472,10 +527,16 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
     stdout: Can be any bufferable output.
     retry: If the process exits non-zero, sleep for a brief interval and try
            again, up to RETRY_MAX times.
+    kill_timeout: (float) if given, number of seconds after which process would
+           be killed if there is no output. Must not be used with shell=True as
+           only shell process would be killed, but not processes spawned by
+           shell.
 
   stderr is always redirected to stdout.
   """
   assert print_stdout or filter_fn
+  assert not kwargs.get('shell', False) or not kill_timeout, (
+      'kill_timeout should not be used with shell=True')
   stdout = stdout or sys.stdout
   output = cStringIO.StringIO()
   filter_fn = filter_fn or (lambda x: None)
@@ -497,12 +558,14 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
     # normally buffering is done for each line, but if svn requests input, no
     # end-of-line character is output after the prompt and it would not show up.
     try:
+      timeout_killer = _KillTimer(kill_timeout, kid)
       in_byte = kid.stdout.read(1)
       if in_byte:
         if call_filter_on_first_line:
           filter_fn(None)
         in_line = ''
         while in_byte:
+          timeout_killer.poke()
           output.write(in_byte)
           if print_stdout:
             stdout.write(in_byte)
@@ -517,6 +580,7 @@ def CheckCallAndFilter(args, stdout=None, filter_fn=None,
         if len(in_line):
           filter_fn(in_line)
       rv = kid.wait()
+      timeout_killer.cancel()
 
       # Don't put this in a 'finally,' since the child may still run if we get
       # an exception.
diff --git a/tests/gclient_scm_test.py b/tests/gclient_scm_test.py
index 00592244c..f3d1b366c 100755
--- a/tests/gclient_scm_test.py
+++ b/tests/gclient_scm_test.py
@@ -16,6 +16,7 @@ import os
 import re
 import sys
 import tempfile
+import threading
 import unittest
 
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@@ -969,6 +970,67 @@ class UnmanagedGitWrapperTestCase(BaseGitWrapperTestCase):
     self.checkstdout('________ unmanaged solution; skipping .\n')
 
 
+class GitHungTest(BaseGitWrapperTestCase):
+  def setUp(self):
+    super(GitHungTest, self).setUp()
+    self.old = gclient_scm.gclient_utils.CheckCallAndFilter
+    self.old2 = gclient_scm.gclient_utils.subprocess2.Popen
+    self.options = self.Options()
+    self.options.verbose = False
+    self.scm = gclient_scm.CreateSCM(url=self.url, root_dir=self.root_dir,
+                                     relpath=self.relpath)
+    os.environ['GCLIENT_KILL_GIT_FETCH_AFTER'] = '1.0'
+
+  def tearDown(self):
+    os.environ.pop('GCLIENT_KILL_GIT_FETCH_AFTER')
+    gclient_scm.gclient_utils.CheckCallAndFilter = self.old
+    gclient_scm.gclient_utils.subprocess2.Popen = self.old2
+    super(GitHungTest, self).tearDown()
+
+  def testGitFetchOk(self):
+    def subprocess_git_fetch_run(_, filter_fn, kill_timeout, **__):
+      self.assertEqual(kill_timeout, 1.0)
+      filter_fn('remote: something')
+    gclient_scm.gclient_utils.CheckCallAndFilter = subprocess_git_fetch_run
+    self.scm._Fetch(self.options)
+    self.checkstdout('remote: something\n')
+
+  def testGitFetchHungAndRetry(self):
+    class Process(object):
+      # First process will hang, second process will exit with 0 quickly.
+      cv = threading.Condition()
+      count = -1
+      killed = []
+      def __init__(self):
+        self.count += 1
+        self.stdout = self
+        self.data = list('retry' if self.count > 0 else 'hung')
+        self.data.reverse()
+        self.this_killed = False
+      def read(self, _):
+        if self.data:
+          return self.data.pop()
+        if self.count == 0:
+          # Simulate hung process.
+          with self.cv:
+            self.cv.wait(timeout=0)
+        return ''
+      def kill(self):
+        self.this_killed = True
+        self.killed.append(self.count)
+        with self.cv:
+          self.cv.notify()
+      def wait(self):
+        if self.this_killed:
+          return 1
+        return 0
+
+    gclient_scm.gclient_utils.subprocess2.Popen = lambda *_, **__: Process()
+    self.scm._Capture = lambda *_, **__: None
+    self.scm._Fetch(self.options)
+    self.checkstdout('hung\n')
+
+
 if __name__ == '__main__':
   level = logging.DEBUG if '-v' in sys.argv else logging.FATAL
   logging.basicConfig(
diff --git a/tests/gclient_utils_test.py b/tests/gclient_utils_test.py
index 39a3d772d..582fbba13 100755
--- a/tests/gclient_utils_test.py
+++ b/tests/gclient_utils_test.py
@@ -6,6 +6,7 @@
 import os
 import StringIO
 import sys
+import threading
 
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
@@ -81,6 +82,62 @@ class CheckCallAndFilterTestCase(GclientUtilBase):
         'ahah\naccb\nallo\naddb\n'
         '________ running \'boo foo bar\' in \'bleh\'\nahah\naccb\nallo\naddb')
 
+  def _checkKillTimeout(self, output_block_for=0, kill_raises=False):
+    cv = threading.Condition()
+    order = []
+
+    output = list(reversed('output'))
+    def kid_stdout_read(_):
+      if output:
+        return output.pop()
+      else:
+        with cv:
+          cv.wait(timeout=output_block_for)
+          order.append('unblock')
+        return ''
+    def kid_kill():
+      with cv:
+        order.append('killed')
+        cv.notify()
+      if kill_raises:
+        raise OSError('somethign went wrong')
+
+    kid = self.ProcessIdMock('')
+    kid.stdout.read = kid_stdout_read
+    kid.kill = kid_kill  # pylint: disable=W0201
+    cwd = 'bleh'
+    args = ['ar', 'gs']
+    gclient_utils.sys.stdout.write(
+        '\n________ running \'ar gs\' in \'bleh\'\noutput')
+    os.getcwd()
+    subprocess2.Popen(
+        args, cwd=cwd,
+        stdout=subprocess2.PIPE,
+        stderr=subprocess2.STDOUT,
+        bufsize=0).AndReturn(kid)
+    self.mox.ReplayAll()
+
+    # This test relies on the testing machine's ability to process 1 char
+    # of output in <0.01 seconds set in kill_timeout.
+    gclient_utils.CheckCallAndFilterAndHeader(
+        args, cwd=cwd, always=True, kill_timeout=0.01)
+    self.checkstdout(
+        '\n________ running \'ar gs\' in \'bleh\'\noutput\n'
+        '________ running \'ar gs\' in \'bleh\'\noutput')
+    return order
+
+  def testKillTimeout(self):
+    order = self._checkKillTimeout(output_block_for=1.0)
+    self.assertEquals(order, ['killed', 'unblock'])
+
+  def testKillRaise(self):
+    order = self._checkKillTimeout(output_block_for=1.0, kill_raises=True)
+    self.assertEquals(order, ['killed', 'unblock'])
+
+  def testNoKill(self):
+    order = self._checkKillTimeout(output_block_for=0.0)
+    self.assertEquals(order, ['unblock'])
+
 
 class SplitUrlRevisionTestCase(GclientUtilBase):
   def testSSHUrl(self):
@@ -203,6 +260,12 @@ class GClientUtilsTest(trial_dir.TestCase):
 
 if __name__ == '__main__':
   import unittest
+  import logging
+  level = logging.DEBUG if '-v' in sys.argv else logging.FATAL
+  logging.basicConfig(
+      level=level,
+      format='%(asctime).19s %(levelname)s %(filename)s:'
+             '%(lineno)s %(message)s')
   unittest.main()
 
 # vim: ts=2:sw=2:tw=80:et: