diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index d6c3fb700..6d31d1882 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -21,14 +21,24 @@ import subprocess2 # pylint: disable=R0201 -def convert(string): - """Converts string to CRLF on Windows.""" +def convert_to_crlf(string): + """Unconditionally convert LF to CRLF.""" + return string.replace('\n', '\r\n') + + +def convert_to_cr(string): + """Unconditionally convert LF to CR.""" + return string.replace('\n', '\r') + + +def convert_win(string): + """Converts string to CRLF on Windows only.""" if sys.platform == 'win32': return string.replace('\n', '\r\n') return string -class Subprocess2Test(unittest.TestCase): +class DefaultsTest(unittest.TestCase): # Can be mocked in a test. TO_SAVE = { subprocess2: [ @@ -37,8 +47,6 @@ class Subprocess2Test(unittest.TestCase): } def setUp(self): - self.exe_path = __file__ - self.exe = [sys.executable, self.exe_path, '--child'] self.saved = {} for module, names in self.TO_SAVE.iteritems(): self.saved[module] = dict( @@ -155,19 +163,12 @@ class Subprocess2Test(unittest.TestCase): } self.assertEquals(expected, results) - def test_timeout(self): - out, returncode = subprocess2.communicate( - self.exe + ['--sleep', '--stdout'], - timeout=0.01, - stdout=subprocess2.PIPE, - shell=False) - self.assertEquals(subprocess2.TIMED_OUT, returncode) - self.assertEquals(('', None), out) - def test_timeout_shell_throws(self): + # Never called. + _ = self._fake_Popen() try: subprocess2.communicate( - self.exe + ['--sleep', '--stdout'], + sys.executable, timeout=0.01, stdout=subprocess2.PIPE, shell=True) @@ -175,6 +176,45 @@ class Subprocess2Test(unittest.TestCase): except TypeError: pass + +class S2Test(unittest.TestCase): + def setUp(self): + super(S2Test, self).setUp() + self.exe_path = __file__ + self.exe = [sys.executable, self.exe_path, '--child'] + + def _run_test(self, function): + """Runs tests in 6 combinations: + - LF output with universal_newlines=False + - CR output with universal_newlines=False + - CRLF output with universal_newlines=False + - LF output with universal_newlines=True + - CR output with universal_newlines=True + - CRLF output with universal_newlines=True + + First |function| argument is the convertion for the origianl expected LF + string to the right EOL. + Second |function| argument is the executable and initial flag to run, to + control what EOL is used by the child process. + Third |function| argument is universal_newlines value. + """ + noop = lambda x: x + function(noop, self.exe, False) + function(convert_to_cr, self.exe + ['--cr'], False) + function(convert_to_crlf, self.exe + ['--crlf'], False) + function(noop, self.exe, True) + function(noop, self.exe + ['--cr'], True) + function(noop, self.exe + ['--crlf'], True) + + def test_timeout(self): + out, returncode = subprocess2.communicate( + self.exe + ['--sleep_first', '--stdout'], + timeout=0.01, + stdout=subprocess2.PIPE, + shell=False) + self.assertEquals(subprocess2.TIMED_OUT, returncode) + self.assertEquals(('', None), out) + def test_check_output_no_stdout(self): try: subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) @@ -183,65 +223,78 @@ class Subprocess2Test(unittest.TestCase): pass def test_stdout_void(self): - (out, err), code = subprocess2.communicate( - self.exe + ['--stdout', '--stderr'], - stdout=subprocess2.VOID, - stderr=subprocess2.PIPE) - self.assertEquals(None, out) - self.assertEquals(convert('a\nbb\nccc\n'), err) - self.assertEquals(0, code) + def fn(c, e, un): + (out, err), code = subprocess2.communicate( + e + ['--stdout', '--stderr'], + stdout=subprocess2.VOID, + stderr=subprocess2.PIPE, + universal_newlines=un) + self.assertEquals(None, out) + self.assertEquals(c('a\nbb\nccc\n'), err) + self.assertEquals(0, code) + self._run_test(fn) def test_stderr_void(self): - (out, err), code = subprocess2.communicate( - self.exe + ['--stdout', '--stderr'], - universal_newlines=True, - stdout=subprocess2.PIPE, - stderr=subprocess2.VOID) - self.assertEquals('A\nBB\nCCC\n', out) - self.assertEquals(None, err) - self.assertEquals(0, code) + def fn(c, e, un): + (out, err), code = subprocess2.communicate( + e + ['--stdout', '--stderr'], + stdout=subprocess2.PIPE, + stderr=subprocess2.VOID, + universal_newlines=un) + self.assertEquals(c('A\nBB\nCCC\n'), out) + self.assertEquals(None, err) + self.assertEquals(0, code) + self._run_test(fn) def test_check_output_throw_stdout(self): - try: - subprocess2.check_output( - self.exe + ['--fail', '--stdout'], universal_newlines=True) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals('A\nBB\nCCC\n', e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) + def fn(c, e, un): + try: + subprocess2.check_output( + e + ['--fail', '--stdout'], universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals(c('A\nBB\nCCC\n'), e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) + self._run_test(fn) def test_check_output_throw_no_stderr(self): - try: - subprocess2.check_output( - self.exe + ['--fail', '--stderr'], universal_newlines=True) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals('', e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) + def fn(c, e, un): + try: + subprocess2.check_output( + e + ['--fail', '--stderr'], universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals(c(''), e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) + self._run_test(fn) def test_check_output_throw_stderr(self): - try: - subprocess2.check_output( - self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE, - universal_newlines=True) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals('', e.stdout) - self.assertEquals('a\nbb\nccc\n', e.stderr) - self.assertEquals(64, e.returncode) + def fn(c, e, un): + try: + subprocess2.check_output( + e + ['--fail', '--stderr'], stderr=subprocess2.PIPE, + universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals('', e.stdout) + self.assertEquals(c('a\nbb\nccc\n'), e.stderr) + self.assertEquals(64, e.returncode) + self._run_test(fn) def test_check_output_throw_stderr_stdout(self): - try: - subprocess2.check_output( - self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, - universal_newlines=True) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals('a\nbb\nccc\n', e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) + def fn(c, e, un): + try: + subprocess2.check_output( + e + ['--fail', '--stderr'], stderr=subprocess2.STDOUT, + universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals(c('a\nbb\nccc\n'), e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) + self._run_test(fn) def test_check_call_throw(self): try: @@ -253,45 +306,57 @@ class Subprocess2Test(unittest.TestCase): self.assertEquals(64, e.returncode) def test_check_output_tee_stderr(self): - stderr = [] - out, returncode = subprocess2.communicate( - self.exe + ['--stderr'], stderr=stderr.append) - self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) - self.assertEquals((None, None), out) - self.assertEquals(0, returncode) + def fn(c, e, un): + stderr = [] + out, returncode = subprocess2.communicate( + e + ['--stderr'], stderr=stderr.append, + universal_newlines=un) + self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + self._run_test(fn) def test_check_output_tee_stdout_stderr(self): - stdout = [] - stderr = [] - out, returncode = subprocess2.communicate( - self.exe + ['--stdout', '--stderr'], - stdout=stdout.append, - stderr=stderr.append) - self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) - self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) - self.assertEquals((None, None), out) - self.assertEquals(0, returncode) + def fn(c, e, un): + stdout = [] + stderr = [] + out, returncode = subprocess2.communicate( + e + ['--stdout', '--stderr'], + stdout=stdout.append, + stderr=stderr.append, + universal_newlines=un) + self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) + self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + self._run_test(fn) def test_check_output_tee_stdin(self): - stdout = [] - stdin = '0123456789' - out, returncode = subprocess2.communicate( - self.exe + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append) - self.assertEquals(convert('A\nBB\nCCC\n'), ''.join(stdout)) - self.assertEquals((None, None), out) - self.assertEquals(0, returncode) + def fn(c, e, un): + stdout = [] + stdin = '0123456789' + out, returncode = subprocess2.communicate( + e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append, + universal_newlines=un) + self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout)) + self.assertEquals((None, None), out) + self.assertEquals(0, returncode) + self._run_test(fn) def test_check_output_tee_throw(self): - stderr = [] - try: - subprocess2.check_output( - self.exe + ['--stderr', '--fail'], stderr=stderr.append) - self.fail() - except subprocess2.CalledProcessError, e: - self.assertEquals(convert('a\nbb\nccc\n'), ''.join(stderr)) - self.assertEquals('', e.stdout) - self.assertEquals(None, e.stderr) - self.assertEquals(64, e.returncode) + def fn(c, e, un): + stderr = [] + try: + subprocess2.check_output( + e + ['--stderr', '--fail'], stderr=stderr.append, + universal_newlines=un) + self.fail() + except subprocess2.CalledProcessError, e: + self.assertEquals(c('a\nbb\nccc\n'), ''.join(stderr)) + self.assertEquals('', e.stdout) + self.assertEquals(None, e.stderr) + self.assertEquals(64, e.returncode) + self._run_test(fn) def test_check_output_tee_large(self): stdout = [] @@ -314,6 +379,13 @@ class Subprocess2Test(unittest.TestCase): def child_main(args): + if sys.platform == 'win32': + # Annoying, make sure the output is not translated on Windows. + # pylint: disable=E1101,F0401 + import msvcrt + msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) + msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) + parser = optparse.OptionParser() parser.add_option( '--fail', @@ -321,22 +393,29 @@ def child_main(args): action='store_const', default=0, const=64) + parser.add_option( + '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') + parser.add_option( + '--cr', action='store_const', const='\r', dest='eol') parser.add_option('--stdout', action='store_true') parser.add_option('--stderr', action='store_true') - parser.add_option('--sleep', action='store_true') + parser.add_option('--sleep_first', action='store_true') + parser.add_option('--sleep_last', action='store_true') parser.add_option('--large', action='store_true') parser.add_option('--read', action='store_true') options, args = parser.parse_args(args) if args: parser.error('Internal error') - if options.sleep: + if options.sleep_first: time.sleep(10) def do(string): if options.stdout: - print >> sys.stdout, string.upper() + sys.stdout.write(string.upper()) + sys.stdout.write(options.eol) if options.stderr: - print >> sys.stderr, string.lower() + sys.stderr.write(string.lower()) + sys.stderr.write(options.eol) do('A') do('BB') @@ -351,6 +430,8 @@ def child_main(args): pass except OSError: pass + if options.sleep_last: + time.sleep(10) return options.return_value