From 4942e4a5872536dc0cd43d426f4d11774e0a2103 Mon Sep 17 00:00:00 2001 From: "maruel@chromium.org" Date: Tue, 15 Nov 2011 15:50:50 +0000 Subject: [PATCH] Add most of testing improvements without the tee-specific tests Change return value from list to tuple for communicate(timeout != None). R=dpranke@chromium.org BUG= TEST= Review URL: http://codereview.chromium.org/8539015 git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@110094 0039d316-1c4b-4281-b951-d872f2087c98 --- subprocess2.py | 2 +- tests/subprocess2_test.py | 232 +++++++++++++++++++++++++++----------- 2 files changed, 170 insertions(+), 64 deletions(-) diff --git a/subprocess2.py b/subprocess2.py index 4708aadf3..afe5c75c1 100644 --- a/subprocess2.py +++ b/subprocess2.py @@ -246,7 +246,7 @@ def communicate(args, timeout=None, **kwargs): time.sleep(0.001) # Now that the process died, reset the cursor and read the file. buff.seek(0) - out = [buff.read(), None] + out = (buff.read(), None) return out, proc.returncode diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index 54e4e1512..f6c23ed55 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -5,17 +5,44 @@ """Unit tests for subprocess2.py.""" +import logging import optparse import os import sys import time import unittest +try: + import fcntl +except ImportError: + fcntl = None + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import subprocess2 -class Subprocess2Test(unittest.TestCase): +# Method could be a function +# pylint: disable=R0201 + + +def convert_to_crlf(string): + """Unconditionally convert LF to CRLF.""" + return string.replace('\n', '\r\n') + + +def convert_to_cr(string): + """Unconditionally convert LF to CR.""" + return string.replace('\n', '\r') + + +def convert_win(string): + """Converts string to CRLF on Windows only.""" + if sys.platform == 'win32': + return string.replace('\n', '\r\n') + return string + + +class DefaultsTest(unittest.TestCase): # Can be mocked in a test. TO_SAVE = { subprocess2: [ @@ -24,8 +51,6 @@ class Subprocess2Test(unittest.TestCase): } def setUp(self): - self.exe_path = __file__ - self.exe = [sys.executable, self.exe_path, '--child'] self.saved = {} for module, names in self.TO_SAVE.iteritems(): self.saved[module] = dict( @@ -144,14 +169,54 @@ class Subprocess2Test(unittest.TestCase): } self.assertEquals(expected, results) + +class S2Test(unittest.TestCase): + def setUp(self): + super(S2Test, self).setUp() + self.exe_path = __file__ + self.exe = [sys.executable, self.exe_path, '--child'] + self.states = {} + if fcntl: + for v in (sys.stdin, sys.stdout, sys.stderr): + fileno = v.fileno() + self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) + + def tearDown(self): + for fileno, fl in self.states.iteritems(): + self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) + super(S2Test, self).tearDown() + + def _run_test(self, function): + """Runs tests in 6 combinations: + - LF output with universal_newlines=False + - CR output with universal_newlines=False + - CRLF output with universal_newlines=False + - LF output with universal_newlines=True + - CR output with universal_newlines=True + - CRLF output with universal_newlines=True + + First |function| argument is the convertion for the origianl expected LF + string to the right EOL. + Second |function| argument is the executable and initial flag to run, to + control what EOL is used by the child process. + Third |function| argument is universal_newlines value. + """ + noop = lambda x: x + function(noop, self.exe, False) + function(convert_to_cr, self.exe + ['--cr'], False) + function(convert_to_crlf, self.exe + ['--crlf'], False) + function(noop, self.exe, True) + function(noop, self.exe + ['--cr'], True) + function(noop, self.exe + ['--crlf'], True) + def test_timeout(self): - # It'd be better to not discard stdout. out, returncode = subprocess2.communicate( - self.exe + ['--sleep', '--stdout'], + self.exe + ['--sleep_first', '--stdout'], timeout=0.01, - stdout=subprocess2.PIPE) + stdout=subprocess2.PIPE, + shell=False) self.assertEquals(subprocess2.TIMED_OUT, returncode) - self.assertEquals(['', None], out) + self.assertEquals(('', None), out) def test_check_output_no_stdout(self): try: @@ -161,68 +226,78 @@ class Subprocess2Test(unittest.TestCase): pass def test_stdout_void(self): - (out, err), code = subprocess2.communicate( - self.exe + ['--stdout', '--stderr'], - stdout=subprocess2.VOID, - stderr=subprocess2.PIPE) - self.assertEquals(None, out) - expected = 'a\nbb\nccc\n' - if sys.platform == 'win32': - expected = expected.replace('\n', '\r\n') - self.assertEquals(expected, err) - self.assertEquals(0, code) + def fn(c, e, un): + (out, err), code = subprocess2.communicate( + e + ['--stdout', '--stderr'], + stdout=subprocess2.VOID, + stderr=subprocess2.PIPE, + universal_newlines=un) + self.assertEquals(None, out) + self.assertEquals(c('a\nbb\nccc\n'), err) + self.assertEquals(0, code) + self._run_test(fn) def test_stderr_void(self): - (out, err), code = subprocess2.communicate( - self.exe + ['--stdout', '--stderr'], - universal_newlines=True, - stdout=subprocess2.PIPE, - stderr=subprocess2.VOID) - self.assertEquals('A\nBB\nCCC\n', out) - self.assertEquals(None, err) - self.assertEquals(0, code) + def fn(c, e, un): + (out, err), code = subprocess2.communicate( + e + ['--stdout', '--stderr'], + stdout=subprocess2.PIPE, + stderr=subprocess2.VOID, + universal_newlines=un) + self.assertEquals(c('A\nBB\nCCC\n'), out) + self.assertEquals(None, err) + self.assertEquals(0, code) + self._run_test(fn) def test_check_output_throw_stdout(self): - try: - subprocess2.check_output( - self.exe + ['--fail', '--stdout'], universal_newlines=True) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals('A\nBB\nCCC\n', e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) + def fn(c, e, un): + try: + subprocess2.check_output( + e + ['--fail', '--stdout'], universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) + self._run_test(fn) def test_check_output_throw_no_stderr(self): - try: - subprocess2.check_output( - self.exe + ['--fail', '--stderr'], universal_newlines=True) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals('', e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) + def fn(c, e, un): + try: + subprocess2.check_output( + e + ['--fail', '--stderr'], universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals(c(''), e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) + self._run_test(fn) def test_check_output_throw_stderr(self): - try: - subprocess2.check_output( - self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE, - universal_newlines=True) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals('', e.stdout) - self.assertEquals('a\nbb\nccc\n', e.stderr) - self.assertEquals(64, e.returncode) + def fn(c, e, un): + try: + subprocess2.check_output( + e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, + universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals('', e.stdout) + self.assertEquals(c('a\nbb\nccc\n'), e.stderr) + self.assertEquals(64, e.returncode) + self._run_test(fn) def test_check_output_throw_stderr_stdout(self): - try: - subprocess2.check_output( - self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, - universal_newlines=True) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals('a\nbb\nccc\n', e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) + def fn(c, e, un): + try: + subprocess2.check_output( + e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, + universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals(c('a\nbb\nccc\n'), e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) + self._run_test(fn) def test_check_call_throw(self): try: @@ -235,6 +310,13 @@ class Subprocess2Test(unittest.TestCase): def child_main(args): + if sys.platform == 'win32': + # Annoying, make sure the output is not translated on Windows. + # pylint: disable=E1101,F0401 + import msvcrt + msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) + msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) + parser = optparse.OptionParser() parser.add_option( '--fail', @@ -242,28 +324,52 @@ def child_main(args): action='store_const', default=0, const=64) + parser.add_option( + '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') + parser.add_option( + '--cr', action='store_const', const='\r', dest='eol') parser.add_option('--stdout', action='store_true') parser.add_option('--stderr', action='store_true') - parser.add_option('--sleep', action='store_true') + parser.add_option('--sleep_first', action='store_true') + parser.add_option('--sleep_last', action='store_true') + parser.add_option('--large', action='store_true') + parser.add_option('--read', action='store_true') options, args = parser.parse_args(args) if args: parser.error('Internal error') + if options.sleep_first: + time.sleep(10) def do(string): if options.stdout: - print >> sys.stdout, string.upper() + sys.stdout.write(string.upper()) + sys.stdout.write(options.eol) if options.stderr: - print >> sys.stderr, string.lower() + sys.stderr.write(string.lower()) + sys.stderr.write(options.eol) do('A') do('BB') do('CCC') - if options.sleep: + if options.large: + # Print 128kb. + string = '0123456789abcdef' * (8*1024) + sys.stdout.write(string) + if options.read: + try: + while sys.stdin.read(): + pass + except OSError: + pass + if options.sleep_last: time.sleep(10) return options.return_value if __name__ == '__main__': + logging.basicConfig(level= + [logging.WARNING, logging.INFO, logging.DEBUG][ + min(2, sys.argv.count('-v'))]) if len(sys.argv) > 1 and sys.argv[1] == '--child': sys.exit(child_main(sys.argv[2:])) unittest.main()