diff --git a/gclient.py b/gclient.py index 741d3ebfa..aa80b8660 100644 --- a/gclient.py +++ b/gclient.py @@ -59,7 +59,6 @@ import pprint import re import subprocess import sys -import threading import urlparse import urllib @@ -80,110 +79,6 @@ def attr(attr, data): ## GClient implementation. -class WorkItem(object): - """One work item.""" - requirements = [] - name = None - - def run(self): - pass - - -class ExecutionQueue(object): - """Dependencies sometime needs to be run out of order due to From() keyword. - - This class manages that all the required dependencies are run before running - each one. - - Methods of this class are multithread safe. - """ - def __init__(self, progress): - self.lock = threading.Lock() - # List of Dependency. - self.queued = [] - # List of strings representing each Dependency.name that was run. - self.ran = [] - # List of items currently running. - self.running = [] - self.progress = progress - if self.progress: - self.progress.update() - - def enqueue(self, d): - """Enqueue one Dependency to be executed later once its requirements are - satisfied. - """ - assert isinstance(d, WorkItem) - try: - self.lock.acquire() - self.queued.append(d) - total = len(self.queued) + len(self.ran) + len(self.running) - finally: - self.lock.release() - if self.progress: - self.progress._total = total + 1 - self.progress.update(0) - - def flush(self, *args, **kwargs): - """Runs all enqueued items until all are executed.""" - while self._run_one_item(*args, **kwargs): - pass - queued = [] - running = [] - try: - self.lock.acquire() - if self.queued: - queued = self.queued - self.queued = [] - if self.running: - running = self.running - self.running = [] - finally: - self.lock.release() - if self.progress: - self.progress.end() - if queued: - raise gclient_utils.Error('Entries still queued: %s' % str(queued)) - if running: - raise gclient_utils.Error('Entries still queued: %s' % str(running)) - - def _run_one_item(self, *args, **kwargs): - """Removes one item from the queue that has all its requirements completed - and execute it. - - Returns False if no item could be run. - """ - i = 0 - d = None - try: - self.lock.acquire() - while i != len(self.queued) and not d: - d = self.queued.pop(i) - for r in d.requirements: - if not r in self.ran: - self.queued.insert(i, d) - d = None - break - i += 1 - if not d: - return False - self.running.append(d) - finally: - self.lock.release() - d.run(*args, **kwargs) - try: - self.lock.acquire() - # TODO(maruel): http://crbug.com/51711 - #assert not d.name in self.ran - if not d.name in self.ran: - self.ran.append(d.name) - self.running.remove(d) - if self.progress: - self.progress.update(1) - finally: - self.lock.release() - return True - class GClientKeywords(object): class FromImpl(object): @@ -239,7 +134,7 @@ class GClientKeywords(object): raise gclient_utils.Error("Var is not defined: %s" % var_name) -class Dependency(GClientKeywords, WorkItem): +class Dependency(GClientKeywords, gclient_utils.WorkItem): """Object that represents a dependency checkout.""" DEPS_FILE = 'DEPS' @@ -815,7 +710,7 @@ solutions = [ pm = None if command == 'update' and not self._options.verbose: pm = Progress('Syncing projects', 1) - work_queue = ExecutionQueue(pm) + work_queue = gclient_utils.ExecutionQueue(pm) for s in self.dependencies: work_queue.enqueue(s) work_queue.flush(self._options, revision_overrides, command, args, @@ -860,7 +755,7 @@ solutions = [ if not self.dependencies: raise gclient_utils.Error('No solution specified') # Load all the settings. - work_queue = ExecutionQueue(None) + work_queue = gclient_utils.ExecutionQueue(None) for s in self.dependencies: work_queue.enqueue(s) work_queue.flush(self._options, {}, None, [], work_queue) diff --git a/gclient_utils.py b/gclient_utils.py index 5fd2ef27a..c530f4cb4 100644 --- a/gclient_utils.py +++ b/gclient_utils.py @@ -22,6 +22,7 @@ import stat import subprocess import sys import time +import threading import xml.dom.minidom import xml.parsers.expat @@ -363,3 +364,109 @@ def GetGClientRootAndEntries(path=None): execfile(config_path, env) config_dir = os.path.dirname(config_path) return config_dir, env['entries'] + + +class WorkItem(object): + """One work item.""" + # A list of string, each being a WorkItem name. + requirements = [] + # A unique string representing this work item. + name = None + + def run(self): + pass + + +class ExecutionQueue(object): + """Dependencies sometime needs to be run out of order due to From() keyword. + + This class manages that all the required dependencies are run before running + each one. + + Methods of this class are multithread safe. + """ + def __init__(self, progress): + self.lock = threading.Lock() + # List of WorkItem, Dependency inherits from WorkItem. + self.queued = [] + # List of strings representing each Dependency.name that was run. + self.ran = [] + # List of items currently running. + self.running = [] + self.progress = progress + if self.progress: + self.progress.update() + + def enqueue(self, d): + """Enqueue one Dependency to be executed later once its requirements are + satisfied. + """ + assert isinstance(d, WorkItem) + try: + self.lock.acquire() + self.queued.append(d) + total = len(self.queued) + len(self.ran) + len(self.running) + finally: + self.lock.release() + if self.progress: + self.progress._total = total + 1 + self.progress.update(0) + + def flush(self, *args, **kwargs): + """Runs all enqueued items until all are executed.""" + while self._run_one_item(*args, **kwargs): + pass + queued = [] + running = [] + try: + self.lock.acquire() + if self.queued: + queued = self.queued + self.queued = [] + if self.running: + running = self.running + self.running = [] + finally: + self.lock.release() + if self.progress: + self.progress.end() + if queued: + raise gclient_utils.Error('Entries still queued: %s' % str(queued)) + if running: + raise gclient_utils.Error('Entries still queued: %s' % str(running)) + + def _run_one_item(self, *args, **kwargs): + """Removes one item from the queue that has all its requirements completed + and execute it. + + Returns False if no item could be run. + """ + i = 0 + d = None + try: + self.lock.acquire() + while i != len(self.queued) and not d: + d = self.queued.pop(i) + for r in d.requirements: + if not r in self.ran: + self.queued.insert(i, d) + d = None + break + i += 1 + if not d: + return False + self.running.append(d) + finally: + self.lock.release() + d.run(*args, **kwargs) + try: + self.lock.acquire() + assert not d.name in self.ran + if not d.name in self.ran: + self.ran.append(d.name) + self.running.remove(d) + if self.progress: + self.progress.update(1) + finally: + self.lock.release() + return True diff --git a/tests/gclient_utils_test.py b/tests/gclient_utils_test.py index f69143356..b214172ba 100755 --- a/tests/gclient_utils_test.py +++ b/tests/gclient_utils_test.py @@ -15,14 +15,15 @@ class GclientUtilsUnittest(SuperMoxTestBase): """General gclient_utils.py tests.""" def testMembersChanged(self): members = [ - 'CheckCall', 'CheckCallError', 'Error', 'FileRead', 'FileWrite', - 'FindFileUpwards', 'FindGclientRoot', 'GetGClientRootAndEntries', - 'GetNamedNodeText', 'GetNodeNamedAttributeText', - 'PathDifference', 'ParseXML', 'PrintableObject', 'RemoveDirectory', - 'SplitUrlRevision', 'SubprocessCall', 'SubprocessCallAndFilter', - 'SyntaxErrorToError', - 'errno', 'logging', 'os', 're', 'stat', 'subprocess', 'sys', 'time', - 'xml', + 'CheckCall', 'CheckCallError', 'Error', 'ExecutionQueue', 'FileRead', + 'FileWrite', 'FindFileUpwards', 'FindGclientRoot', + 'GetGClientRootAndEntries', 'GetNamedNodeText', + 'GetNodeNamedAttributeText', 'PathDifference', 'ParseXML', + 'PrintableObject', 'RemoveDirectory', 'SplitUrlRevision', + 'SubprocessCall', 'SubprocessCallAndFilter', 'SyntaxErrorToError', + 'WorkItem', + 'errno', 'logging', 'os', 're', 'stat', 'subprocess', 'sys', + 'threading', 'time', 'xml', ] # If this test fails, you should add the relevant test. self.compareMembers(gclient_utils, members)