You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
depot_tools/tests/fake_repos.py

611 lines
18 KiB
Python

#!/usr/bin/python
# Copyright (c) 2010 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Generate fake repositories for testing."""
import atexit
import errno
import logging
import os
import pprint
import re
import stat
import subprocess
import sys
import time
import unittest
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import scm
## Utility functions
def addKill():
"""Add kill() method to subprocess.Popen for python <2.6"""
if getattr(subprocess.Popen, 'kill', None):
return
if sys.platform == 'win32':
def kill_win(process):
import win32process
return win32process.TerminateProcess(process._handle, -1)
subprocess.Popen.kill = kill_win
else:
def kill_nix(process):
import signal
return os.kill(process.pid, signal.SIGKILL)
subprocess.Popen.kill = kill_nix
def rmtree(*path):
"""Recursively removes a directory, even if it's marked read-only.
Remove the directory located at *path, if it exists.
shutil.rmtree() doesn't work on Windows if any of the files or directories
are read-only, which svn repositories and some .svn files are. We need to
be able to force the files to be writable (i.e., deletable) as we traverse
the tree.
Even with all this, Windows still sometimes fails to delete a file, citing
a permission error (maybe something to do with antivirus scans or disk
indexing). The best suggestion any of the user forums had was to wait a
bit and try again, so we do that too. It's hand-waving, but sometimes it
works. :/
"""
file_path = os.path.join(*path)
if not os.path.exists(file_path):
return
def RemoveWithRetry_win(rmfunc, path):
os.chmod(path, stat.S_IWRITE)
if win32_api_avail:
win32api.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)
try:
return rmfunc(path)
except EnvironmentError, e:
if e.errno != errno.EACCES:
raise
print 'Failed to delete %s: trying again' % repr(path)
time.sleep(0.1)
return rmfunc(path)
def RemoveWithRetry_non_win(rmfunc, path):
if os.path.islink(path):
return os.remove(path)
else:
return rmfunc(path)
win32_api_avail = False
remove_with_retry = None
if sys.platform.startswith('win'):
# Some people don't have the APIs installed. In that case we'll do without.
try:
win32api = __import__('win32api')
win32con = __import__('win32con')
win32_api_avail = True
except ImportError:
pass
remove_with_retry = RemoveWithRetry_win
else:
remove_with_retry = RemoveWithRetry_non_win
for root, dirs, files in os.walk(file_path, topdown=False):
# For POSIX: making the directory writable guarantees removability.
# Windows will ignore the non-read-only bits in the chmod value.
os.chmod(root, 0770)
for name in files:
remove_with_retry(os.remove, os.path.join(root, name))
for name in dirs:
remove_with_retry(os.rmdir, os.path.join(root, name))
remove_with_retry(os.rmdir, file_path)
def write(path, content):
f = open(path, 'wb')
f.write(content)
f.close()
join = os.path.join
def check_call(*args, **kwargs):
logging.debug(args[0])
subprocess.check_call(*args, **kwargs)
def Popen(*args, **kwargs):
kwargs.setdefault('stdout', subprocess.PIPE)
kwargs.setdefault('stderr', subprocess.STDOUT)
logging.debug(args[0])
return subprocess.Popen(*args, **kwargs)
def read_tree(tree_root):
"""Returns a dict of all the files in a tree. Defaults to self.root_dir."""
tree = {}
for root, dirs, files in os.walk(tree_root):
for d in filter(lambda x: x.startswith('.'), dirs):
dirs.remove(d)
for f in [join(root, f) for f in files if not f.startswith('.')]:
filepath = f[len(tree_root) + 1:].replace(os.sep, '/')
assert len(filepath), f
tree[filepath] = open(join(root, f), 'rU').read()
return tree
def dict_diff(dict1, dict2):
diff = {}
for k, v in dict1.iteritems():
if k not in dict2:
diff[k] = v
elif v != dict2[k]:
diff[k] = (v, dict2[k])
for k, v in dict2.iteritems():
if k not in dict1:
diff[k] = v
return diff
def commit_svn(repo):
"""Commits the changes and returns the new revision number."""
to_add = []
to_remove = []
for status, filepath in scm.SVN.CaptureStatus(repo):
if status[0] == '?':
to_add.append(filepath)
elif status[0] == '!':
to_remove.append(filepath)
if to_add:
check_call(['svn', 'add', '--no-auto-props', '-q'] + to_add, cwd=repo)
if to_remove:
check_call(['svn', 'remove', '-q'] + to_remove, cwd=repo)
proc = Popen(['svn', 'commit', repo, '-m', 'foo', '--non-interactive',
'--no-auth-cache', '--username', 'user1', '--password', 'foo'],
cwd=repo)
out, err = proc.communicate()
match = re.search(r'revision (\d+).', out)
if not match:
raise Exception('Commit failed', out, err, proc.returncode)
rev = match.group(1)
st = Popen(['svn', 'status'], cwd=repo).communicate()[0]
assert len(st) == 0, st
logging.debug('At revision %s' % rev)
return rev
def commit_git(repo):
"""Commits the changes and returns the new hash."""
check_call(['git', 'add', '-A', '-f'], cwd=repo)
check_call(['git', 'commit', '-q', '--message', 'foo'], cwd=repo)
rev = Popen(['git', 'show-ref', '--head', 'HEAD'],
cwd=repo).communicate()[0].split(' ', 1)[0]
logging.debug('At revision %s' % rev)
return rev
_FAKE_LOADED = False
class FakeRepos(object):
"""Generate both svn and git repositories to test gclient functionality.
Many DEPS functionalities need to be tested: Var, File, From, deps_os, hooks,
use_relative_paths.
And types of dependencies: Relative urls, Full urls, both svn and git."""
# Should leak the repositories.
SHOULD_LEAK = False
# Override if unhappy.
TRIAL_DIR = None
# Hostname
HOST = '127.0.0.1'
def __init__(self, trial_dir=None, leak=None, host=None):
global _FAKE_LOADED
if _FAKE_LOADED:
raise Exception('You can only start one FakeRepos at a time.')
_FAKE_LOADED = True
# Quick hack.
if '-v' in sys.argv:
logging.basicConfig(level=logging.DEBUG)
if '-l' in sys.argv:
self.SHOULD_LEAK = True
sys.argv.remove('-l')
elif leak is not None:
self.SHOULD_LEAK = leak
if host:
self.HOST = host
if trial_dir:
self.TRIAL_DIR = trial_dir
# Format is [ None, tree, tree, ...]
# i.e. revisions are 1-based.
self.svn_revs = [None]
# Format is { repo: [ None, (hash, tree), (hash, tree), ... ], ... }
# so reference looks like self.git_hashes[repo][rev][0] for hash and
# self.git_hashes[repo][rev][1] for it's tree snapshot.
# For consistency with self.svn_revs, it is 1-based too.
self.git_hashes = {}
self.svnserve = None
self.gitdaemon = None
self.common_init = False
def trial_dir(self):
if not self.TRIAL_DIR:
self.TRIAL_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)), '_trial')
return self.TRIAL_DIR
def setUp(self):
"""All late initialization comes here.
Note that it deletes all trial_dir() and not only repos_dir."""
if not self.common_init:
self.common_init = True
self.repos_dir = os.path.join(self.trial_dir(), 'repos')
self.git_root = join(self.repos_dir, 'git')
self.svn_root = join(self.repos_dir, 'svn_checkout')
addKill()
rmtree(self.trial_dir())
os.makedirs(self.repos_dir)
atexit.register(self.tearDown)
def tearDown(self):
if self.svnserve:
logging.debug('Killing svnserve pid %s' % self.svnserve.pid)
self.svnserve.kill()
self.svnserve = None
if self.gitdaemon:
logging.debug('Killing git-daemon pid %s' % self.gitdaemon.pid)
self.gitdaemon.kill()
self.gitdaemon = None
if not self.SHOULD_LEAK:
logging.debug('Removing %s' % self.trial_dir())
rmtree(self.trial_dir())
def _genTree(self, root, tree_dict):
"""For a dictionary of file contents, generate a filesystem."""
if not os.path.isdir(root):
os.makedirs(root)
for (k, v) in tree_dict.iteritems():
k_os = k.replace('/', os.sep)
k_arr = k_os.split(os.sep)
if len(k_arr) > 1:
p = os.sep.join([root] + k_arr[:-1])
if not os.path.isdir(p):
os.makedirs(p)
if v is None:
os.remove(join(root, k))
else:
write(join(root, k), v)
def setUpSVN(self):
"""Creates subversion repositories and start the servers."""
if self.svnserve:
return
self.setUp()
root = join(self.repos_dir, 'svn')
check_call(['svnadmin', 'create', root])
write(join(root, 'conf', 'svnserve.conf'),
'[general]\n'
'anon-access = read\n'
'auth-access = write\n'
'password-db = passwd\n')
write(join(root, 'conf', 'passwd'),
'[users]\n'
'user1 = foo\n'
'user2 = bar\n')
# Start the daemon.
cmd = ['svnserve', '-d', '--foreground', '-r', self.repos_dir]
if self.HOST == '127.0.0.1':
cmd.append('--listen-host=127.0.0.1')
self.svnserve = Popen(cmd, cwd=root)
self.populateSvn()
def populateSvn(self):
"""Creates a few revisions of changes including DEPS files."""
# Repos
check_call(['svn', 'checkout', 'svn://127.0.0.1/svn', self.svn_root, '-q',
'--non-interactive', '--no-auth-cache',
'--username', 'user1', '--password', 'foo'])
assert os.path.isdir(join(self.svn_root, '.svn'))
def file_system(rev, DEPS):
fs = {
'origin': 'svn@%(rev)d\n',
'trunk/origin': 'svn/trunk@%(rev)d\n',
'trunk/src/origin': 'svn/trunk/src@%(rev)d\n',
'trunk/src/third_party/origin': 'svn/trunk/src/third_party@%(rev)d\n',
'trunk/other/origin': 'src/trunk/other@%(rev)d\n',
'trunk/third_party/origin': 'svn/trunk/third_party@%(rev)d\n',
'trunk/third_party/foo/origin': 'svn/trunk/third_party/foo@%(rev)d\n',
'trunk/third_party/prout/origin': 'svn/trunk/third_party/foo@%(rev)d\n',
}
for k in fs.iterkeys():
fs[k] = fs[k] % { 'rev': rev }
fs['trunk/src/DEPS'] = DEPS
return fs
# Testing:
# - dependency disapear
# - dependency renamed
# - versioned and unversioned reference
# - relative and full reference
# - deps_os
# - var
# - hooks
# TODO(maruel):
# - File
# - $matching_files
# - use_relative_paths
self._commit_svn(file_system(1, """
vars = {
'DummyVariable': 'third_party',
}
deps = {
'src/other': 'svn://%(host)s/svn/trunk/other',
'src/third_party/fpp': '/trunk/' + Var('DummyVariable') + '/foo',
}
deps_os = {
'mac': {
'src/third_party/prout': '/trunk/third_party/prout',
},
}""" % { 'host': self.HOST }))
self._commit_svn(file_system(2, """
deps = {
'src/other': 'svn://%(host)s/svn/trunk/other',
'src/third_party/foo': '/trunk/third_party/foo@1',
}
# I think this is wrong to have the hooks run from the base of the gclient
# checkout. It's maybe a bit too late to change that behavior.
hooks = [
{
'pattern': '.',
'action': ['python', '-c',
'open(\\'src/svn_hooked1\\', \\'w\\').write(\\'svn_hooked1\\')'],
},
{
# Should not be run.
'pattern': 'nonexistent',
'action': ['python', '-c',
'open(\\'src/svn_hooked2\\', \\'w\\').write(\\'svn_hooked2\\')'],
},
]
""" % { 'host': self.HOST }))
def setUpGIT(self):
"""Creates git repositories and start the servers."""
if self.gitdaemon:
return True
self.setUp()
if sys.platform == 'win32':
return False
for repo in ['repo_%d' % r for r in range(1, 5)]:
check_call(['git', 'init', '-q', join(self.git_root, repo)])
self.git_hashes[repo] = [None]
# Testing:
# - dependency disapear
# - dependency renamed
# - versioned and unversioned reference
# - relative and full reference
# - deps_os
# - var
# - hooks
# TODO(maruel):
# - File
# - $matching_files
# - use_relative_paths
self._commit_git('repo_1', {
'DEPS': """
vars = {
'DummyVariable': 'repo',
}
deps = {
'src/repo2': 'git://%(host)s/git/repo_2',
'src/repo2/repo3': '/' + Var('DummyVariable') + '_3',
}
deps_os = {
'mac': {
'src/repo4': '/repo_4',
},
}""" % { 'host': self.HOST },
'origin': 'git/repo_1@1\n',
})
self._commit_git('repo_2', {
'origin': "git/repo_2@1\n"
})
self._commit_git('repo_2', {
'origin': "git/repo_2@2\n"
})
self._commit_git('repo_3', {
'origin': "git/repo_3@1\n"
})
self._commit_git('repo_3', {
'origin': "git/repo_3@2\n"
})
self._commit_git('repo_4', {
'origin': "git/repo_4@1\n"
})
self._commit_git('repo_4', {
'origin': "git/repo_4@2\n"
})
self._commit_git('repo_1', {
'DEPS': """
deps = {
'src/repo2': 'git://%(host)s/git/repo_2@%(hash)s',
'src/repo2/repo_renamed': '/repo_3',
}
# I think this is wrong to have the hooks run from the base of the gclient
# checkout. It's maybe a bit too late to change that behavior.
hooks = [
{
'pattern': '.',
'action': ['python', '-c',
'open(\\'src/git_hooked1\\', \\'w\\').write(\\'git_hooked1\\')'],
},
{
# Should not be run.
'pattern': 'nonexistent',
'action': ['python', '-c',
'open(\\'src/git_hooked2\\', \\'w\\').write(\\'git_hooked2\\')'],
},
]
""" % {
'host': self.HOST,
# See self.__init__() for the format. Grab's the hash of the first
# commit in repo_2. Only keep the first 7 character because of:
# TODO(maruel): http://crosbug.com/3591 We need to strip the hash.. duh.
'hash': self.git_hashes['repo_2'][1][0][:7]
},
'origin': "git/repo_1@2\n"
})
# Start the daemon.
cmd = ['git', 'daemon', '--export-all', '--base-path=' + self.repos_dir]
if self.HOST == '127.0.0.1':
cmd.append('--listen=127.0.0.1')
logging.debug(cmd)
self.gitdaemon = Popen(cmd, cwd=self.repos_dir)
return True
def _commit_svn(self, tree):
self._genTree(self.svn_root, tree)
commit_svn(self.svn_root)
if self.svn_revs and self.svn_revs[-1]:
new_tree = self.svn_revs[-1].copy()
new_tree.update(tree)
else:
new_tree = tree.copy()
self.svn_revs.append(new_tree)
def _commit_git(self, repo, tree):
repo_root = join(self.git_root, repo)
self._genTree(repo_root, tree)
hash = commit_git(repo_root)
if self.git_hashes[repo][-1]:
new_tree = self.git_hashes[repo][-1][1].copy()
new_tree.update(tree)
else:
new_tree = tree.copy()
self.git_hashes[repo].append((hash, new_tree))
class FakeReposTestBase(unittest.TestCase):
"""This is vaguely inspired by twisted."""
# Replace this in your subclass.
CLASS_ROOT_DIR = None
# static FakeRepos instance.
FAKE_REPOS = FakeRepos()
def setUp(self):
unittest.TestCase.setUp(self)
self.FAKE_REPOS.setUp()
# Remove left overs and start fresh.
if not self.CLASS_ROOT_DIR:
self.CLASS_ROOT_DIR = join(self.FAKE_REPOS.trial_dir(), 'smoke')
self.root_dir = join(self.CLASS_ROOT_DIR, self.id())
rmtree(self.root_dir)
os.makedirs(self.root_dir)
self.svn_base = 'svn://%s/svn/' % self.FAKE_REPOS.HOST
self.git_base = 'git://%s/git/' % self.FAKE_REPOS.HOST
def tearDown(self):
if not self.FAKE_REPOS.SHOULD_LEAK:
rmtree(self.root_dir)
def checkString(self, expected, result, msg=None):
"""Prints the diffs to ease debugging."""
if expected != result:
# Strip the begining
while expected and result and expected[0] == result[0]:
expected = expected[1:]
result = result[1:]
# The exception trace makes it hard to read so dump it too.
if '\n' in result:
print result
self.assertEquals(expected, result, msg)
def check(self, expected, results):
"""Checks stdout, stderr, retcode."""
self.checkString(expected[0], results[0])
self.checkString(expected[1], results[1])
self.assertEquals(expected[2], results[2])
def assertTree(self, tree, tree_root=None):
"""Diff the checkout tree with a dict."""
if not tree_root:
tree_root = self.root_dir
actual = read_tree(tree_root)
diff = dict_diff(tree, actual)
if diff:
logging.debug('Actual %s\n%s' % (tree_root, pprint.pformat(actual)))
logging.debug('Expected\n%s' % pprint.pformat(tree))
logging.debug('Diff\n%s' % pprint.pformat(diff))
self.assertEquals(diff, [])
def mangle_svn_tree(self, *args):
"""Creates a 'virtual directory snapshot' to compare with the actual result
on disk."""
result = {}
for item, new_root in args:
old_root, rev = item.split('@', 1)
tree = self.FAKE_REPOS.svn_revs[int(rev)]
for k, v in tree.iteritems():
if not k.startswith(old_root):
continue
result[join(new_root, k[len(old_root) + 1:]).replace(os.sep, '/')] = v
return result
def mangle_git_tree(self, *args):
"""Creates a 'virtual directory snapshot' to compare with the actual result
on disk."""
result = {}
for item, new_root in args:
repo, rev = item.split('@', 1)
tree = self.gittree(repo, rev)
for k, v in tree.iteritems():
result[join(new_root, k)] = v
return result
def githash(self, repo, rev):
"""Sort-hand: Returns the hash for a git 'revision'."""
return self.FAKE_REPOS.git_hashes[repo][int(rev)][0]
def gittree(self, repo, rev):
"""Sort-hand: returns the directory tree for a git 'revision'."""
return self.FAKE_REPOS.git_hashes[repo][int(rev)][1]
def main(argv):
fake = FakeRepos()
print 'Using %s' % fake.trial_dir()
try:
fake.setUp()
print('Fake setup, press enter to quit or Ctrl-C to keep the checkouts.')
sys.stdin.readline()
except KeyboardInterrupt:
fake.SHOULD_LEAK = True
return 0
if __name__ == '__main__':
sys.exit(main(sys.argv))