You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			497 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			497 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
#!/usr/bin/env vpython3
 | 
						|
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
 | 
						|
# Use of this source code is governed by a BSD-style license that can be
 | 
						|
# found in the LICENSE file.
 | 
						|
# pylint: disable=protected-access
 | 
						|
"""Unit tests for download_from_google_storage.py."""
 | 
						|
 | 
						|
import optparse
 | 
						|
import os
 | 
						|
import queue
 | 
						|
 | 
						|
import shutil
 | 
						|
import sys
 | 
						|
import tarfile
 | 
						|
import tempfile
 | 
						|
import threading
 | 
						|
import unittest
 | 
						|
 | 
						|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 | 
						|
 | 
						|
import upload_to_google_storage
 | 
						|
import download_from_google_storage
 | 
						|
 | 
						|
# ../third_party/gsutil/gsutil
 | 
						|
GSUTIL_DEFAULT_PATH = os.path.join(
 | 
						|
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'gsutil.py')
 | 
						|
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
 | 
						|
 | 
						|
 | 
						|
class GsutilMock(object):
 | 
						|
    def __init__(self, path, boto_path, timeout=None):
 | 
						|
        self.path = path
 | 
						|
        self.timeout = timeout
 | 
						|
        self.boto_path = boto_path
 | 
						|
        self.expected = []
 | 
						|
        self.history = []
 | 
						|
        self.lock = threading.Lock()
 | 
						|
 | 
						|
    def add_expected(self, return_code, out, err, fn=None):
 | 
						|
        self.expected.append((return_code, out, err, fn))
 | 
						|
 | 
						|
    def append_history(self, method, args):
 | 
						|
        self.history.append((method, args))
 | 
						|
 | 
						|
    def call(self, *args):
 | 
						|
        with self.lock:
 | 
						|
            self.append_history('call', args)
 | 
						|
            if self.expected:
 | 
						|
                code, _out, _err, fn = self.expected.pop(0)
 | 
						|
                if fn:
 | 
						|
                    fn()
 | 
						|
                return code
 | 
						|
 | 
						|
            return 0
 | 
						|
 | 
						|
    def check_call(self, *args):
 | 
						|
        with self.lock:
 | 
						|
            self.append_history('check_call', args)
 | 
						|
            if self.expected:
 | 
						|
                code, out, err, fn = self.expected.pop(0)
 | 
						|
                if fn:
 | 
						|
                    fn()
 | 
						|
                return code, out, err
 | 
						|
 | 
						|
            return (0, '', '')
 | 
						|
 | 
						|
    def check_call_with_retries(self, *args):
 | 
						|
        return self.check_call(*args)
 | 
						|
 | 
						|
 | 
						|
class ChangedWorkingDirectory(object):
 | 
						|
    def __init__(self, working_directory):
 | 
						|
        self._old_cwd = ''
 | 
						|
        self._working_directory = working_directory
 | 
						|
 | 
						|
    def __enter__(self):
 | 
						|
        self._old_cwd = os.getcwd()
 | 
						|
        print("Enter directory = ", self._working_directory)
 | 
						|
        os.chdir(self._working_directory)
 | 
						|
 | 
						|
    def __exit__(self, *_):
 | 
						|
        print("Enter directory = ", self._old_cwd)
 | 
						|
        os.chdir(self._old_cwd)
 | 
						|
 | 
						|
 | 
						|
