@ -21,14 +21,24 @@ import subprocess2
# pylint: disable=R0201
def convert ( string ) :
""" Converts string to CRLF on Windows. """
def convert_to_crlf ( string ) :
""" Unconditionally convert LF to CRLF. """
return string . replace ( ' \n ' , ' \r \n ' )
def convert_to_cr ( string ) :
""" Unconditionally convert LF to CR. """
return string . replace ( ' \n ' , ' \r ' )
def convert_win ( string ) :
""" Converts string to CRLF on Windows only. """
if sys . platform == ' win32 ' :
return string . replace ( ' \n ' , ' \r \n ' )
return string
class Subprocess2Test ( unittest . TestCase ) :
class Defaults Test( unittest . TestCase ) :
# Can be mocked in a test.
TO_SAVE = {
subprocess2 : [
@ -37,8 +47,6 @@ class Subprocess2Test(unittest.TestCase):
}
def setUp ( self ) :
self . exe_path = __file__
self . exe = [ sys . executable , self . exe_path , ' --child ' ]
self . saved = { }
for module , names in self . TO_SAVE . iteritems ( ) :
self . saved [ module ] = dict (
@ -155,19 +163,12 @@ class Subprocess2Test(unittest.TestCase):
}
self . assertEquals ( expected , results )
def test_timeout ( self ) :
out , returncode = subprocess2 . communicate (
self . exe + [ ' --sleep ' , ' --stdout ' ] ,
timeout = 0.01 ,
stdout = subprocess2 . PIPE ,
shell = False )
self . assertEquals ( subprocess2 . TIMED_OUT , returncode )
self . assertEquals ( ( ' ' , None ) , out )
def test_timeout_shell_throws ( self ) :
# Never called.
_ = self . _fake_Popen ( )
try :
subprocess2 . communicate (
self . exe + [ ' --sleep ' , ' --stdout ' ] ,
sys . executable ,
timeout = 0.01 ,
stdout = subprocess2 . PIPE ,
shell = True )
@ -175,6 +176,45 @@ class Subprocess2Test(unittest.TestCase):
except TypeError :
pass
class S2Test ( unittest . TestCase ) :
def setUp ( self ) :
super ( S2Test , self ) . setUp ( )
self . exe_path = __file__
self . exe = [ sys . executable , self . exe_path , ' --child ' ]
def _run_test ( self , function ) :
""" Runs tests in 6 combinations:
- LF output with universal_newlines = False
- CR output with universal_newlines = False
- CRLF output with universal_newlines = False
- LF output with universal_newlines = True
- CR output with universal_newlines = True
- CRLF output with universal_newlines = True
First | function | argument is the convertion for the origianl expected LF
string to the right EOL .
Second | function | argument is the executable and initial flag to run , to
control what EOL is used by the child process .
Third | function | argument is universal_newlines value .
"""
noop = lambda x : x
function ( noop , self . exe , False )
function ( convert_to_cr , self . exe + [ ' --cr ' ] , False )
function ( convert_to_crlf , self . exe + [ ' --crlf ' ] , False )
function ( noop , self . exe , True )
function ( noop , self . exe + [ ' --cr ' ] , True )
function ( noop , self . exe + [ ' --crlf ' ] , True )
def test_timeout ( self ) :
out , returncode = subprocess2 . communicate (
self . exe + [ ' --sleep_first ' , ' --stdout ' ] ,
timeout = 0.01 ,
stdout = subprocess2 . PIPE ,
shell = False )
self . assertEquals ( subprocess2 . TIMED_OUT , returncode )
self . assertEquals ( ( ' ' , None ) , out )
def test_check_output_no_stdout ( self ) :
try :
subprocess2 . check_output ( self . exe , stdout = subprocess2 . PIPE )
@ -183,65 +223,78 @@ class Subprocess2Test(unittest.TestCase):
pass
def test_stdout_void ( self ) :
( out , err ) , code = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . VOID ,
stderr = subprocess2 . PIPE )
self . assertEquals ( None , out )
self . assertEquals ( convert ( ' a \n bb \n ccc \n ' ) , err )
self . assertEquals ( 0 , code )
def fn ( c , e , un ) :
( out , err ) , code = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . VOID ,
stderr = subprocess2 . PIPE ,
universal_newlines = un )
self . assertEquals ( None , out )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , err )
self . assertEquals ( 0 , code )
self . _run_test ( fn )
def test_stderr_void ( self ) :
( out , err ) , code = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --stderr ' ] ,
universal_newlines = True ,
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . VOID )
self . assertEquals ( ' A \n BB \n CCC \n ' , out )
self . assertEquals ( None , err )
self . assertEquals ( 0 , code )
def fn ( c , e , un ) :
( out , err ) , code = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . VOID ,
universal_newlines = un )
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , out )
self . assertEquals ( None , err )
self . assertEquals ( 0 , code )
self . _run_test ( fn )
def test_check_output_throw_stdout ( self ) :
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stdout ' ] , universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' A \n BB \n CCC \n ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stdout ' ] , universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_output_throw_no_stderr ( self ) :
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stderr ' ] , universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stderr ' ] , universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' ' ) , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_output_throw_stderr ( self ) :
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . PIPE ,
universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( ' a \n bb \n ccc \n ' , e . stderr )
self . assertEquals ( 64 , e . returncode )
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . PIPE ,
universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_output_throw_stderr_stdout ( self ) :
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . STDOUT ,
universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' a \n bb \n ccc \n ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . STDOUT ,
universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_call_throw ( self ) :
try :
@ -253,45 +306,57 @@ class Subprocess2Test(unittest.TestCase):
self . assertEquals ( 64 , e . returncode )
def test_check_output_tee_stderr ( self ) :
stderr = [ ]
out , returncode = subprocess2 . communicate (
self . exe + [ ' --stderr ' ] , stderr = stderr . append )
self . assertEquals ( convert ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def fn ( c , e , un ) :
stderr = [ ]
out , returncode = subprocess2 . communicate (
e + [ ' --stderr ' ] , stderr = stderr . append ,
universal_newlines = un )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
self . _run_test ( fn )
def test_check_output_tee_stdout_stderr ( self ) :
stdout = [ ]
stderr = [ ]
out , returncode = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --stderr ' ] ,
stdout = stdout . append ,
stderr = stderr . append )
self . assertEquals ( convert ( ' A \n BB \n CCC \n ' ) , ' ' . join ( stdout ) )
self . assertEquals ( convert ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def fn ( c , e , un ) :
stdout = [ ]
stderr = [ ]
out , returncode = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = stdout . append ,
stderr = stderr . append ,
universal_newlines = un )
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , ' ' . join ( stdout ) )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
self . _run_test ( fn )
def test_check_output_tee_stdin ( self ) :
stdout = [ ]
stdin = ' 0123456789 '
out , returncode = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --read ' ] , stdin = stdin , stdout = stdout . append )
self . assertEquals ( convert ( ' A \n BB \n CCC \n ' ) , ' ' . join ( stdout ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def fn ( c , e , un ) :
stdout = [ ]
stdin = ' 0123456789 '
out , returncode = subprocess2 . communicate (
e + [ ' --stdout ' , ' --read ' ] , stdin = stdin , stdout = stdout . append ,
universal_newlines = un )
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , ' ' . join ( stdout ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
self . _run_test ( fn )
def test_check_output_tee_throw ( self ) :
stderr = [ ]
try :
subprocess2 . check_output (
self . exe + [ ' --stderr ' , ' --fail ' ] , stderr = stderr . append )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( convert ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def fn ( c , e , un ) :
stderr = [ ]
try :
subprocess2 . check_output (
e + [ ' --stderr ' , ' --fail ' ] , stderr = stderr . append ,
universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_output_tee_large ( self ) :
stdout = [ ]
@ -314,6 +379,13 @@ class Subprocess2Test(unittest.TestCase):
def child_main ( args ) :
if sys . platform == ' win32 ' :
# Annoying, make sure the output is not translated on Windows.
# pylint: disable=E1101,F0401
import msvcrt
msvcrt . setmode ( sys . stdout . fileno ( ) , os . O_BINARY )
msvcrt . setmode ( sys . stderr . fileno ( ) , os . O_BINARY )
parser = optparse . OptionParser ( )
parser . add_option (
' --fail ' ,
@ -321,22 +393,29 @@ def child_main(args):
action = ' store_const ' ,
default = 0 ,
const = 64 )
parser . add_option (
' --crlf ' , action = ' store_const ' , const = ' \r \n ' , dest = ' eol ' , default = ' \n ' )
parser . add_option (
' --cr ' , action = ' store_const ' , const = ' \r ' , dest = ' eol ' )
parser . add_option ( ' --stdout ' , action = ' store_true ' )
parser . add_option ( ' --stderr ' , action = ' store_true ' )
parser . add_option ( ' --sleep ' , action = ' store_true ' )
parser . add_option ( ' --sleep_first ' , action = ' store_true ' )
parser . add_option ( ' --sleep_last ' , action = ' store_true ' )
parser . add_option ( ' --large ' , action = ' store_true ' )
parser . add_option ( ' --read ' , action = ' store_true ' )
options , args = parser . parse_args ( args )
if args :
parser . error ( ' Internal error ' )
if options . sleep :
if options . sleep _first :
time . sleep ( 10 )
def do ( string ) :
if options . stdout :
print >> sys . stdout , string . upper ( )
sys . stdout . write ( string . upper ( ) )
sys . stdout . write ( options . eol )
if options . stderr :
print >> sys . stderr , string . lower ( )
sys . stderr . write ( string . lower ( ) )
sys . stderr . write ( options . eol )
do ( ' A ' )
do ( ' BB ' )
@ -351,6 +430,8 @@ def child_main(args):
pass
except OSError :
pass
if options . sleep_last :
time . sleep ( 10 )
return options . return_value