""" Unit tests for subprocess2.py. """
import logging
import optparse
import os
import sys
import time
import unittest
try :
import fcntl # pylint: disable=import-error
except ImportError :
fcntl = None
sys . path . insert ( 0 , os . path . dirname ( os . path . dirname ( os . path . abspath ( __file__ ) ) ) )
DEPOT_TOOLS = os . path . dirname ( os . path . dirname ( os . path . abspath ( __file__ ) ) )
sys . path . insert ( 0 , DEPOT_TOOLS )
import subprocess
import subprocess2
from testing_support import auto_stub
# Method could be a function
# pylint: disable=no-self-use
# Create aliases for subprocess2 specific tests. They shouldn't be used for
# regression tests.
VOID = subprocess2 . VOID
VOID_INPUT = subprocess2 . VOID_INPUT
PIPE = subprocess2 . PIPE
STDOUT = subprocess2 . STDOUT
def convert_to_crlf ( string ) :
""" Unconditionally convert LF to CRLF. """
return string . replace ( ' \n ' , ' \r \n ' )
def convert_to_cr ( string ) :
""" Unconditionally convert LF to CR. """
return string . replace ( ' \n ' , ' \r ' )
def convert_win ( string ) :
""" Converts string to CRLF on Windows only. """
if sys . platform == ' win32 ' :
return string . replace ( ' \n ' , ' \r \n ' )
return string
class DefaultsTest ( auto_stub . TestCase ) :
# TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they
# can be trapped in the child process for better coverage.
def _fake_communicate ( self ) :
""" Mocks subprocess2.communicate(). """
results = { }
def fake_communicate ( args , * * kwargs ) :
assert not results
results . update ( kwargs )
results [ ' args ' ] = args
return ( ' stdout ' , ' stderr ' ) , 0
self . mock ( subprocess2 , ' communicate ' , fake_communicate )
return results
def _fake_Popen ( self ) :
""" Mocks the whole subprocess2.Popen class. """
results = { }
class fake_Popen ( object ) :
returncode = - 8
def __init__ ( self , args , * * kwargs ) :
assert not results
results . update ( kwargs )
results [ ' args ' ] = args
# pylint: disable=redefined-builtin
def communicate ( input = None ) :
return None , None
self . mock ( subprocess2 , ' Popen ' , fake_Popen )
return results
def _fake_subprocess_Popen ( self ) :
""" Mocks the base class subprocess.Popen only. """
results = { }
def __init__ ( self , args , * * kwargs ) :
assert not results
results . update ( kwargs )
results [ ' args ' ] = args
def communicate ( ) :
return None , None
self . mock ( subprocess . Popen , ' __init__ ' , __init__ )
self . mock ( subprocess . Popen , ' communicate ' , communicate )
return results
def test_check_call_defaults ( self ) :
results = self . _fake_communicate ( )
self . assertEquals (
( ' stdout ' , ' stderr ' ) , subprocess2 . check_call_out ( [ ' foo ' ] , a = True ) )
expected = {
' args ' : [ ' foo ' ] ,
' a ' : True ,
self . assertEquals ( expected , results )
def test_capture_defaults ( self ) :
results = self . _fake_communicate ( )
self . assertEquals (
' stdout ' , subprocess2 . capture ( [ ' foo ' ] , a = True ) )
expected = {
' args ' : [ ' foo ' ] ,
' a ' : True ,
' stdin ' : subprocess2 . VOID_INPUT ,
' stdout ' : subprocess2 . PIPE ,
self . assertEquals ( expected , results )
def test_communicate_defaults ( self ) :
results = self . _fake_Popen ( )
self . assertEquals (
( ( None , None ) , - 8 ) , subprocess2 . communicate ( [ ' foo ' ] , a = True ) )
expected = {
' args ' : [ ' foo ' ] ,
' a ' : True ,
self . assertEquals ( expected , results )
def test_Popen_defaults ( self ) :
results = self . _fake_subprocess_Popen ( )
proc = subprocess2 . Popen ( [ ' foo ' ] , a = True )
# Cleanup code in subprocess.py needs this member to be set.
# pylint: disable=attribute-defined-outside-init
proc . _child_created = None
expected = {
' args ' : [ ' foo ' ] ,
' a ' : True ,
' shell ' : bool ( sys . platform == ' win32 ' ) ,
if sys . platform != ' win32 ' :
env = os . environ . copy ( )
is_english = lambda name : env . get ( name , ' en ' ) . startswith ( ' en ' )
if not is_english ( ' LANG ' ) :
env [ ' LANG ' ] = ' en_US.UTF-8 '
expected [ ' env ' ] = env
if not is_english ( ' LANGUAGE ' ) :
env [ ' LANGUAGE ' ] = ' en_US.UTF-8 '
expected [ ' env ' ] = env
self . assertEquals ( expected , results )
def test_check_output_defaults ( self ) :
results = self . _fake_communicate ( )
# It's discarding 'stderr' because it assumes stderr=subprocess2.STDOUT but
# fake_communicate() doesn't 'implement' that.
self . assertEquals ( ' stdout ' , subprocess2 . check_output ( [ ' foo ' ] , a = True ) )
expected = {
' args ' : [ ' foo ' ] ,
' a ' : True ,
' stdin ' : subprocess2 . VOID_INPUT ,
' stdout ' : subprocess2 . PIPE ,
self . assertEquals ( expected , results )
class BaseTestCase ( unittest . TestCase ) :
def setUp ( self ) :
super ( BaseTestCase , self ) . setUp ( )
self . exe_path = __file__
self . exe = [ sys . executable , self . exe_path , ' --child ' ]
self . states = { }
if fcntl :
for v in ( sys . stdin , sys . stdout , sys . stderr ) :
fileno = v . fileno ( )
self . states [ fileno ] = fcntl . fcntl ( fileno , fcntl . F_GETFL )
def tearDown ( self ) :
for fileno , fl in self . states . iteritems ( ) :
self . assertEquals ( fl , fcntl . fcntl ( fileno , fcntl . F_GETFL ) )
super ( BaseTestCase , self ) . tearDown ( )
from third_party import mock
def _check_res ( self , res , stdout , stderr , returncode ) :
( out , err ) , code = res
self . assertEquals ( stdout , out )
self . assertEquals ( stderr , err )
self . assertEquals ( returncode , code )
sys . executable ,
os . path . join ( DEPOT_TOOLS , ' testing_support ' , ' subprocess2_test_script.py ' ) ,
class DefaultsTest ( unittest . TestCase ) :
@mock.patch ( ' subprocess2.communicate ' )
def test_check_call_defaults ( self , mockCommunicate ) :
mockCommunicate . return_value = ( ( ' stdout ' , ' stderr ' ) , 0 )
self . assertEqual (
( ' stdout ' , ' stderr ' ) , subprocess2 . check_call_out ( [ ' foo ' ] , a = True ) )
mockCommunicate . assert_called_with ( [ ' foo ' ] , a = True )
class RegressionTest ( BaseTestCase ) :
@mock.patch ( ' subprocess2.communicate ' )
def test_capture_defaults ( self , mockCommunicate ) :
mockCommunicate . return_value = ( ( ' stdout ' , ' stderr ' ) , 0 )
self . assertEqual (
' stdout ' , subprocess2 . capture ( [ ' foo ' ] , a = True ) )
mockCommunicate . assert_called_with (
[ ' foo ' ] , a = True , stdin = subprocess2 . VOID_INPUT , stdout = subprocess2 . PIPE )
@mock.patch ( ' subprocess2.Popen ' )
def test_communicate_defaults ( self , mockPopen ) :
mockPopen ( ) . communicate . return_value = ( ' bar ' , ' baz ' )
mockPopen ( ) . returncode = - 8
self . assertEqual (
( ( ' bar ' , ' baz ' ) , - 8 ) , subprocess2 . communicate ( [ ' foo ' ] , a = True ) )
mockPopen . assert_called_with ( [ ' foo ' ] , a = True )
@mock.patch ( ' os.environ ' , { } )
@mock.patch ( ' subprocess.Popen.__init__ ' )
def test_Popen_defaults ( self , mockPopen ) :
with mock . patch ( ' sys.platform ' , ' win32 ' ) :
subprocess2 . Popen ( [ ' foo ' ] , a = True )
mockPopen . assert_called_with ( [ ' foo ' ] , a = True , shell = True )
with mock . patch ( ' sys.platform ' , ' non-win32 ' ) :
subprocess2 . Popen ( [ ' foo ' ] , a = True )
mockPopen . assert_called_with ( [ ' foo ' ] , a = True , shell = False )
def test_get_english_env ( self ) :
with mock . patch ( ' sys.platform ' , ' win32 ' ) :
self . assertIsNone ( subprocess2 . get_english_env ( { } ) )
with mock . patch ( ' sys.platform ' , ' non-win32 ' ) :
self . assertIsNone ( subprocess2 . get_english_env ( { } ) )
self . assertIsNone (
subprocess2 . get_english_env ( { ' LANG ' : ' en_XX ' , ' LANGUAGE ' : ' en_YY ' } ) )
self . assertEqual (
{ ' LANG ' : ' en_US.UTF-8 ' , ' LANGUAGE ' : ' en_US.UTF-8 ' } ,
subprocess2 . get_english_env ( { ' LANG ' : ' bar ' , ' LANGUAGE ' : ' baz ' } ) )
@mock.patch ( ' subprocess2.communicate ' )
def test_check_output_defaults ( self , mockCommunicate ) :
mockCommunicate . return_value = ( ( ' stdout ' , ' stderr ' ) , 0 )
self . assertEqual ( ' stdout ' , subprocess2 . check_output ( [ ' foo ' ] , a = True ) )
mockCommunicate . assert_called_with (
[ ' foo ' ] , a = True , stdin = subprocess2 . VOID_INPUT , stdout = subprocess2 . PIPE )
def _run_test ( with_subprocess = True ) :
""" Runs a tests in 12 combinations:
- With universal_newlines = True and False .
- With LF , CR , and CRLF output .
- With subprocess and subprocess2 .
subps = ( subprocess2 , subprocess ) if with_subprocess else ( subprocess2 , )
no_op = lambda s : s
to_bytes = lambda s : s . encode ( )
to_cr_bytes = lambda s : s . replace ( ' \n ' , ' \r ' ) . encode ( )
to_crlf_bytes = lambda s : s . replace ( ' \n ' , ' \r \n ' ) . encode ( )
def wrapper ( test ) :
def inner ( self ) :
for subp in subps :
# universal_newlines = False
test ( self , to_bytes , TEST_COMMAND , False , subp )
test ( self , to_cr_bytes , TEST_COMMAND + [ ' --cr ' ] , False , subp )
test ( self , to_crlf_bytes , TEST_COMMAND + [ ' --crlf ' ] , False , subp )
# universal_newlines = True
test ( self , no_op , TEST_COMMAND , True , subp )
test ( self , no_op , TEST_COMMAND + [ ' --cr ' ] , True , subp )
test ( self , no_op , TEST_COMMAND + [ ' --crlf ' ] , True , subp )
return inner
return wrapper
class SmokeTests ( unittest . TestCase ) :
# Regression tests to ensure that subprocess and subprocess2 have the same
# behavior.
def _run_test ( self , function ) :
""" Runs tests in 12 combinations:
- LF output with universal_newlines = False
- CR output with universal_newlines = False
- CRLF output with universal_newlines = False
- LF output with universal_newlines = True
- CR output with universal_newlines = True
- CRLF output with universal_newlines = True
Once with subprocess , once with subprocess2 .
First | function | argument is the conversion for the original expected LF
string to the right EOL .
Second | function | argument is the executable and initial flag to run , to
control what EOL is used by the child process .
Third | function | argument is universal_newlines value .
noop = lambda x : x
for subp in ( subprocess , subprocess2 ) :
function ( noop , self . exe , False , subp )
function ( convert_to_cr , self . exe + [ ' --cr ' ] , False , subp )
function ( convert_to_crlf , self . exe + [ ' --crlf ' ] , False , subp )
function ( noop , self . exe , True , subp )
function ( noop , self . exe + [ ' --cr ' ] , True , subp )
function ( noop , self . exe + [ ' --crlf ' ] , True , subp )
def _check_res ( self , res , stdout , stderr , returncode ) :
( out , err ) , code = res
self . assertEqual ( stdout , out )
self . assertEqual ( stderr , err )
self . assertEqual ( returncode , code )
def _check_exception ( self , subp , e , stdout , stderr , returncode ) :
""" On exception, look if the exception members are set correctly. """
self . assertEquals ( returncode , e . returncode )
if subp is subprocess :
self . assertEqual ( returncode , e . returncode )
if subp is subprocess2 or sys . version_info . major == 3 :
self . assertEqual ( stdout , e . stdout )
self . assertEqual ( stderr , e . stderr )
else :
# subprocess never save the output.
self . assertFalse ( hasattr ( e , ' stdout ' ) )
self . assertFalse ( hasattr ( e , ' stderr ' ) )
elif subp is subprocess2 :
self . assertEquals ( stdout , e . stdout )
self . assertEquals ( stderr , e . stderr )
else :
self . fail ( )
def test_check_output_no_stdout ( self ) :
try :
subprocess2 . check_output ( self . exe , stdout = subprocess2 . PIPE )
self . fail ( )
except ValueError :
if ( sys . version_info [ 0 ] * 10 + sys . version_info [ 1 ] ) > = 27 :
# python 2.7+
try :
# pylint: disable=no-member
subprocess . check_output ( self . exe , stdout = subprocess . PIPE )
self . fail ( )
except ValueError :
def test_check_output_throw_stdout ( self ) :
def fn ( c , e , un , subp ) :
if not hasattr ( subp , ' check_output ' ) :
try :
subp . check_output (
e + [ ' --fail ' , ' --stdout ' ] , universal_newlines = un )
self . fail ( )
except subp . CalledProcessError as exception :
self . _check_exception ( subp , exception , c ( ' A \n BB \n CCC \n ' ) , None , 64 )
self . _run_test ( fn )
def test_check_output_throw_no_stderr ( self ) :
def fn ( c , e , un , subp ) :
if not hasattr ( subp , ' check_output ' ) :
try :
subp . check_output (
e + [ ' --fail ' , ' --stderr ' ] , universal_newlines = un )
self . fail ( )
except subp . CalledProcessError as exception :
self . _check_exception ( subp , exception , c ( ' ' ) , None , 64 )
self . _run_test ( fn )
def test_check_output_throw_stderr ( self ) :
def fn ( c , e , un , subp ) :
if not hasattr ( subp , ' check_output ' ) :
try :
subp . check_output (
e + [ ' --fail ' , ' --stderr ' ] ,
stderr = subp . PIPE ,
universal_newlines = un )
self . fail ( )
except subp . CalledProcessError as exception :
self . _check_exception ( subp , exception , ' ' , c ( ' a \n bb \n ccc \n ' ) , 64 )
self . _run_test ( fn )
def test_check_output_throw_stderr_stdout ( self ) :
def fn ( c , e , un , subp ) :
if not hasattr ( subp , ' check_output ' ) :
try :
subp . check_output (
e + [ ' --fail ' , ' --stderr ' ] ,
stderr = subp . STDOUT ,
universal_newlines = un )
self . fail ( )
except subp . CalledProcessError as exception :
self . _check_exception ( subp , exception , c ( ' a \n bb \n ccc \n ' ) , None , 64 )
self . _run_test ( fn )
def test_check_call_throw ( self ) :
for subp in ( subprocess , subprocess2 ) :
try :
subp . check_call ( self . exe + [ ' --fail ' , ' --stderr ' ] )
self . fail ( )
except subp . CalledProcessError as exception :
self . _check_exception ( subp , exception , None , None , 64 )
def test_redirect_stderr_to_stdout_pipe ( self ) :
def fn ( c , e , un , subp ) :
# stderr output into stdout.
proc = subp . Popen (
e + [ ' --stderr ' ] ,
stdout = subp . PIPE ,
with self . assertRaises ( ValueError ) :
subp . check_output ( TEST_COMMAND , stdout = subp . PIPE )
@_run_test ( )
def test_check_output_throw_stdout ( self , c , cmd , un , subp ) :
with self . assertRaises ( subp . CalledProcessError ) as e :
subp . check_output (
cmd + [ ' --fail ' , ' --stdout ' ] , universal_newlines = un )
self . _check_exception ( subp , e . exception , c ( ' A \n BB \n CCC \n ' ) , None , 64 )
@_run_test ( )
def test_check_output_throw_no_stderr ( self , c , cmd , un , subp ) :
with self . assertRaises ( subp . CalledProcessError ) as e :
subp . check_output (
cmd + [ ' --fail ' , ' --stderr ' ] , universal_newlines = un )
self . _check_exception ( subp , e . exception , c ( ' ' ) , None , 64 )
@_run_test ( )
def test_check_output_throw_stderr ( self , c , cmd , un , subp ) :
with self . assertRaises ( subp . CalledProcessError ) as e :
subp . check_output (
cmd + [ ' --fail ' , ' --stderr ' ] ,
stderr = subp . PIPE ,
universal_newlines = un )
self . _check_exception ( subp , e . exception , c ( ' ' ) , c ( ' a \n bb \n ccc \n ' ) , 64 )
@_run_test ( )
def test_check_output_throw_stderr_stdout ( self , c , cmd , un , subp ) :
with self . assertRaises ( subp . CalledProcessError ) as e :
subp . check_output (
cmd + [ ' --fail ' , ' --stderr ' ] ,
stderr = subp . STDOUT ,
universal_newlines = un )
res = proc . communicate ( ) , proc . returncode
self . _check_res ( res , c ( ' a \n bb \n ccc \n ' ) , None , 0 )
self . _run_test ( fn )
def test_redirect_stderr_to_stdout ( self ) :
def fn ( c , e , un , subp ) :
# stderr output into stdout but stdout is not piped.
proc = subp . Popen (
e + [ ' --stderr ' ] , stderr = STDOUT , universal_newlines = un )
res = proc . communicate ( ) , proc . returncode
self . _check_res ( res , None , None , 0 )
self . _run_test ( fn )
def test_stderr ( self ) :
self . _check_exception ( subp , e . exception , c ( ' a \n bb \n ccc \n ' ) , None , 64 )
def test_check_call_throw ( self ) :
for subp in ( subprocess , subprocess2 ) :
with self . assertRaises ( subp . CalledProcessError ) as e :
subp . check_call ( TEST_COMMAND + [ ' --fail ' , ' --stderr ' ] )
self . _check_exception ( subp , e . exception , None , None , 64 )
@_run_test ( )
def test_redirect_stderr_to_stdout_pipe ( self , c , cmd , un , subp ) :
# stderr output into stdout.
proc = subp . Popen (
cmd + [ ' --stderr ' ] ,
stdout = subp . PIPE ,
stderr = subp . STDOUT ,
universal_newlines = un )
res = proc . communicate ( ) , proc . returncode
self . _check_res ( res , c ( ' a \n bb \n ccc \n ' ) , None , 0 )
@_run_test ( )
def test_redirect_stderr_to_stdout ( self , c , cmd , un , subp ) :
# stderr output into stdout but stdout is not piped.
proc = subp . Popen (
cmd + [ ' --stderr ' ] , stderr = subprocess2 . STDOUT , universal_newlines = un )
res = proc . communicate ( ) , proc . returncode
self . _check_res ( res , None , None , 0 )
@_run_test ( )
def test_stderr ( self , c , cmd , un , subp ) :
cmd = [ ' expr ' , ' 1 ' , ' / ' , ' 0 ' ]
if sys . platform == ' win32 ' :
cmd = [ ' cmd.exe ' , ' /c ' , ' exit ' , ' 1 ' ]
@ -334,180 +197,70 @@ class RegressionTest(BaseTestCase):
p2 = subprocess2 . Popen ( cmd , stderr = subprocess . PIPE , shell = False )
r1 = p1 . communicate ( )
r2 = p2 . communicate ( )
self . assertEquals ( r1 , r2 )
class S2Test ( BaseTestCase ) :
# Tests that can only run in subprocess2, e.g. new functionalities.
# In particular, subprocess2.communicate() doesn't exist in subprocess.
def _run_test ( self , function ) :
""" Runs tests in 6 combinations:
- LF output with universal_newlines = False
- CR output with universal_newlines = False
- CRLF output with universal_newlines = False
- LF output with universal_newlines = True
- CR output with universal_newlines = True
- CRLF output with universal_newlines = True
First | function | argument is the conversion for the origianl expected LF
string to the right EOL .
Second | function | argument is the executable and initial flag to run , to
control what EOL is used by the child process .
Third | function | argument is universal_newlines value .
noop = lambda x : x
function ( noop , self . exe , False )
function ( convert_to_cr , self . exe + [ ' --cr ' ] , False )
function ( convert_to_crlf , self . exe + [ ' --crlf ' ] , False )
function ( noop , self . exe , True )
function ( noop , self . exe + [ ' --cr ' ] , True )
function ( noop , self . exe + [ ' --crlf ' ] , True )
def _check_exception ( self , e , stdout , stderr , returncode ) :
""" On exception, look if the exception members are set correctly. """
self . assertEquals ( returncode , e . returncode )
self . assertEquals ( stdout , e . stdout )
self . assertEquals ( stderr , e . stderr )
def test_stdin ( self ) :
def fn ( c , e , un ) :
stdin = ' 0123456789 '
res = subprocess2 . communicate (
e + [ ' --read ' ] ,
stdin = stdin ,
universal_newlines = un )
self . _check_res ( res , None , None , 10 )
self . _run_test ( fn )
def test_stdin_unicode ( self ) :
def fn ( c , e , un ) :
stdin = u ' 0123456789 '
res = subprocess2 . communicate (
e + [ ' --read ' ] ,
stdin = stdin ,
universal_newlines = un )
self . _check_res ( res , None , None , 10 )
self . _run_test ( fn )
def test_stdin_empty ( self ) :
def fn ( c , e , un ) :
stdin = ' '
res = subprocess2 . communicate (
e + [ ' --read ' ] ,
stdin = stdin ,
universal_newlines = un )
self . _check_res ( res , None , None , 0 )
self . _run_test ( fn )
self . assertEqual ( r1 , r2 )
@_run_test ( with_subprocess = False )
def test_stdin ( self , c , cmd , un , subp ) :
stdin = c ( ' 0123456789 ' )
res = subprocess2 . communicate (
cmd + [ ' --read ' ] ,
stdin = stdin ,
universal_newlines = un )
self . _check_res ( res , None , None , 10 )
@_run_test ( with_subprocess = False )
def test_stdin_empty ( self , c , cmd , un , subp ) :
stdin = c ( ' ' )
res = subprocess2 . communicate (
cmd + [ ' --read ' ] ,
stdin = stdin ,
universal_newlines = un )
self . _check_res ( res , None , None , 0 )
def test_stdin_void ( self ) :
res = subprocess2 . communicate ( self . exe + [ ' --read ' ] , stdin = VOID_INPUT )
res = subprocess2 . communicate (
TEST_COMMAND + [ ' --read ' ] ,
stdin = subprocess2 . VOID_INPUT )
self . _check_res ( res , None , None , 0 )
def test_stdin_void_stdout ( self ) :
# Make sure a mix of VOID and PIPE works.
def fn ( c , e , un ) :
res = subprocess2 . communicate (
e + [ ' --stdout ' , ' --read ' ] ,
stdin = VOID_INPUT ,
stdout = PIPE ,
universal_newlines = un ,
shell = False )
self . _check_res ( res , c ( ' A \n BB \n CCC \n ' ) , None , 0 )
self . _run_test ( fn )
def test_stdout_void ( self ) :
def fn ( c , e , un ) :
res = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = VOID ,
stderr = PIPE ,
universal_newlines = un )
self . _check_res ( res , None , c ( ' a \n bb \n ccc \n ' ) , 0 )
self . _run_test ( fn )
def test_stderr_void ( self ) :
def fn ( c , e , un ) :
res = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = PIPE ,
stderr = VOID ,
universal_newlines = un )
self . _check_res ( res , c ( ' A \n BB \n CCC \n ' ) , None , 0 )
self . _run_test ( fn )
def test_stdout_void_stderr_redirect ( self ) :
def fn ( c , e , un ) :
res = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = VOID ,
stderr = STDOUT ,
universal_newlines = un )
self . _check_res ( res , None , None , 0 )
self . _run_test ( fn )
def child_main ( args ) :
if sys . platform == ' win32 ' :
# Annoying, make sure the output is not translated on Windows.
# pylint: disable=no-member,import-error
import msvcrt
msvcrt . setmode ( sys . stdout . fileno ( ) , os . O_BINARY )
msvcrt . setmode ( sys . stderr . fileno ( ) , os . O_BINARY )
parser = optparse . OptionParser ( )
parser . add_option (
' --fail ' ,
dest = ' return_value ' ,
action = ' store_const ' ,
default = 0 ,
const = 64 )
parser . add_option (
' --crlf ' , action = ' store_const ' , const = ' \r \n ' , dest = ' eol ' , default = ' \n ' )
parser . add_option (
' --cr ' , action = ' store_const ' , const = ' \r ' , dest = ' eol ' )
parser . add_option ( ' --stdout ' , action = ' store_true ' )
parser . add_option ( ' --stderr ' , action = ' store_true ' )
parser . add_option ( ' --sleep_first ' , action = ' store_true ' )
parser . add_option ( ' --sleep_last ' , action = ' store_true ' )
parser . add_option ( ' --large ' , action = ' store_true ' )
parser . add_option ( ' --read ' , action = ' store_true ' )
options , args = parser . parse_args ( args )
if args :
parser . error ( ' Internal error ' )
if options . sleep_first :
time . sleep ( 10 )
def do ( string ) :
if options . stdout :
sys . stdout . write ( string . upper ( ) )
sys . stdout . write ( options . eol )
if options . stderr :
sys . stderr . write ( string . lower ( ) )
sys . stderr . write ( options . eol )
do ( ' A ' )
do ( ' BB ' )
do ( ' CCC ' )
if options . large :
# Print 128kb.
string = ' 0123456789abcdef ' * ( 8 * 1024 )
sys . stdout . write ( string )
if options . read :
assert options . return_value is 0
try :
while sys . stdin . read ( 1 ) :
options . return_value + = 1
except OSError :
if options . sleep_last :
time . sleep ( 10 )
return options . return_value
@_run_test ( with_subprocess = False )
def test_stdin_void_stdout ( self , c , cmd , un , subp ) :
# Make sure a mix ofsubprocess2.VOID andsubprocess2.PIPE works.
res = subprocess2 . communicate (
cmd + [ ' --stdout ' , ' --read ' ] ,
stdin = subprocess2 . VOID_INPUT ,
stdout = subprocess2 . PIPE ,
universal_newlines = un ,
shell = False )
self . _check_res ( res , c ( ' A \n BB \n CCC \n ' ) , None , 0 )
@_run_test ( with_subprocess = False )
def test_stdout_void ( self , c , cmd , un , subp ) :
res = subprocess2 . communicate (
cmd + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . VOID ,
stderr = subprocess2 . PIPE ,
universal_newlines = un )
self . _check_res ( res , None , c ( ' a \n bb \n ccc \n ' ) , 0 )
@_run_test ( with_subprocess = False )
def test_stderr_void ( self , c , cmd , un , subp ) :
res = subprocess2 . communicate (
cmd + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . VOID ,
universal_newlines = un )
self . _check_res ( res , c ( ' A \n BB \n CCC \n ' ) , None , 0 )
@_run_test ( with_subprocess = False )
def test_stdout_void_stderr_redirect ( self , c , cmd , un , subp ) :
res = subprocess2 . communicate (
cmd + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . VOID ,
stderr = subprocess2 . STDOUT ,
universal_newlines = un )
self . _check_res ( res , None , None , 0 )
if __name__ == ' __main__ ' :
logging . basicConfig ( level =
[ logging . WARNING , logging . INFO , logging . DEBUG ] [
min ( 2 , sys . argv . count ( ' -v ' ) ) ] )
if len ( sys . argv ) > 1 and sys . argv [ 1 ] == ' --child ' :
sys . exit ( child_main ( sys . argv [ 2 : ] ) )
unittest . main ( )