class GstoolsUnitTests(unittest.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        self.temp_dir = tempfile.mkdtemp(prefix='gstools_test')
 | 
						|
        self.base_path = os.path.join(self.temp_dir, 'test_files')
 | 
						|
        shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        shutil.rmtree(self.temp_dir)
 | 
						|
 | 
						|
    def test_validate_tar_file(self):
 | 
						|
        lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
 | 
						|
        with ChangedWorkingDirectory(self.base_path):
 | 
						|
            # Sanity ok check.
 | 
						|
            tar_dir = 'ok_dir'
 | 
						|
            os.makedirs(os.path.join(self.base_path, tar_dir))
 | 
						|
            tar = 'good.tar.gz'
 | 
						|
            lorem_ipsum_copy = os.path.join(tar_dir, 'lorem_ipsum.txt')
 | 
						|
            shutil.copyfile(lorem_ipsum, lorem_ipsum_copy)
 | 
						|
            with tarfile.open(tar, 'w:gz') as tar:
 | 
						|
                tar.add(lorem_ipsum_copy)
 | 
						|
                self.assertTrue(
 | 
						|
                    download_from_google_storage._validate_tar_file(
 | 
						|
                        tar, tar_dir))
 | 
						|
 | 
						|
            # os.symlink doesn't exist on Windows.
 | 
						|
            if sys.platform != 'win32':
 | 
						|
                # Test no links.
 | 
						|
                tar_dir_link = 'for_tar_link'
 | 
						|
                os.makedirs(tar_dir_link)
 | 
						|
                link = os.path.join(tar_dir_link, 'link')
 | 
						|
                os.symlink(lorem_ipsum, link)
 | 
						|
                tar_with_links = 'with_links.tar.gz'
 | 
						|
                with tarfile.open(tar_with_links, 'w:gz') as tar:
 | 
						|
                    tar.add(link)
 | 
						|
                    self.assertFalse(
 | 
						|
                        download_from_google_storage._validate_tar_file(
 | 
						|
                            tar, tar_dir_link))
 | 
						|
 | 
						|
            # Test not outside.
 | 
						|
            tar_dir_outside = 'outside_tar'
 | 
						|
            os.makedirs(tar_dir_outside)
 | 
						|
            tar_with_outside = 'with_outside.tar.gz'
 | 
						|
            with tarfile.open(tar_with_outside, 'w:gz') as tar:
 | 
						|
                tar.add(lorem_ipsum)
 | 
						|
                self.assertFalse(
 | 
						|
                    download_from_google_storage._validate_tar_file(
 | 
						|
                        tar, tar_dir_outside))
 | 
						|
            # Test no ../
 | 
						|
            tar_with_dotdot = 'with_dotdot.tar.gz'
 | 
						|
            dotdot_file = os.path.join(tar_dir, '..', tar_dir,
 | 
						|
                                       'lorem_ipsum.txt')
 | 
						|
            with tarfile.open(tar_with_dotdot, 'w:gz') as tar:
 | 
						|
                tar.add(dotdot_file)
 | 
						|
                self.assertFalse(
 | 
						|
                    download_from_google_storage._validate_tar_file(
 | 
						|
                        tar, tar_dir))
 | 
						|
            # Test normal file with .. in name okay
 | 
						|
            tar_with_hidden = 'with_normal_dotdot.tar.gz'
 | 
						|
            hidden_file = os.path.join(tar_dir, '..hidden_file.txt')
 | 
						|
            shutil.copyfile(lorem_ipsum, hidden_file)
 | 
						|
            with tarfile.open(tar_with_hidden, 'w:gz') as tar:
 | 
						|
                tar.add(hidden_file)
 | 
						|
                self.assertTrue(
 | 
						|
                    download_from_google_storage._validate_tar_file(
 | 
						|
                        tar, tar_dir))
 | 
						|
 | 
						|
    def test_gsutil(self):
 | 
						|
        # This will download a real gsutil package from Google Storage.
 | 
						|
        gsutil = download_from_google_storage.Gsutil(GSUTIL_DEFAULT_PATH, None)
 | 
						|
        self.assertEqual(gsutil.path, GSUTIL_DEFAULT_PATH)
 | 
						|
        code, _, err = gsutil.check_call()
 | 
						|
        self.assertEqual(code, 0, err)
 | 
						|
        self.assertEqual(err, '')
 | 
						|
 | 
						|
    def test_get_sha1(self):
 | 
						|
        lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
 | 
						|
        self.assertEqual(download_from_google_storage.get_sha1(lorem_ipsum),
 | 
						|
                         '7871c8e24da15bad8b0be2c36edc9dc77e37727f')
 | 
						|
 | 
						|
    def test_get_md5(self):
 | 
						|
        lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
 | 
						|
        self.assertEqual(upload_to_google_storage.get_md5(lorem_ipsum),
 | 
						|
                         '634d7c1ed3545383837428f031840a1e')
 | 
						|
 | 
						|
    def test_get_md5_cached_read(self):
 | 
						|
        lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
 | 
						|
        # Use a fake 'stale' MD5 sum.  Expected behavior is to return stale sum.
 | 
						|
        self.assertEqual(upload_to_google_storage.get_md5_cached(lorem_ipsum),
 | 
						|
                         '734d7c1ed3545383837428f031840a1e')
 | 
						|
 | 
						|
    def test_get_md5_cached_write(self):
 | 
						|
        lorem_ipsum2 = os.path.join(self.base_path, 'lorem_ipsum2.txt')
 | 
						|
        lorem_ipsum2_md5 = os.path.join(self.base_path, 'lorem_ipsum2.txt.md5')
 | 
						|
        if os.path.exists(lorem_ipsum2_md5):
 | 
						|
            os.remove(lorem_ipsum2_md5)
 | 
						|
        # Use a fake 'stale' MD5 sum.  Expected behavior is to return stale sum.
 | 
						|
        self.assertEqual(upload_to_google_storage.get_md5_cached(lorem_ipsum2),
 | 
						|
                         '4c02d1eb455a0f22c575265d17b84b6d')
 | 
						|
        self.assertTrue(os.path.exists(lorem_ipsum2_md5))
 | 
						|
        self.assertEqual(
 | 
						|
            open(lorem_ipsum2_md5, 'rb').read().decode(),
 | 
						|
            '4c02d1eb455a0f22c575265d17b84b6d')
 | 
						|
        os.remove(lorem_ipsum2_md5)  # Clean up.
 | 
						|
        self.assertFalse(os.path.exists(lorem_ipsum2_md5))
 | 
						|
 | 
						|
 | 
						|
class DownloadTests(unittest.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH, None)
 | 
						|
        self.temp_dir = tempfile.mkdtemp(prefix='gstools_test')
 | 
						|
        self.checkout_test_files = os.path.join(TEST_DIR, 'gstools',
 | 
						|
                                                'download_test_data')
 | 
						|
        self.base_path = os.path.join(self.temp_dir, 'download_test_data')
 | 
						|
        shutil.copytree(self.checkout_test_files, self.base_path)
 | 
						|
        self.base_url = 'gs://sometesturl'
 | 
						|
        self.parser = optparse.OptionParser()
 | 
						|
        self.queue = queue.Queue()
 | 
						|
        self.ret_codes = queue.Queue()
 | 
						|
        self.lorem_ipsum = os.path.join(TEST_DIR, 'gstools', 'lorem_ipsum.txt')
 | 
						|
        self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
 | 
						|
        self.maxDiff = None
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        shutil.rmtree(self.temp_dir)
 | 
						|
 | 
						|
    def test_enumerate_files_non_recursive(self):
 | 
						|
        for item in download_from_google_storage.enumerate_input(
 | 
						|
                self.base_path, True, False, False, None, False, False):
 | 
						|
            self.queue.put(item)
 | 
						|
        expected_queue = [('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe',
 | 
						|
                           os.path.join(self.base_path, 'rootfolder_text.txt')),
 | 
						|
                          ('7871c8e24da15bad8b0be2c36edc9dc77e37727f',
 | 
						|
                           os.path.join(self.base_path,
 | 
						|
                                        'uploaded_lorem_ipsum.txt'))]
 | 
						|
        self.assertEqual(sorted(expected_queue), sorted(self.queue.queue))
 | 
						|
 | 
						|
    def test_enumerate_files_recursive(self):
 | 
						|
        for item in download_from_google_storage.enumerate_input(
 | 
						|
                self.base_path, True, True, False, None, False, False):
 | 
						|
            self.queue.put(item)
 | 
						|
        expected_queue = [
 | 
						|
            ('e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe',
 | 
						|
             os.path.join(self.base_path, 'rootfolder_text.txt')),
 | 
						|
            ('7871c8e24da15bad8b0be2c36edc9dc77e37727f',
 | 
						|
             os.path.join(self.base_path, 'uploaded_lorem_ipsum.txt')),
 | 
						|
            ('b5415aa0b64006a95c0c409182e628881d6d6463',
 | 
						|
             os.path.join(self.base_path, 'subfolder', 'subfolder_text.txt')),
 | 
						|
            ('b5415aa0b64006a95c0c409182e628881d6d6463',
 | 
						|
             os.path.join(self.base_path, 'subfolder2', 'subfolder_text.txt')),
 | 
						|
        ]
 | 
						|
        self.assertEqual(sorted(expected_queue), sorted(self.queue.queue))
 | 
						|
 | 
						|
    def test_download_worker_single_file(self):
 | 
						|
        sha1_hash = self.lorem_ipsum_sha1
 | 
						|
        input_filename = '%s/%s' % (self.base_url, sha1_hash)
 | 
						|
        output_filename = os.path.join(self.base_path,
 | 
						|
                                       'uploaded_lorem_ipsum.txt')
 | 
						|
        self.gsutil.add_expected(
 | 
						|
            0, '', '',
 | 
						|
            lambda: shutil.copyfile(self.lorem_ipsum, output_filename))  # cp
 | 
						|
        self.queue.put((sha1_hash, output_filename))
 | 
						|
        self.queue.put((None, None))
 | 
						|
        stdout_queue = queue.Queue()
 | 
						|
        download_from_google_storage._downloader_worker_thread(
 | 
						|
            0, self.queue, False, self.base_url, self.gsutil, stdout_queue,
 | 
						|
            self.ret_codes, True, False)
 | 
						|
        expected_calls = [('check_call', ('cp', input_filename,
 | 
						|
                                          output_filename))]
 | 
						|
        sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
 | 
						|
        if sys.platform != 'win32':
 | 
						|
            expected_calls.append(
 | 
						|
                ('check_call', ('stat', 'gs://sometesturl/' + sha1_hash)))
 | 
						|
        expected_output = [
 | 
						|
            '0> Downloading %s@%s...' % (output_filename, sha1_hash)
 | 
						|
        ]
 | 
						|
        expected_ret_codes = []
 | 
						|
        self.assertEqual(list(stdout_queue.queue), expected_output)
 | 
						|
        self.assertEqual(self.gsutil.history, expected_calls)
 | 
						|
        self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
 | 
						|
 | 
						|
    def test_download_worker_skips_file(self):
 | 
						|
        sha1_hash = 'e6c4fbd4fe7607f3e6ebf68b2ea4ef694da7b4fe'
 | 
						|
        output_filename = os.path.join(self.base_path, 'rootfolder_text.txt')
 | 
						|
        self.queue.put((sha1_hash, output_filename))
 | 
						|
        self.queue.put((None, None))
 | 
						|
        stdout_queue = queue.Queue()
 | 
						|
        download_from_google_storage._downloader_worker_thread(
 | 
						|
            0, self.queue, False, self.base_url, self.gsutil, stdout_queue,
 | 
						|
            self.ret_codes, True, False)
 | 
						|
        # dfgs does not output anything in the no-op case.
 | 
						|
        self.assertEqual(list(stdout_queue.queue), [])
 | 
						|
        self.assertEqual(self.gsutil.history, [])
 | 
						|
 | 
						|
    def test_download_extract_archive(self):
 | 
						|
        # Generate a gzipped tarfile
 | 
						|
        output_filename = os.path.join(self.base_path, 'subfolder.tar.gz')
 | 
						|
        output_dirname = os.path.join(self.base_path, 'subfolder')
 | 
						|
        extracted_filename = os.path.join(output_dirname, 'subfolder_text.txt')
 | 
						|
        with tarfile.open(output_filename, 'w:gz') as tar:
 | 
						|
            tar.add(output_dirname, arcname='subfolder')
 | 
						|
        shutil.rmtree(output_dirname)
 | 
						|
        sha1_hash = download_from_google_storage.get_sha1(output_filename)
 | 
						|
        input_filename = '%s/%s' % (self.base_url, sha1_hash)
 | 
						|
 | 
						|
        # Initial download
 | 
						|
        self.queue.put((sha1_hash, output_filename))
 | 
						|
        self.queue.put((None, None))
 | 
						|
        stdout_queue = queue.Queue()
 | 
						|
        download_from_google_storage._downloader_worker_thread(0,
 | 
						|
                                                               self.queue,
 | 
						|
                                                               True,
 | 
						|
                                                               self.base_url,
 | 
						|
                                                               self.gsutil,
 | 
						|
                                                               stdout_queue,
 | 
						|
                                                               self.ret_codes,
 | 
						|
                                                               True,
 | 
						|
                                                               True,
 | 
						|
                                                               delete=False)
 | 
						|
        expected_calls = [('check_call', ('cp', input_filename,
 | 
						|
                                          output_filename))]
 | 
						|
        if sys.platform != 'win32':
 | 
						|
            expected_calls.append(
 | 
						|
                ('check_call', ('stat', 'gs://sometesturl/%s' % sha1_hash)))
 | 
						|
        expected_output = [
 | 
						|
            '0> Downloading %s@%s...' % (output_filename, sha1_hash)
 | 
						|
        ]
 | 
						|
        expected_output.extend([
 | 
						|
            '0> Extracting 3 entries from %s to %s' %
 | 
						|
            (output_filename, output_dirname)
 | 
						|
        ])
 | 
						|
        expected_ret_codes = []
 | 
						|
        self.assertEqual(list(stdout_queue.queue), expected_output)
 | 
						|
        self.assertEqual(self.gsutil.history, expected_calls)
 | 
						|
        self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
 | 
						|
        self.assertTrue(os.path.exists(output_dirname))
 | 
						|
        self.assertTrue(os.path.exists(extracted_filename))
 | 
						|
 | 
						|
        # Test noop download
 | 
						|
        self.queue.put((sha1_hash, output_filename))
 | 
						|
        self.queue.put((None, None))
 | 
						|
        stdout_queue = queue.Queue()
 | 
						|
        download_from_google_storage._downloader_worker_thread(0,
 | 
						|
                                                               self.queue,
 | 
						|
                                                               False,
 | 
						|
                                                               self.base_url,
 | 
						|
                                                               self.gsutil,
 | 
						|
                                                               stdout_queue,
 | 
						|
                                                               self.ret_codes,
 | 
						|
                                                               True,
 | 
						|
                                                               True,
 | 
						|
                                                               delete=False)
 | 
						|
 | 
						|
        self.assertEqual(list(stdout_queue.queue), [])
 | 
						|
        self.assertEqual(self.gsutil.history, expected_calls)
 | 
						|
        self.assertEqual(list(self.ret_codes.queue), [])
 | 
						|
        self.assertTrue(os.path.exists(output_dirname))
 | 
						|
        self.assertTrue(os.path.exists(extracted_filename))
 | 
						|
 | 
						|
        # With dirty flag file, previous extraction wasn't complete
 | 
						|
        with open(os.path.join(self.base_path, 'subfolder.tmp'), 'a'):
 | 
						|
            pass
 | 
						|
 | 
						|
        self.queue.put((sha1_hash, output_filename))
 | 
						|
        self.queue.put((None, None))
 | 
						|
        stdout_queue = queue.Queue()
 | 
						|
        download_from_google_storage._downloader_worker_thread(0,
 | 
						|
                                                               self.queue,
 | 
						|
                                                               False,
 | 
						|
                                                               self.base_url,
 | 
						|
                                                               self.gsutil,
 | 
						|
                                                               stdout_queue,
 | 
						|
                                                               self.ret_codes,
 | 
						|
                                                               True,
 | 
						|
                                                               True,
 | 
						|
                                                               delete=False)
 | 
						|
        expected_calls += [('check_call', ('cp', input_filename,
 | 
						|
                                           output_filename))]
 | 
						|
        if sys.platform != 'win32':
 | 
						|
            expected_calls.append(
 | 
						|
                ('check_call', ('stat', 'gs://sometesturl/%s' % sha1_hash)))
 | 
						|
        expected_output = [
 | 
						|
            '0> Detected tmp flag file for %s, re-downloading...' %
 | 
						|
            (output_filename),
 | 
						|
            '0> Downloading %s@%s...' % (output_filename, sha1_hash),
 | 
						|
            '0> Removed %s...' % (output_dirname),
 | 
						|
            '0> Extracting 3 entries from %s to %s' %
 | 
						|
            (output_filename, output_dirname),
 | 
						|
        ]
 | 
						|
        expected_ret_codes = []
 | 
						|
        self.assertEqual(list(stdout_queue.queue), expected_output)
 | 
						|
        self.assertEqual(self.gsutil.history, expected_calls)
 | 
						|
        self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
 | 
						|
        self.assertTrue(os.path.exists(output_dirname))
 | 
						|
        self.assertTrue(os.path.exists(extracted_filename))
 | 
						|
 | 
						|
    def test_download_worker_skips_not_found_file(self):
 | 
						|
        sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
 | 
						|
        input_filename = '%s/%s' % (self.base_url, sha1_hash)
 | 
						|
        output_filename = os.path.join(self.base_path,
 | 
						|
                                       'uploaded_lorem_ipsum.txt')
 | 
						|
        self.queue.put((sha1_hash, output_filename))
 | 
						|
        self.queue.put((None, None))
 | 
						|
        stdout_queue = queue.Queue()
 | 
						|
        self.gsutil.add_expected(1, '', '')  # Return error when 'cp' is called.
 | 
						|
        download_from_google_storage._downloader_worker_thread(
 | 
						|
            0, self.queue, False, self.base_url, self.gsutil, stdout_queue,
 | 
						|
            self.ret_codes, True, False)
 | 
						|
        expected_output = [
 | 
						|
            '0> Downloading %s@%s...' % (output_filename, sha1_hash),
 | 
						|
            '0> Failed to fetch file %s for %s, skipping. [Err: ]' %
 | 
						|
            (input_filename, output_filename),
 | 
						|
        ]
 | 
						|
        expected_calls = [('check_call', ('cp', input_filename,
 | 
						|
                                          output_filename))]
 | 
						|
        expected_ret_codes = [(1, 'Failed to fetch file %s for %s. [Err: ]' %
 | 
						|
                               (input_filename, output_filename))]
 | 
						|
        self.assertEqual(list(stdout_queue.queue), expected_output)
 | 
						|
        self.assertEqual(self.gsutil.history, expected_calls)
 | 
						|
        self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
 | 
						|
 | 
						|
    def test_download_cp_fails(self):
 | 
						|
        sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
 | 
						|
        input_filename = '%s/%s' % (self.base_url, sha1_hash)
 | 
						|
        output_filename = os.path.join(self.base_path,
 | 
						|
                                       'uploaded_lorem_ipsum.txt')
 | 
						|
        self.gsutil.add_expected(101, '', 'Test error message.')  # cp
 | 
						|
        code = download_from_google_storage.download_from_google_storage(
 | 
						|
            input_filename=sha1_hash,
 | 
						|
            base_url=self.base_url,
 | 
						|
            gsutil=self.gsutil,
 | 
						|
            num_threads=1,
 | 
						|
            directory=False,
 | 
						|
            recursive=False,
 | 
						|
            force=True,
 | 
						|
            output=output_filename,
 | 
						|
            ignore_errors=False,
 | 
						|
            sha1_file=False,
 | 
						|
            verbose=True,
 | 
						|
            auto_platform=False,
 | 
						|
            extract=False)
 | 
						|
        expected_calls = [('check_call', ('cp', input_filename,
 | 
						|
                                          output_filename))]
 | 
						|
        self.assertEqual(self.gsutil.history, expected_calls)
 | 
						|
        self.assertEqual(code, 101)
 | 
						|
 | 
						|
    def test_corrupt_download(self):
 | 
						|
        q = queue.Queue()
 | 
						|
        out_q = queue.Queue()
 | 
						|
        ret_codes = queue.Queue()
 | 
						|
        tmp_dir = tempfile.mkdtemp()
 | 
						|
        sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
 | 
						|
        output_filename = os.path.join(tmp_dir, 'lorem_ipsum.txt')
 | 
						|
        q.put(('7871c8e24da15bad8b0be2c36edc9dc77e37727f', output_filename))
 | 
						|
        q.put((None, None))
 | 
						|
 | 
						|
        def _write_bad_file():
 | 
						|
            with open(output_filename, 'w') as f:
 | 
						|
                f.write('foobar')
 | 
						|
 | 
						|
        self.gsutil.add_expected(0, '', '', _write_bad_file)  # cp
 | 
						|
        download_from_google_storage._downloader_worker_thread(
 | 
						|
            1, q, True, self.base_url, self.gsutil, out_q, ret_codes, True,
 | 
						|
            False)
 | 
						|
        self.assertTrue(q.empty())
 | 
						|
        msg = ('1> ERROR remote sha1 (%s) does not match expected sha1 (%s).' %
 | 
						|
               ('8843d7f92416211de9ebb963ff4ce28125932878', sha1_hash))
 | 
						|
        self.assertEqual(
 | 
						|
            out_q.get(),
 | 
						|
            '1> Downloading %s@%s...' % (output_filename, sha1_hash))
 | 
						|
        self.assertEqual(out_q.get(), msg)
 | 
						|
        self.assertEqual(ret_codes.get(), (20, msg))
 | 
						|
        self.assertTrue(out_q.empty())
 | 
						|
        self.assertTrue(ret_codes.empty())
 | 
						|
 | 
						|
    def test_download_directory_no_recursive_non_force(self):
 | 
						|
        sha1_hash = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
 | 
						|
        input_filename = '%s/%s' % (self.base_url, sha1_hash)
 | 
						|
        output_filename = os.path.join(self.base_path,
 | 
						|
                                       'uploaded_lorem_ipsum.txt')
 | 
						|
        self.gsutil.add_expected(0, '', '')  # version
 | 
						|
        self.gsutil.add_expected(
 | 
						|
            0, '', '',
 | 
						|
            lambda: shutil.copyfile(self.lorem_ipsum, output_filename))  # cp
 | 
						|
        code = download_from_google_storage.download_from_google_storage(
 | 
						|
            input_filename=self.base_path,
 | 
						|
            base_url=self.base_url,
 | 
						|
            gsutil=self.gsutil,
 | 
						|
            num_threads=1,
 | 
						|
            directory=True,
 | 
						|
            recursive=False,
 | 
						|
            force=False,
 | 
						|
            output=None,
 | 
						|
            ignore_errors=False,
 | 
						|
            sha1_file=False,
 | 
						|
            verbose=True,
 | 
						|
            auto_platform=False,
 | 
						|
            extract=False)
 | 
						|
        expected_calls = [('check_call', ('version', )),
 | 
						|
                          ('check_call', ('cp', input_filename,
 | 
						|
                                          output_filename))]
 | 
						|
        if sys.platform != 'win32':
 | 
						|
            expected_calls.append(
 | 
						|
                ('check_call',
 | 
						|
                 ('stat',
 | 
						|
                  'gs://sometesturl/7871c8e24da15bad8b0be2c36edc9dc77e37727f')))
 | 
						|
        self.assertEqual(self.gsutil.history, expected_calls)
 | 
						|
        self.assertEqual(code, 0)
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main()
 |