@ -5,17 +5,44 @@
""" Unit tests for subprocess2.py. """
import logging
import optparse
import os
import sys
import time
import unittest
try :
import fcntl
except ImportError :
fcntl = None
sys . path . insert ( 0 , os . path . dirname ( os . path . dirname ( os . path . abspath ( __file__ ) ) ) )
import subprocess2
class Subprocess2Test ( unittest . TestCase ) :
# Method could be a function
# pylint: disable=R0201
def convert_to_crlf ( string ) :
""" Unconditionally convert LF to CRLF. """
return string . replace ( ' \n ' , ' \r \n ' )
def convert_to_cr ( string ) :
""" Unconditionally convert LF to CR. """
return string . replace ( ' \n ' , ' \r ' )
def convert_win ( string ) :
""" Converts string to CRLF on Windows only. """
if sys . platform == ' win32 ' :
return string . replace ( ' \n ' , ' \r \n ' )
return string
class DefaultsTest ( unittest . TestCase ) :
# Can be mocked in a test.
TO_SAVE = {
subprocess2 : [
@ -24,8 +51,6 @@ class Subprocess2Test(unittest.TestCase):
}
def setUp ( self ) :
self . exe_path = __file__
self . exe = [ sys . executable , self . exe_path , ' --child ' ]
self . saved = { }
for module , names in self . TO_SAVE . iteritems ( ) :
self . saved [ module ] = dict (
@ -144,14 +169,54 @@ class Subprocess2Test(unittest.TestCase):
}
self . assertEquals ( expected , results )
class S2Test ( unittest . TestCase ) :
def setUp ( self ) :
super ( S2Test , self ) . setUp ( )
self . exe_path = __file__
self . exe = [ sys . executable , self . exe_path , ' --child ' ]
self . states = { }
if fcntl :
for v in ( sys . stdin , sys . stdout , sys . stderr ) :
fileno = v . fileno ( )
self . states [ fileno ] = fcntl . fcntl ( fileno , fcntl . F_GETFL )
def tearDown ( self ) :
for fileno , fl in self . states . iteritems ( ) :
self . assertEquals ( fl , fcntl . fcntl ( fileno , fcntl . F_GETFL ) )
super ( S2Test , self ) . tearDown ( )
def _run_test ( self , function ) :
""" Runs tests in 6 combinations:
- LF output with universal_newlines = False
- CR output with universal_newlines = False
- CRLF output with universal_newlines = False
- LF output with universal_newlines = True
- CR output with universal_newlines = True
- CRLF output with universal_newlines = True
First | function | argument is the convertion for the origianl expected LF
string to the right EOL .
Second | function | argument is the executable and initial flag to run , to
control what EOL is used by the child process .
Third | function | argument is universal_newlines value .
"""
noop = lambda x : x
function ( noop , self . exe , False )
function ( convert_to_cr , self . exe + [ ' --cr ' ] , False )
function ( convert_to_crlf , self . exe + [ ' --crlf ' ] , False )
function ( noop , self . exe , True )
function ( noop , self . exe + [ ' --cr ' ] , True )
function ( noop , self . exe + [ ' --crlf ' ] , True )
def test_timeout ( self ) :
# It'd be better to not discard stdout.
out , returncode = subprocess2 . communicate (
self . exe + [ ' --sleep ' , ' --stdout ' ] ,
self . exe + [ ' --sleep _first ' , ' --stdout ' ] ,
timeout = 0.01 ,
stdout = subprocess2 . PIPE )
stdout = subprocess2 . PIPE ,
shell = False )
self . assertEquals ( subprocess2 . TIMED_OUT , returncode )
self . assertEquals ( [ ' ' , None ] , out )
self . assertEquals ( (' ' , None ) , out )
def test_check_output_no_stdout ( self ) :
try :
@ -161,68 +226,78 @@ class Subprocess2Test(unittest.TestCase):
pass
def test_stdout_void ( self ) :
( out , err ) , code = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . VOID ,
stderr = subprocess2 . PIPE )
self . assertEquals ( None , out )
expected = ' a \n bb \n ccc \n '
if sys . platform == ' win32 ' :
expected = expected . replace ( ' \n ' , ' \r \n ' )
self . assertEquals ( expected , err )
self . assertEquals( 0 , code )
def fn ( c , e , un ) :
( out , err ) , code = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . VOID ,
stderr = subprocess2 . PIPE ,
universal_newlines = un )
self . assertEquals ( None , out )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , err )
self . assertEquals ( 0 , code )
self . _run_test( fn )
def test_stderr_void ( self ) :
( out , err ) , code = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --stderr ' ] ,
universal_newlines = True ,
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . VOID )
self . assertEquals ( ' A \n BB \n CCC \n ' , out )
self . assertEquals ( None , err )
self . assertEquals ( 0 , code )
def fn ( c , e , un ) :
( out , err ) , code = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . VOID ,
universal_newlines = un )
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , out )
self . assertEquals ( None , err )
self . assertEquals ( 0 , code )
self . _run_test ( fn )
def test_check_output_throw_stdout ( self ) :
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stdout ' ] , universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' A \n BB \n CCC \n ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stdout ' ] , universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_output_throw_no_stderr ( self ) :
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stderr ' ] , universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stderr ' ] , universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' ' ) , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_output_throw_stderr ( self ) :
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . PIPE ,
universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( ' a \n bb \n ccc \n ' , e . stderr )
self . assertEquals ( 64 , e . returncode )
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . PIPE ,
universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_output_throw_stderr_stdout ( self ) :
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . STDOUT ,
universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' a \n bb \n ccc \n ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . STDOUT ,
universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_call_throw ( self ) :
try :
@ -235,6 +310,13 @@ class Subprocess2Test(unittest.TestCase):
def child_main ( args ) :
if sys . platform == ' win32 ' :
# Annoying, make sure the output is not translated on Windows.
# pylint: disable=E1101,F0401
import msvcrt
msvcrt . setmode ( sys . stdout . fileno ( ) , os . O_BINARY )
msvcrt . setmode ( sys . stderr . fileno ( ) , os . O_BINARY )
parser = optparse . OptionParser ( )
parser . add_option (
' --fail ' ,
@ -242,28 +324,52 @@ def child_main(args):
action = ' store_const ' ,
default = 0 ,
const = 64 )
parser . add_option (
' --crlf ' , action = ' store_const ' , const = ' \r \n ' , dest = ' eol ' , default = ' \n ' )
parser . add_option (
' --cr ' , action = ' store_const ' , const = ' \r ' , dest = ' eol ' )
parser . add_option ( ' --stdout ' , action = ' store_true ' )
parser . add_option ( ' --stderr ' , action = ' store_true ' )
parser . add_option ( ' --sleep ' , action = ' store_true ' )
parser . add_option ( ' --sleep_first ' , action = ' store_true ' )
parser . add_option ( ' --sleep_last ' , action = ' store_true ' )
parser . add_option ( ' --large ' , action = ' store_true ' )
parser . add_option ( ' --read ' , action = ' store_true ' )
options , args = parser . parse_args ( args )
if args :
parser . error ( ' Internal error ' )
if options . sleep_first :
time . sleep ( 10 )
def do ( string ) :
if options . stdout :
print >> sys . stdout , string . upper ( )
sys . stdout . write ( string . upper ( ) )
sys . stdout . write ( options . eol )
if options . stderr :
print >> sys . stderr , string . lower ( )
sys . stderr . write ( string . lower ( ) )
sys . stderr . write ( options . eol )
do ( ' A ' )
do ( ' BB ' )
do ( ' CCC ' )
if options . sleep :
if options . large :
# Print 128kb.
string = ' 0123456789abcdef ' * ( 8 * 1024 )
sys . stdout . write ( string )
if options . read :
try :
while sys . stdin . read ( ) :
pass
except OSError :
pass
if options . sleep_last :
time . sleep ( 10 )
return options . return_value
if __name__ == ' __main__ ' :
logging . basicConfig ( level =
[ logging . WARNING , logging . INFO , logging . DEBUG ] [
min ( 2 , sys . argv . count ( ' -v ' ) ) ] )
if len ( sys . argv ) > 1 and sys . argv [ 1 ] == ' --child ' :
sys . exit ( child_main ( sys . argv [ 2 : ] ) )
unittest . main ( )