You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
165 lines
5.7 KiB
Python
165 lines
5.7 KiB
Python
#!/usr/bin/env python
|
|
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""Unit tests for upload_to_google_storage.py."""
|
|
|
|
import optparse
|
|
import os
|
|
import Queue
|
|
import shutil
|
|
import StringIO
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import unittest
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import upload_to_google_storage
|
|
from download_from_google_storage_unittests import GsutilMock
|
|
|
|
# ../third_party/gsutil/gsutil
|
|
GSUTIL_DEFAULT_PATH = os.path.join(
|
|
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
|
|
'third_party', 'gsutil', 'gsutil')
|
|
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
|
class UploadTests(unittest.TestCase):
|
|
def setUp(self):
|
|
self.gsutil = GsutilMock(GSUTIL_DEFAULT_PATH, None)
|
|
self.temp_dir = tempfile.mkdtemp(prefix='gstools_test')
|
|
self.base_path = os.path.join(self.temp_dir, 'gstools')
|
|
shutil.copytree(os.path.join(TEST_DIR, 'gstools'), self.base_path)
|
|
self.base_url = 'gs://sometesturl'
|
|
self.parser = optparse.OptionParser()
|
|
self.ret_codes = Queue.Queue()
|
|
self.stdout_queue = Queue.Queue()
|
|
self.lorem_ipsum = os.path.join(self.base_path, 'lorem_ipsum.txt')
|
|
self.lorem_ipsum_sha1 = '7871c8e24da15bad8b0be2c36edc9dc77e37727f'
|
|
|
|
def cleanUp(self):
|
|
shutil.rmtree(self.temp_dir)
|
|
sys.stdin = sys.__stdin__
|
|
|
|
def test_upload_single_file(self):
|
|
filenames = [self.lorem_ipsum]
|
|
output_filename = '%s.sha1' % self.lorem_ipsum
|
|
code = upload_to_google_storage.upload_to_google_storage(
|
|
filenames, self.base_url, self.gsutil, True, False, 1, False)
|
|
self.assertEqual(
|
|
self.gsutil.history,
|
|
[('check_call',
|
|
('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))),
|
|
('check_call',
|
|
('cp', '-q', filenames[0], '%s/%s' % (self.base_url,
|
|
self.lorem_ipsum_sha1)))])
|
|
self.assertTrue(os.path.exists(output_filename))
|
|
self.assertEqual(
|
|
open(output_filename, 'rb').read(),
|
|
'7871c8e24da15bad8b0be2c36edc9dc77e37727f')
|
|
os.remove(output_filename)
|
|
self.assertEqual(code, 0)
|
|
|
|
def test_upload_single_file_remote_exists(self):
|
|
filenames = [self.lorem_ipsum]
|
|
output_filename = '%s.sha1' % self.lorem_ipsum
|
|
etag_string = 'ETag: 634d7c1ed3545383837428f031840a1e'
|
|
self.gsutil.add_expected(0, '', '')
|
|
self.gsutil.add_expected(0, etag_string, '')
|
|
code = upload_to_google_storage.upload_to_google_storage(
|
|
filenames, self.base_url, self.gsutil, False, False, 1, False)
|
|
self.assertEqual(
|
|
self.gsutil.history,
|
|
[('check_call',
|
|
('ls', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1))),
|
|
('check_call',
|
|
('ls', '-L', '%s/%s' % (self.base_url, self.lorem_ipsum_sha1)))])
|
|
self.assertTrue(os.path.exists(output_filename))
|
|
self.assertEqual(
|
|
open(output_filename, 'rb').read(),
|
|
'7871c8e24da15bad8b0be2c36edc9dc77e37727f')
|
|
os.remove(output_filename)
|
|
self.assertEqual(code, 0)
|
|
|
|
def test_upload_worker_errors(self):
|
|
work_queue = Queue.Queue()
|
|
work_queue.put((self.lorem_ipsum, self.lorem_ipsum_sha1))
|
|
work_queue.put((None, None))
|
|
self.gsutil.add_expected(1, '', '') # For the first ls call.
|
|
self.gsutil.add_expected(20, '', 'Expected error message')
|
|
# pylint: disable=W0212
|
|
upload_to_google_storage._upload_worker(
|
|
0,
|
|
work_queue,
|
|
self.base_url,
|
|
self.gsutil,
|
|
threading.Lock(),
|
|
False,
|
|
False,
|
|
self.stdout_queue,
|
|
self.ret_codes)
|
|
expected_ret_codes = [
|
|
(20,
|
|
'Encountered error on uploading %s to %s/%s\nExpected error message' %
|
|
(self.lorem_ipsum, self.base_url, self.lorem_ipsum_sha1))]
|
|
self.assertEqual(list(self.ret_codes.queue), expected_ret_codes)
|
|
|
|
def test_skip_hashing(self):
|
|
filenames = [self.lorem_ipsum]
|
|
output_filename = '%s.sha1' % self.lorem_ipsum
|
|
fake_hash = '6871c8e24da15bad8b0be2c36edc9dc77e37727f'
|
|
with open(output_filename, 'wb') as f:
|
|
f.write(fake_hash) # Fake hash.
|
|
code = upload_to_google_storage.upload_to_google_storage(
|
|
filenames, self.base_url, self.gsutil, False, False, 1, True)
|
|
self.assertEqual(
|
|
self.gsutil.history,
|
|
[('check_call',
|
|
('ls', '%s/%s' % (self.base_url, fake_hash))),
|
|
('check_call',
|
|
('ls', '-L', '%s/%s' % (self.base_url, fake_hash))),
|
|
('check_call',
|
|
('cp', '-q', filenames[0], '%s/%s' % (self.base_url, fake_hash)))])
|
|
self.assertEqual(
|
|
open(output_filename, 'rb').read(), fake_hash)
|
|
os.remove(output_filename)
|
|
self.assertEqual(code, 0)
|
|
|
|
def test_get_targets_no_args(self):
|
|
try:
|
|
upload_to_google_storage.get_targets([], self.parser, False)
|
|
self.fail()
|
|
except SystemExit, e:
|
|
self.assertEqual(e.code, 2)
|
|
|
|
def test_get_targets_passthrough(self):
|
|
result = upload_to_google_storage.get_targets(
|
|
['a', 'b', 'c', 'd', 'e'],
|
|
self.parser,
|
|
False)
|
|
self.assertEqual(result, ['a', 'b', 'c', 'd', 'e'])
|
|
|
|
def test_get_targets_multiple_stdin(self):
|
|
inputs = ['a', 'b', 'c', 'd', 'e']
|
|
sys.stdin = StringIO.StringIO(os.linesep.join(inputs))
|
|
result = upload_to_google_storage.get_targets(
|
|
['-'],
|
|
self.parser,
|
|
False)
|
|
self.assertEqual(result, inputs)
|
|
|
|
def test_get_targets_multiple_stdin_null(self):
|
|
inputs = ['a', 'b', 'c', 'd', 'e']
|
|
sys.stdin = StringIO.StringIO('\0'.join(inputs))
|
|
result = upload_to_google_storage.get_targets(
|
|
['-'],
|
|
self.parser,
|
|
True)
|
|
self.assertEqual(result, inputs)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main() |