From 192e8a67cfe4d1b7b0cb8e70b12b5a45f7a9221d Mon Sep 17 00:00:00 2001 From: Edward Lemur Date: Fri, 9 Aug 2019 23:11:49 +0000 Subject: [PATCH] subprocess2: Rewrite tests to be Python 3 compatible. Also replace mox with mock and move the test script to a different file. Bug: 984182 Change-Id: I9005a523c2abd82c38a7c61501c7cbfd4201a5b7 Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/1745412 Reviewed-by: Robbie Iannucci Commit-Queue: Edward Lesmes --- testing_support/subprocess2_test_script.py | 58 ++ tests/subprocess2_test.py | 687 +++++++-------------- 2 files changed, 278 insertions(+), 467 deletions(-) create mode 100644 testing_support/subprocess2_test_script.py diff --git a/testing_support/subprocess2_test_script.py b/testing_support/subprocess2_test_script.py new file mode 100644 index 000000000..8e0139d50 --- /dev/null +++ b/testing_support/subprocess2_test_script.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# Copyright (c) 2019 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Script used to test subprocess2.""" + +import optparse +import os +import sys +import time + + +if sys.platform == 'win32': + # Annoying, make sure the output is not translated on Windows. + # pylint: disable=no-member,import-error + import msvcrt + msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) + msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) + +parser = optparse.OptionParser() +parser.add_option( + '--fail', + dest='return_value', + action='store_const', + default=0, + const=64) +parser.add_option( + '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') +parser.add_option( + '--cr', action='store_const', const='\r', dest='eol') +parser.add_option('--stdout', action='store_true') +parser.add_option('--stderr', action='store_true') +parser.add_option('--read', action='store_true') +options, args = parser.parse_args() +if args: + parser.error('Internal error') + +def do(string): + if options.stdout: + sys.stdout.write(string.upper()) + sys.stdout.write(options.eol) + if options.stderr: + sys.stderr.write(string.lower()) + sys.stderr.write(options.eol) + +do('A') +do('BB') +do('CCC') +if options.read: + assert options.return_value is 0 + try: + while sys.stdin.read(1): + options.return_value += 1 + except OSError: + pass + +sys.exit(options.return_value) diff --git a/tests/subprocess2_test.py b/tests/subprocess2_test.py index 57280b8df..ace3e996b 100755 --- a/tests/subprocess2_test.py +++ b/tests/subprocess2_test.py @@ -5,328 +5,191 @@ """Unit tests for subprocess2.py.""" -import logging -import optparse import os import sys -import time import unittest -try: - import fcntl # pylint: disable=import-error -except ImportError: - fcntl = None - -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +DEPOT_TOOLS = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, DEPOT_TOOLS) import subprocess import subprocess2 -from testing_support import auto_stub - -# Method could be a function -# pylint: disable=no-self-use - - -# Create aliases for subprocess2 specific tests. They shouldn't be used for -# regression tests. -VOID = subprocess2.VOID -VOID_INPUT = subprocess2.VOID_INPUT -PIPE = subprocess2.PIPE -STDOUT = subprocess2.STDOUT - - -def convert_to_crlf(string): - """Unconditionally convert LF to CRLF.""" - return string.replace('\n', '\r\n') - - -def convert_to_cr(string): - """Unconditionally convert LF to CR.""" - return string.replace('\n', '\r') - - -def convert_win(string): - """Converts string to CRLF on Windows only.""" - if sys.platform == 'win32': - return string.replace('\n', '\r\n') - return string - - -class DefaultsTest(auto_stub.TestCase): - # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they - # can be trapped in the child process for better coverage. - def _fake_communicate(self): - """Mocks subprocess2.communicate().""" - results = {} - def fake_communicate(args, **kwargs): - assert not results - results.update(kwargs) - results['args'] = args - return ('stdout', 'stderr'), 0 - self.mock(subprocess2, 'communicate', fake_communicate) - return results - - def _fake_Popen(self): - """Mocks the whole subprocess2.Popen class.""" - results = {} - class fake_Popen(object): - returncode = -8 - def __init__(self, args, **kwargs): - assert not results - results.update(kwargs) - results['args'] = args - @staticmethod - # pylint: disable=redefined-builtin - def communicate(input=None): - return None, None - self.mock(subprocess2, 'Popen', fake_Popen) - return results - - def _fake_subprocess_Popen(self): - """Mocks the base class subprocess.Popen only.""" - results = {} - def __init__(self, args, **kwargs): - assert not results - results.update(kwargs) - results['args'] = args - def communicate(): - return None, None - self.mock(subprocess.Popen, '__init__', __init__) - self.mock(subprocess.Popen, 'communicate', communicate) - return results - - def test_check_call_defaults(self): - results = self._fake_communicate() - self.assertEquals( - ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True)) - expected = { - 'args': ['foo'], - 'a':True, - } - self.assertEquals(expected, results) - - def test_capture_defaults(self): - results = self._fake_communicate() - self.assertEquals( - 'stdout', subprocess2.capture(['foo'], a=True)) - expected = { - 'args': ['foo'], - 'a':True, - 'stdin': subprocess2.VOID_INPUT, - 'stdout': subprocess2.PIPE, - } - self.assertEquals(expected, results) - - def test_communicate_defaults(self): - results = self._fake_Popen() - self.assertEquals( - ((None, None), -8), subprocess2.communicate(['foo'], a=True)) - expected = { - 'args': ['foo'], - 'a': True, - } - self.assertEquals(expected, results) - - def test_Popen_defaults(self): - results = self._fake_subprocess_Popen() - proc = subprocess2.Popen(['foo'], a=True) - # Cleanup code in subprocess.py needs this member to be set. - # pylint: disable=attribute-defined-outside-init - proc._child_created = None - expected = { - 'args': ['foo'], - 'a': True, - 'shell': bool(sys.platform=='win32'), - } - if sys.platform != 'win32': - env = os.environ.copy() - is_english = lambda name: env.get(name, 'en').startswith('en') - if not is_english('LANG'): - env['LANG'] = 'en_US.UTF-8' - expected['env'] = env - if not is_english('LANGUAGE'): - env['LANGUAGE'] = 'en_US.UTF-8' - expected['env'] = env - self.assertEquals(expected, results) - - def test_check_output_defaults(self): - results = self._fake_communicate() - # It's discarding 'stderr' because it assumes stderr=subprocess2.STDOUT but - # fake_communicate() doesn't 'implement' that. - self.assertEquals('stdout', subprocess2.check_output(['foo'], a=True)) - expected = { - 'args': ['foo'], - 'a':True, - 'stdin': subprocess2.VOID_INPUT, - 'stdout': subprocess2.PIPE, - } - self.assertEquals(expected, results) - - -class BaseTestCase(unittest.TestCase): - def setUp(self): - super(BaseTestCase, self).setUp() - self.exe_path = __file__ - self.exe = [sys.executable, self.exe_path, '--child'] - self.states = {} - if fcntl: - for v in (sys.stdin, sys.stdout, sys.stderr): - fileno = v.fileno() - self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL) - - def tearDown(self): - for fileno, fl in self.states.iteritems(): - self.assertEquals(fl, fcntl.fcntl(fileno, fcntl.F_GETFL)) - super(BaseTestCase, self).tearDown() +from third_party import mock - def _check_res(self, res, stdout, stderr, returncode): - (out, err), code = res - self.assertEquals(stdout, out) - self.assertEquals(stderr, err) - self.assertEquals(returncode, code) +TEST_COMMAND = [ + sys.executable, + os.path.join(DEPOT_TOOLS, 'testing_support', 'subprocess2_test_script.py'), +] + + +class DefaultsTest(unittest.TestCase): + @mock.patch('subprocess2.communicate') + def test_check_call_defaults(self, mockCommunicate): + mockCommunicate.return_value = (('stdout', 'stderr'), 0) + self.assertEqual( + ('stdout', 'stderr'), subprocess2.check_call_out(['foo'], a=True)) + mockCommunicate.assert_called_with(['foo'], a=True) -class RegressionTest(BaseTestCase): + @mock.patch('subprocess2.communicate') + def test_capture_defaults(self, mockCommunicate): + mockCommunicate.return_value = (('stdout', 'stderr'), 0) + self.assertEqual( + 'stdout', subprocess2.capture(['foo'], a=True)) + mockCommunicate.assert_called_with( + ['foo'], a=True, stdin=subprocess2.VOID_INPUT, stdout=subprocess2.PIPE) + + @mock.patch('subprocess2.Popen') + def test_communicate_defaults(self, mockPopen): + mockPopen().communicate.return_value = ('bar', 'baz') + mockPopen().returncode = -8 + self.assertEqual( + (('bar', 'baz'), -8), subprocess2.communicate(['foo'], a=True)) + mockPopen.assert_called_with(['foo'], a=True) + + @mock.patch('os.environ', {}) + @mock.patch('subprocess.Popen.__init__') + def test_Popen_defaults(self, mockPopen): + with mock.patch('sys.platform', 'win32'): + subprocess2.Popen(['foo'], a=True) + mockPopen.assert_called_with(['foo'], a=True, shell=True) + + with mock.patch('sys.platform', 'non-win32'): + subprocess2.Popen(['foo'], a=True) + mockPopen.assert_called_with(['foo'], a=True, shell=False) + + def test_get_english_env(self): + with mock.patch('sys.platform', 'win32'): + self.assertIsNone(subprocess2.get_english_env({})) + + with mock.patch('sys.platform', 'non-win32'): + self.assertIsNone(subprocess2.get_english_env({})) + self.assertIsNone( + subprocess2.get_english_env({'LANG': 'en_XX', 'LANGUAGE': 'en_YY'})) + self.assertEqual( + {'LANG': 'en_US.UTF-8', 'LANGUAGE': 'en_US.UTF-8'}, + subprocess2.get_english_env({'LANG': 'bar', 'LANGUAGE': 'baz'})) + + @mock.patch('subprocess2.communicate') + def test_check_output_defaults(self, mockCommunicate): + mockCommunicate.return_value = (('stdout', 'stderr'), 0) + self.assertEqual('stdout', subprocess2.check_output(['foo'], a=True)) + mockCommunicate.assert_called_with( + ['foo'], a=True, stdin=subprocess2.VOID_INPUT, stdout=subprocess2.PIPE) + + +def _run_test(with_subprocess=True): + """Runs a tests in 12 combinations: + - With universal_newlines=True and False. + - With LF, CR, and CRLF output. + - With subprocess and subprocess2. + """ + subps = (subprocess2, subprocess) if with_subprocess else (subprocess2,) + no_op = lambda s: s + to_bytes = lambda s: s.encode() + to_cr_bytes = lambda s: s.replace('\n', '\r').encode() + to_crlf_bytes = lambda s: s.replace('\n', '\r\n').encode() + def wrapper(test): + def inner(self): + for subp in subps: + # universal_newlines = False + test(self, to_bytes, TEST_COMMAND, False, subp) + test(self, to_cr_bytes, TEST_COMMAND + ['--cr'], False, subp) + test(self, to_crlf_bytes, TEST_COMMAND + ['--crlf'], False, subp) + # universal_newlines = True + test(self, no_op, TEST_COMMAND, True, subp) + test(self, no_op, TEST_COMMAND + ['--cr'], True, subp) + test(self, no_op, TEST_COMMAND + ['--crlf'], True, subp) + + return inner + return wrapper + + +class SmokeTests(unittest.TestCase): # Regression tests to ensure that subprocess and subprocess2 have the same # behavior. - def _run_test(self, function): - """Runs tests in 12 combinations: - - LF output with universal_newlines=False - - CR output with universal_newlines=False - - CRLF output with universal_newlines=False - - LF output with universal_newlines=True - - CR output with universal_newlines=True - - CRLF output with universal_newlines=True - - Once with subprocess, once with subprocess2. - - First |function| argument is the conversion for the original expected LF - string to the right EOL. - Second |function| argument is the executable and initial flag to run, to - control what EOL is used by the child process. - Third |function| argument is universal_newlines value. - """ - noop = lambda x: x - for subp in (subprocess, subprocess2): - function(noop, self.exe, False, subp) - function(convert_to_cr, self.exe + ['--cr'], False, subp) - function(convert_to_crlf, self.exe + ['--crlf'], False, subp) - function(noop, self.exe, True, subp) - function(noop, self.exe + ['--cr'], True, subp) - function(noop, self.exe + ['--crlf'], True, subp) + def _check_res(self, res, stdout, stderr, returncode): + (out, err), code = res + self.assertEqual(stdout, out) + self.assertEqual(stderr, err) + self.assertEqual(returncode, code) def _check_exception(self, subp, e, stdout, stderr, returncode): """On exception, look if the exception members are set correctly.""" - self.assertEquals(returncode, e.returncode) - if subp is subprocess: + self.assertEqual(returncode, e.returncode) + if subp is subprocess2 or sys.version_info.major == 3: + self.assertEqual(stdout, e.stdout) + self.assertEqual(stderr, e.stderr) + else: # subprocess never save the output. self.assertFalse(hasattr(e, 'stdout')) self.assertFalse(hasattr(e, 'stderr')) - elif subp is subprocess2: - self.assertEquals(stdout, e.stdout) - self.assertEquals(stderr, e.stderr) - else: - self.fail() def test_check_output_no_stdout(self): - try: - subprocess2.check_output(self.exe, stdout=subprocess2.PIPE) - self.fail() - except ValueError: - pass - - if (sys.version_info[0] * 10 + sys.version_info[1]) >= 27: - # python 2.7+ - try: - # pylint: disable=no-member - subprocess.check_output(self.exe, stdout=subprocess.PIPE) - self.fail() - except ValueError: - pass - - def test_check_output_throw_stdout(self): - def fn(c, e, un, subp): - if not hasattr(subp, 'check_output'): - return - try: - subp.check_output( - e + ['--fail', '--stdout'], universal_newlines=un) - self.fail() - except subp.CalledProcessError as exception: - self._check_exception(subp, exception, c('A\nBB\nCCC\n'), None, 64) - self._run_test(fn) - - def test_check_output_throw_no_stderr(self): - def fn(c, e, un, subp): - if not hasattr(subp, 'check_output'): - return - try: - subp.check_output( - e + ['--fail', '--stderr'], universal_newlines=un) - self.fail() - except subp.CalledProcessError as exception: - self._check_exception(subp, exception, c(''), None, 64) - self._run_test(fn) - - def test_check_output_throw_stderr(self): - def fn(c, e, un, subp): - if not hasattr(subp, 'check_output'): - return - try: - subp.check_output( - e + ['--fail', '--stderr'], - stderr=subp.PIPE, - universal_newlines=un) - self.fail() - except subp.CalledProcessError as exception: - self._check_exception(subp, exception, '', c('a\nbb\nccc\n'), 64) - self._run_test(fn) - - def test_check_output_throw_stderr_stdout(self): - def fn(c, e, un, subp): - if not hasattr(subp, 'check_output'): - return - try: - subp.check_output( - e + ['--fail', '--stderr'], - stderr=subp.STDOUT, - universal_newlines=un) - self.fail() - except subp.CalledProcessError as exception: - self._check_exception(subp, exception, c('a\nbb\nccc\n'), None, 64) - self._run_test(fn) - - def test_check_call_throw(self): for subp in (subprocess, subprocess2): - try: - subp.check_call(self.exe + ['--fail', '--stderr']) - self.fail() - except subp.CalledProcessError as exception: - self._check_exception(subp, exception, None, None, 64) - - def test_redirect_stderr_to_stdout_pipe(self): - def fn(c, e, un, subp): - # stderr output into stdout. - proc = subp.Popen( - e + ['--stderr'], - stdout=subp.PIPE, + with self.assertRaises(ValueError): + subp.check_output(TEST_COMMAND, stdout=subp.PIPE) + + @_run_test() + def test_check_output_throw_stdout(self, c, cmd, un, subp): + with self.assertRaises(subp.CalledProcessError) as e: + subp.check_output( + cmd + ['--fail', '--stdout'], universal_newlines=un) + self._check_exception(subp, e.exception, c('A\nBB\nCCC\n'), None, 64) + + @_run_test() + def test_check_output_throw_no_stderr(self, c, cmd, un, subp): + with self.assertRaises(subp.CalledProcessError) as e: + subp.check_output( + cmd + ['--fail', '--stderr'], universal_newlines=un) + self._check_exception(subp, e.exception, c(''), None, 64) + + @_run_test() + def test_check_output_throw_stderr(self, c, cmd, un, subp): + with self.assertRaises(subp.CalledProcessError) as e: + subp.check_output( + cmd + ['--fail', '--stderr'], + stderr=subp.PIPE, + universal_newlines=un) + self._check_exception(subp, e.exception, c(''), c('a\nbb\nccc\n'), 64) + + @_run_test() + def test_check_output_throw_stderr_stdout(self, c, cmd, un, subp): + with self.assertRaises(subp.CalledProcessError) as e: + subp.check_output( + cmd + ['--fail', '--stderr'], stderr=subp.STDOUT, universal_newlines=un) - res = proc.communicate(), proc.returncode - self._check_res(res, c('a\nbb\nccc\n'), None, 0) - self._run_test(fn) - - def test_redirect_stderr_to_stdout(self): - def fn(c, e, un, subp): - # stderr output into stdout but stdout is not piped. - proc = subp.Popen( - e + ['--stderr'], stderr=STDOUT, universal_newlines=un) - res = proc.communicate(), proc.returncode - self._check_res(res, None, None, 0) - self._run_test(fn) - - def test_stderr(self): + self._check_exception(subp, e.exception, c('a\nbb\nccc\n'), None, 64) + + def test_check_call_throw(self): + for subp in (subprocess, subprocess2): + with self.assertRaises(subp.CalledProcessError) as e: + subp.check_call(TEST_COMMAND + ['--fail', '--stderr']) + self._check_exception(subp, e.exception, None, None, 64) + + @_run_test() + def test_redirect_stderr_to_stdout_pipe(self, c, cmd, un, subp): + # stderr output into stdout. + proc = subp.Popen( + cmd + ['--stderr'], + stdout=subp.PIPE, + stderr=subp.STDOUT, + universal_newlines=un) + res = proc.communicate(), proc.returncode + self._check_res(res, c('a\nbb\nccc\n'), None, 0) + + @_run_test() + def test_redirect_stderr_to_stdout(self, c, cmd, un, subp): + # stderr output into stdout but stdout is not piped. + proc = subp.Popen( + cmd + ['--stderr'], stderr=subprocess2.STDOUT, universal_newlines=un) + res = proc.communicate(), proc.returncode + self._check_res(res, None, None, 0) + + @_run_test() + def test_stderr(self, c, cmd, un, subp): cmd = ['expr', '1', '/', '0'] if sys.platform == 'win32': cmd = ['cmd.exe', '/c', 'exit', '1'] @@ -334,180 +197,70 @@ class RegressionTest(BaseTestCase): p2 = subprocess2.Popen(cmd, stderr=subprocess.PIPE, shell=False) r1 = p1.communicate() r2 = p2.communicate() - self.assertEquals(r1, r2) - - -class S2Test(BaseTestCase): - # Tests that can only run in subprocess2, e.g. new functionalities. - # In particular, subprocess2.communicate() doesn't exist in subprocess. - def _run_test(self, function): - """Runs tests in 6 combinations: - - LF output with universal_newlines=False - - CR output with universal_newlines=False - - CRLF output with universal_newlines=False - - LF output with universal_newlines=True - - CR output with universal_newlines=True - - CRLF output with universal_newlines=True - - First |function| argument is the conversion for the origianl expected LF - string to the right EOL. - Second |function| argument is the executable and initial flag to run, to - control what EOL is used by the child process. - Third |function| argument is universal_newlines value. - """ - noop = lambda x: x - function(noop, self.exe, False) - function(convert_to_cr, self.exe + ['--cr'], False) - function(convert_to_crlf, self.exe + ['--crlf'], False) - function(noop, self.exe, True) - function(noop, self.exe + ['--cr'], True) - function(noop, self.exe + ['--crlf'], True) - - def _check_exception(self, e, stdout, stderr, returncode): - """On exception, look if the exception members are set correctly.""" - self.assertEquals(returncode, e.returncode) - self.assertEquals(stdout, e.stdout) - self.assertEquals(stderr, e.stderr) - - def test_stdin(self): - def fn(c, e, un): - stdin = '0123456789' - res = subprocess2.communicate( - e + ['--read'], - stdin=stdin, - universal_newlines=un) - self._check_res(res, None, None, 10) - self._run_test(fn) - - def test_stdin_unicode(self): - def fn(c, e, un): - stdin = u'0123456789' - res = subprocess2.communicate( - e + ['--read'], - stdin=stdin, - universal_newlines=un) - self._check_res(res, None, None, 10) - self._run_test(fn) - - def test_stdin_empty(self): - def fn(c, e, un): - stdin = '' - res = subprocess2.communicate( - e + ['--read'], - stdin=stdin, - universal_newlines=un) - self._check_res(res, None, None, 0) - self._run_test(fn) + self.assertEqual(r1, r2) + + @_run_test(with_subprocess=False) + def test_stdin(self, c, cmd, un, subp): + stdin = c('0123456789') + res = subprocess2.communicate( + cmd + ['--read'], + stdin=stdin, + universal_newlines=un) + self._check_res(res, None, None, 10) + + @_run_test(with_subprocess=False) + def test_stdin_empty(self, c, cmd, un, subp): + stdin = c('') + res = subprocess2.communicate( + cmd + ['--read'], + stdin=stdin, + universal_newlines=un) + self._check_res(res, None, None, 0) def test_stdin_void(self): - res = subprocess2.communicate(self.exe + ['--read'], stdin=VOID_INPUT) + res = subprocess2.communicate( + TEST_COMMAND + ['--read'], + stdin=subprocess2.VOID_INPUT) self._check_res(res, None, None, 0) - def test_stdin_void_stdout(self): - # Make sure a mix of VOID and PIPE works. - def fn(c, e, un): - res = subprocess2.communicate( - e + ['--stdout', '--read'], - stdin=VOID_INPUT, - stdout=PIPE, - universal_newlines=un, - shell=False) - self._check_res(res, c('A\nBB\nCCC\n'), None, 0) - self._run_test(fn) - - def test_stdout_void(self): - def fn(c, e, un): - res = subprocess2.communicate( - e + ['--stdout', '--stderr'], - stdout=VOID, - stderr=PIPE, - universal_newlines=un) - self._check_res(res, None, c('a\nbb\nccc\n'), 0) - self._run_test(fn) - - def test_stderr_void(self): - def fn(c, e, un): - res = subprocess2.communicate( - e + ['--stdout', '--stderr'], - stdout=PIPE, - stderr=VOID, - universal_newlines=un) - self._check_res(res, c('A\nBB\nCCC\n'), None, 0) - self._run_test(fn) - - def test_stdout_void_stderr_redirect(self): - def fn(c, e, un): - res = subprocess2.communicate( - e + ['--stdout', '--stderr'], - stdout=VOID, - stderr=STDOUT, - universal_newlines=un) - self._check_res(res, None, None, 0) - self._run_test(fn) - - -def child_main(args): - if sys.platform == 'win32': - # Annoying, make sure the output is not translated on Windows. - # pylint: disable=no-member,import-error - import msvcrt - msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) - msvcrt.setmode(sys.stderr.fileno(), os.O_BINARY) - - parser = optparse.OptionParser() - parser.add_option( - '--fail', - dest='return_value', - action='store_const', - default=0, - const=64) - parser.add_option( - '--crlf', action='store_const', const='\r\n', dest='eol', default='\n') - parser.add_option( - '--cr', action='store_const', const='\r', dest='eol') - parser.add_option('--stdout', action='store_true') - parser.add_option('--stderr', action='store_true') - parser.add_option('--sleep_first', action='store_true') - parser.add_option('--sleep_last', action='store_true') - parser.add_option('--large', action='store_true') - parser.add_option('--read', action='store_true') - options, args = parser.parse_args(args) - if args: - parser.error('Internal error') - if options.sleep_first: - time.sleep(10) - - def do(string): - if options.stdout: - sys.stdout.write(string.upper()) - sys.stdout.write(options.eol) - if options.stderr: - sys.stderr.write(string.lower()) - sys.stderr.write(options.eol) - - do('A') - do('BB') - do('CCC') - if options.large: - # Print 128kb. - string = '0123456789abcdef' * (8*1024) - sys.stdout.write(string) - if options.read: - assert options.return_value is 0 - try: - while sys.stdin.read(1): - options.return_value += 1 - except OSError: - pass - if options.sleep_last: - time.sleep(10) - return options.return_value + @_run_test(with_subprocess=False) + def test_stdin_void_stdout(self, c, cmd, un, subp): + # Make sure a mix ofsubprocess2.VOID andsubprocess2.PIPE works. + res = subprocess2.communicate( + cmd + ['--stdout', '--read'], + stdin=subprocess2.VOID_INPUT, + stdout=subprocess2.PIPE, + universal_newlines=un, + shell=False) + self._check_res(res, c('A\nBB\nCCC\n'), None, 0) + + @_run_test(with_subprocess=False) + def test_stdout_void(self, c, cmd, un, subp): + res = subprocess2.communicate( + cmd + ['--stdout', '--stderr'], + stdout=subprocess2.VOID, + stderr=subprocess2.PIPE, + universal_newlines=un) + self._check_res(res, None, c('a\nbb\nccc\n'), 0) + + @_run_test(with_subprocess=False) + def test_stderr_void(self, c, cmd, un, subp): + res = subprocess2.communicate( + cmd + ['--stdout', '--stderr'], + stdout=subprocess2.PIPE, + stderr=subprocess2.VOID, + universal_newlines=un) + self._check_res(res, c('A\nBB\nCCC\n'), None, 0) + + @_run_test(with_subprocess=False) + def test_stdout_void_stderr_redirect(self, c, cmd, un, subp): + res = subprocess2.communicate( + cmd + ['--stdout', '--stderr'], + stdout=subprocess2.VOID, + stderr=subprocess2.STDOUT, + universal_newlines=un) + self._check_res(res, None, None, 0) if __name__ == '__main__': - logging.basicConfig(level= - [logging.WARNING, logging.INFO, logging.DEBUG][ - min(2, sys.argv.count('-v'))]) - if len(sys.argv) > 1 and sys.argv[1] == '--child': - sys.exit(child_main(sys.argv[2:])) unittest.main()