#!/usr/bin/env vpython3 # Copyright (c) 2015 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import codecs import copy import os import sys import unittest sys.path.insert( 0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'recipes', 'recipe_modules', 'bot_update', 'resources')) import bot_update # TODO: Should fix these warnings. # pylint: disable=line-too-long class MockedPopen(object): """A fake instance of a called subprocess. This is meant to be used in conjunction with MockedCall. """ def __init__(self, args=None, kwargs=None): self.args = args or [] self.kwargs = kwargs or {} self.return_value = None self.fails = False def returns(self, rv): """Set the return value when this popen is called. rv can be a string, or a callable (eg function). """ self.return_value = rv return self def check(self, args, kwargs): """Check to see if the given args/kwargs call match this instance. This does a partial match, so that a call to "git clone foo" will match this instance if this instance was recorded as "git clone" """ if any(input_arg != expected_arg for (input_arg, expected_arg) in zip(args, self.args)): return False return self.return_value def __call__(self, args, kwargs): """Actually call this popen instance.""" if hasattr(self.return_value, '__call__'): return self.return_value(*args, **kwargs) return self.return_value class MockedCall(object): """A fake instance of bot_update.call(). This object is pre-seeded with "answers" in self.expectations. The type is a MockedPopen object, or any object with a __call__() and check() method. The check() method is used to check to see if the correct popen object is chosen (can be a partial match, eg a "git clone" popen module would match a "git clone foo" call). By default, if no answers have been pre-seeded, the call() returns successful with an empty string. """ def __init__(self, fake_filesystem): self.expectations = [] self.records = [] def expect(self, args=None, kwargs=None): args = args or [] kwargs = kwargs or {} popen = MockedPopen(args, kwargs) self.expectations.append(popen) return popen def __call__(self, *args, **kwargs): self.records.append((args, kwargs)) for popen in self.expectations: if popen.check(args, kwargs): self.expectations.remove(popen) return popen(args, kwargs) return '' class MockedGclientSync(): """A class producing a callable instance of gclient sync.""" def __init__(self, fake_filesystem): self.records = [] def __call__(self, *args, **_): self.records.append(args) class FakeFile(): def __init__(self): self.contents = '' def write(self, buf): self.contents += buf def read(self): return self.contents def __enter__(self): return self def __exit__(self, _, __, ___): pass class FakeFilesystem(): def __init__(self): self.files = {} def open(self, target, mode='r', encoding=None): if 'w' in mode: self.files[target] = FakeFile() return self.files[target] return self.files[target] def fake_git(*args, **kwargs): return bot_update.call('git', *args, **kwargs) class BotUpdateUnittests(unittest.TestCase): DEFAULT_PARAMS = { 'solutions': [{ 'name': 'somename', 'url': 'https://fake.com' }], 'revisions': {}, 'first_sln': 'somename', 'target_os': None, 'target_os_only': None, 'target_cpu': None, 'patch_root': None, 'patch_refs': [], 'gerrit_rebase_patch_ref': None, 'no_fetch_tags': False, 'refs': [], 'git_cache_dir': '', 'cleanup_dir': None, 'gerrit_reset': None, 'enforce_fetch': False, 'experiments': [], } def setUp(self): sys.platform = 'linux2' # For consistency, ya know? self.filesystem = FakeFilesystem() self.call = MockedCall(self.filesystem) self.gclient = MockedGclientSync(self.filesystem) self.call.expect((sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')).returns(self.gclient) self.old_call = getattr(bot_update, 'call') self.params = copy.deepcopy(self.DEFAULT_PARAMS) setattr(bot_update, 'call', self.call) setattr(bot_update, 'git', fake_git) self.old_os_cwd = os.getcwd setattr(os, 'getcwd', lambda: '/b/build/foo/build') setattr(bot_update, 'open', self.filesystem.open) self.old_codecs_open = codecs.open setattr(codecs, 'open', self.filesystem.open) def tearDown(self): setattr(bot_update, 'call', self.old_call) setattr(os, 'getcwd', self.old_os_cwd) delattr(bot_update, 'open') setattr(codecs, 'open', self.old_codecs_open) def overrideSetupForWindows(self): sys.platform = 'win' self.call.expect((sys.executable, '-u', bot_update.GCLIENT_PATH, 'sync')).returns(self.gclient) def testBasic(self): bot_update.ensure_checkout(**self.params) return self.call.records def testBasicCachepackOffloading(self): os.environ['PACKFILE_OFFLOADING'] = '1' bot_update.ensure_checkout(**self.params) os.environ.pop('PACKFILE_OFFLOADING') return self.call.records def testBasicRevision(self): self.params['revisions'] = { 'src': 'HEAD', 'src/v8': 'deadbeef', 'somename': 'DNE' } bot_update.ensure_checkout(**self.params) args = self.gclient.records[0] idx_first_revision = args.index('--revision') idx_second_revision = args.index('--revision', idx_first_revision + 1) idx_third_revision = args.index('--revision', idx_second_revision + 1) self.assertEqual(args[idx_first_revision + 1], 'somename@unmanaged') self.assertEqual(args[idx_second_revision + 1], 'src@refs/remotes/origin/main') self.assertEqual(args[idx_third_revision + 1], 'src/v8@deadbeef') return self.call.records def testTagsByDefault(self): bot_update.ensure_checkout(**self.params) found = False for record in self.call.records: args = record[0] if args[:3] == ('git', 'cache', 'populate'): self.assertFalse('--no-fetch-tags' in args) found = True self.assertTrue(found) return self.call.records def testNoTags(self): params = self.params params['no_fetch_tags'] = True bot_update.ensure_checkout(**params) found = False for record in self.call.records: args = record[0] if args[:3] == ('git', 'cache', 'populate'): self.assertTrue('--no-fetch-tags' in args) found = True self.assertTrue(found) return self.call.records def testGclientNoSyncExperiment(self): ref = 'refs/changes/12/345/6' repo = 'https://chromium.googlesource.com/v8/v8' self.params['patch_refs'] = ['%s@%s' % (repo, ref)] self.params['experiments'] = bot_update.EXP_NO_SYNC bot_update.ensure_checkout(**self.params) args = self.gclient.records[0] idx = args.index('--experiment') self.assertEqual(args[idx + 1], bot_update.EXP_NO_SYNC) def testApplyPatchOnGclient(self): ref = 'refs/changes/12/345/6' repo = 'https://chromium.googlesource.com/v8/v8' self.params['patch_refs'] = ['%s@%s' % (repo, ref)] bot_update.ensure_checkout(**self.params) args = self.gclient.records[0] idx = args.index('--patch-ref') self.assertEqual(args[idx + 1], self.params['patch_refs'][0]) self.assertNotIn('--patch-ref', args[idx + 1:]) # Assert we're not patching in bot_update.py for record in self.call.records: self.assertNotIn('git fetch ' + repo, ' '.join(record[0])) def testPatchRefs(self): self.params['patch_refs'] = [ 'https://chromium.googlesource.com/chromium/src@refs/changes/12/345/6', 'https://chromium.googlesource.com/v8/v8@refs/changes/1/234/56' ] bot_update.ensure_checkout(**self.params) args = self.gclient.records[0] patch_refs = set(args[i + 1] for i in range(len(args)) if args[i] == '--patch-ref' and i + 1 < len(args)) self.assertIn(self.params['patch_refs'][0], patch_refs) self.assertIn(self.params['patch_refs'][1], patch_refs) def testGitCheckoutBreaksLocks(self): self.overrideSetupForWindows() path = '/b/build/foo/build/.git' lockfile = 'index.lock' removed = [] old_os_walk = os.walk old_os_remove = os.remove setattr(os, 'walk', lambda _: [(path, None, [lockfile])]) setattr(os, 'remove', removed.append) bot_update.ensure_checkout(**self.params) setattr(os, 'walk', old_os_walk) setattr(os, 'remove', old_os_remove) self.assertTrue(os.path.join(path, lockfile) in removed) def testParsesRevisions(self): revisions = [ 'f671d3baeb64d9dba628ad582e867cf1aebc0207', 'src@deadbeef', 'https://foo.googlesource.com/bar@12345', 'bar@refs/experimental/test@example.com/test', ] expected_results = { 'root': 'f671d3baeb64d9dba628ad582e867cf1aebc0207', 'src': 'deadbeef', 'https://foo.googlesource.com/bar.git': '12345', 'bar': 'refs/experimental/test@example.com/test', } actual_results = bot_update.parse_revisions(revisions, 'root') self.assertEqual(expected_results, actual_results) class CallUnitTest(unittest.TestCase): def testCall(self): ret = bot_update.call(sys.executable, '-c', 'print(1)') self.assertEqual(u'1\n', ret) if __name__ == '__main__': unittest.main()