|
|
|
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
|
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
|
|
# found in the LICENSE file.
|
|
|
|
"""SCM-specific utility classes."""
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
import abc
|
|
|
|
import contextlib
|
|
|
|
import os
|
|
|
|
import pathlib
|
|
|
|
import platform
|
|
|
|
import re
|
|
|
|
import threading
|
|
|
|
|
|
|
|
from collections import defaultdict
|
|
|
|
from itertools import chain
|
|
|
|
from typing import Any
|
|
|
|
from typing import Collection, Iterable, Iterator, Literal, Dict
|
|
|
|
from typing import Optional, Sequence, Mapping
|
|
|
|
|
|
|
|
import gclient_utils
|
|
|
|
import git_common
|
|
|
|
import subprocess2
|
|
|
|
|
|
|
|
# TODO: Should fix these warnings.
|
|
|
|
# pylint: disable=line-too-long
|
|
|
|
|
|
|
|
# constants used to identify the tree state of a directory.
|
|
|
|
VERSIONED_NO = 0
|
|
|
|
VERSIONED_DIR = 1
|
|
|
|
VERSIONED_SUBMODULE = 2
|
|
|
|
|
|
|
|
|
|
|
|
def determine_scm(root):
|
|
|
|
"""Similar to upload.py's version but much simpler.
|
|
|
|
|
|
|
|
Returns 'git' or 'diff'.
|
|
|
|
"""
|
|
|
|
if os.path.isdir(os.path.join(root, '.git')):
|
|
|
|
return 'git'
|
|
|
|
|
|
|
|
try:
|
|
|
|
subprocess2.check_call(['git', 'rev-parse', '--show-cdup'],
|
|
|
|
stdout=subprocess2.DEVNULL,
|
|
|
|
stderr=subprocess2.DEVNULL,
|
|
|
|
cwd=root)
|
|
|
|
return 'git'
|
|
|
|
except (OSError, subprocess2.CalledProcessError):
|
|
|
|
return 'diff'
|
|
|
|
|
|
|
|
|
|
|
|
GitConfigScope = Literal['system', 'global', 'local', 'worktree']
|
|
|
|
GitScopeOrder: list[GitConfigScope] = ['system', 'global', 'local', 'worktree']
|
|
|
|
GitFlatConfigData = Mapping[str, Mapping[str, Sequence[str]]]
|
|
|
|
|
|
|
|
|
|
|
|
class GitConfigStateBase(metaclass=abc.ABCMeta):
|
|
|
|
"""GitConfigStateBase is the abstract base class for implementations of
|
|
|
|
CachedGitConfigState.
|
|
|
|
|
|
|
|
This is meant to model the manipulation of some underlying config data.
|
|
|
|
|
|
|
|
In GitConfigStateReal, this is modeled using `git config` commands in
|
|
|
|
a specific git repo.
|
|
|
|
|
|
|
|
In GitConfigStateTest, this is modeled using a set of GitConfigScope-indexed
|
|
|
|
dictionaries.
|
|
|
|
|
|
|
|
Implementations MUST ensure that all keys returned in load_config are
|
|
|
|
already canonicalized, and implementations MUST accept non-canonical keys to
|
|
|
|
set_* and unset_* methods.
|
|
|
|
"""
|
|
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
|
def load_config(self) -> GitFlatConfigData:
|
|
|
|
"""When invoked, this should return the full state of the configuration
|
|
|
|
observable.
|
|
|
|
|
|
|
|
The caller must not mutate the returned value.
|
|
|
|
|
|
|
|
Implementations MUST ensure that all keys returned in load_config are
|
|
|
|
already canonicalized.
|
|
|
|
"""
|
|
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
|
def set_config(self, key: str, value: str, *, append: bool,
|
|
|
|
scope: GitConfigScope):
|
|
|
|
"""When invoked, this should set `key` to a singluar `value` in the git
|
|
|
|
scope `scope` in this state's underlying data.
|
|
|
|
|
|
|
|
If `append` is True, this should add an additional value to the existing
|
|
|
|
`key`, if any.
|
|
|
|
|
|
|
|
Implementations MUST accept non-canonical `key` values.
|
|
|
|
"""
|
|
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
|
def set_config_multi(self, key: str, value: str, *,
|
|
|
|
value_pattern: Optional[str], scope: GitConfigScope):
|
|
|
|
"""When invoked, this should replace all existing values of `key` with
|
|
|
|
`value` in the git scope `scope` in this state's underlying data.
|
|
|
|
|
|
|
|
If `value_pattern` is supplied, only existing values matching this
|
|
|
|
pattern will be replaced.
|
|
|
|
|
|
|
|
TODO: Make value_pattern an re.Pattern. This wasn't done at the time of
|
|
|
|
this refactor to keep the refactor small.
|
|
|
|
|
|
|
|
Implementations MUST accept non-canonical `key` values.
|
|
|
|
"""
|
|
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
|
def unset_config(self, key: str, *, scope: GitConfigScope,
|
|
|
|
missing_ok: bool):
|
|
|
|
"""When invoked, remove a singlar value from `key` in this state's underlying data.
|
|
|
|
|
|
|
|
If missing_ok is False and `key` is not present in the given scope, this
|
|
|
|
must raise GitConfigUnsetMissingValue with `key` and `scope`.
|
|
|
|
|
|
|
|
If `key` is multi-valued in this scope, this must raise
|
|
|
|
GitConfigUnsetMultipleValues with `key` and `scope`.
|
|
|
|
|
|
|
|
Implementations MUST accept non-canonical `key` values.
|
|
|
|
"""
|
|
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
|
def unset_config_multi(self, key: str, *, value_pattern: Optional[str],
|
|
|
|
scope: GitConfigScope, missing_ok: bool):
|
|
|
|
"""When invoked, remove all values from `key` in this state's underlying data.
|
|
|
|
|
|
|
|
If `value_pattern` is supplied, only values matching this pattern will
|
|
|
|
be removed.
|
|
|
|
|
|
|
|
If missing_ok is False and `key` is not present in the given scope, this
|
|
|
|
must raise GitConfigUnsetMissingValue with `key` and `scope`.
|
|
|
|
|
|
|
|
TODO: Make value_pattern an re.Pattern. This wasn't done at the time of
|
|
|
|
this refactor to keep the refactor small.
|
|
|
|
|
|
|
|
Implementations MUST accept non-canonical `key` values.
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
class GitConfigUnsetMissingValue(ValueError):
|
|
|
|
|
|
|
|
def __init__(self, key: str, scope: str) -> None:
|
|
|
|
super().__init__(
|
|
|
|
f'Cannot unset missing key {key!r} in scope {scope!r} with missing_ok=False.'
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class GitConfigUnsetMultipleValues(ValueError):
|
|
|
|
|
|
|
|
def __init__(self, key: str, scope: str) -> None:
|
|
|
|
super().__init__(
|
|
|
|
f'Cannot unset multi-value key {key!r} in scope {scope!r} with modify_all=False.'
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class GitConfigUneditableScope(ValueError):
|
|
|
|
|
|
|
|
def __init__(self, scope: str) -> None:
|
|
|
|
super().__init__(f'Cannot edit git config in scope {scope!r}.')
|
|
|
|
|
|
|
|
|
|
|
|
class GitConfigUnknownScope(ValueError):
|
|
|
|
|
|
|
|
def __init__(self, scope: str) -> None:
|
|
|
|
super().__init__(f'Unknown git config scope {scope!r}.')
|
|
|
|
|
|
|
|
|
|
|
|
class GitConfigInvalidKey(ValueError):
|
|
|
|
|
|
|
|
def __init__(self, key: str) -> None:
|
|
|
|
super().__init__(
|
|
|
|
f'Invalid git config key {key!r}: does not contain a section.')
|
|
|
|
|
|
|
|
|
|
|
|
def canonicalize_git_config_key(key: str) -> str:
|
|
|
|
"""Returns the canonicalized form of `key` for git config.
|
|
|
|
|
|
|
|
Git config internally canonicalizes keys (i.e. for
|
|
|
|
'section.subsection.variable', both 'section' and 'variable' will be
|
|
|
|
lowercased, but 'subsection' will not).
|
|
|
|
|
|
|
|
This also normalizes keys in the form 'section.variable' (both 'section' and
|
|
|
|
'variable' will be lowercased).
|
|
|
|
"""
|
|
|
|
sections = key.split('.')
|
|
|
|
if len(sections) >= 3:
|
|
|
|
return '.'.join(
|
|
|
|
chain((sections[0].lower(), ), sections[1:-1],
|
|
|
|
(sections[-1].lower(), )))
|
|
|
|
if len(sections) == 2:
|
|
|
|
return '.'.join((sections[0].lower(), sections[1].lower()))
|
|
|
|
raise GitConfigInvalidKey(key)
|
|
|
|
|
|
|
|
|
|
|
|
class CachedGitConfigState(object):
|
|
|
|
"""This represents the observable git configuration state for a given
|
|
|
|
repository (whose top-level path is `root`).
|
|
|
|
|
|
|
|
This maintains an in-memory cache of the entire, flattened, observable
|
|
|
|
configuration state according to the GitConfigStateBase implementation.
|
|
|
|
|
|
|
|
All SetConfig operations which actually change the underlying data will
|
|
|
|
clear the internal cache. All read operations will either use the internal
|
|
|
|
cache, or repopulate it from the GitConfigStateBase implementation
|
|
|
|
on-demand.
|
|
|
|
|
|
|
|
This design assumes no other processes are mutating git config state, which
|
|
|
|
is typically true for git_cl and other short-lived programs in depot_tools
|
|
|
|
which use scm.py.
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self, impl: GitConfigStateBase):
|
|
|
|
"""Initializes a git config cache against the given underlying
|
|
|
|
GitConfigStateBase (either GitConfigStateReal or GitConfigStateTest).
|
|
|
|
"""
|
|
|
|
self._impl: GitConfigStateBase = impl
|
|
|
|
|
|
|
|
# Actual cached configuration from the point of view of this root.
|
|
|
|
self._config: Optional[GitFlatConfigData] = None
|
|
|
|
|
|
|
|
def _maybe_load_config(self) -> GitFlatConfigData:
|
|
|
|
if self._config is None:
|
|
|
|
# NOTE: Implementations of self._impl must already ensure that all
|
|
|
|
# keys are canonicalized.
|
|
|
|
self._config = self._impl.load_config()
|
|
|
|
return self._config
|
|
|
|
|
|
|
|
def clear_cache(self):
|
|
|
|
self._config = None
|
|
|
|
|
|
|
|
def GetConfig(self,
|
|
|
|
key: str,
|
|
|
|
default: Optional[str] = None,
|
|
|
|
scope: Optional[str] = None) -> Optional[str]:
|
|
|
|
"""Lazily loads all configration observable for this CachedGitConfigState,
|
|
|
|
then returns the last value for `key` as a string.
|
|
|
|
|
|
|
|
If `key` is missing, returns default.
|
|
|
|
"""
|
|
|
|
|
|
|
|
key = canonicalize_git_config_key(key)
|
|
|
|
if not scope:
|
|
|
|
scope = "default"
|
|
|
|
|
|
|
|
scoped_config = self._maybe_load_config()
|
|
|
|
if not scoped_config:
|
|
|
|
return default
|
|
|
|
|
|
|
|
scoped_config = scoped_config.get(scope, None)
|
|
|
|
if not scoped_config:
|
|
|
|
return default
|
|
|
|
|
|
|
|
values = scoped_config.get(key, None)
|
|
|
|
|
|
|
|
if not values:
|
|
|
|
return default
|
|
|
|
return values[-1]
|
|
|
|
|
|
|
|
def GetConfigBool(self, key: str) -> bool:
|
|
|
|
"""Returns the booleanized value of `key`.
|
|
|
|
|
|
|
|
This follows `git config` semantics (i.e. it normalizes the string value
|
|
|
|
of the config value to "true" - all other string values return False).
|
|
|
|
"""
|
|
|
|
return self.GetConfig(key) == 'true'
|
|
|
|
|
|
|
|
def GetConfigList(self, key: str) -> list[str]:
|
|
|
|
"""Returns all values of `key` as a list of strings."""
|
|
|
|
key = canonicalize_git_config_key(key)
|
|
|
|
return list(self._maybe_load_config().get('default', {}).get(key, ()))
|
|
|
|
|
|
|
|
def YieldConfigRegexp(self,
|
|
|
|
pattern: Optional[str] = None
|
|
|
|
) -> Iterable[tuple[str, str]]:
|
|
|
|
"""Yields (key, value) pairs for any config keys matching `pattern`.
|
|
|
|
|
|
|
|
This use re.match, so `pattern` needs to be for the entire config key.
|
|
|
|
|
|
|
|
If `pattern` is None, this returns all config items.
|
|
|
|
|
|
|
|
Note that `pattern` is always matched against the canonicalized key
|
|
|
|
value (i.e. for 'section.[subsection.]variable', both 'section' and
|
|
|
|
'variable' will be lowercased, but 'subsection', if present, will not).
|
|
|
|
"""
|
|
|
|
if pattern is None:
|
|
|
|
pred = lambda _: True
|
|
|
|
else:
|
|
|
|
pred = re.compile(pattern).match
|
|
|
|
for key, values in sorted(self._maybe_load_config().get('default',
|
|
|
|
{}).items()):
|
|
|
|
if pred(key):
|
|
|
|
for value in values:
|
|
|
|
yield key, value
|
|
|
|
|
|
|
|
def SetConfig(self,
|
|
|
|
key,
|
|
|
|
value=None,
|
|
|
|
*,
|
|
|
|
append: bool = False,
|
|
|
|
missing_ok: bool = True,
|
|
|
|
modify_all: bool = False,
|
|
|
|
scope: GitConfigScope = 'local',
|
|
|
|
value_pattern: Optional[str] = None):
|
|
|
|
"""Sets or unsets one or more config values.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
cwd: path to set `git config` for.
|
|
|
|
key: The specific config key to affect.
|
|
|
|
value: The value to set. If this is None, `key` will be unset.
|
|
|
|
append: If True and `value` is not None, this will append
|
|
|
|
the value instead of replacing an existing one. Must not be
|
|
|
|
specified with value_pattern.
|
|
|
|
missing_ok: If `value` is None (i.e. this is an unset operation),
|
|
|
|
ignore retcode=5 from `git config` (meaning that the value is
|
|
|
|
not present). If `value` is not None, then this option has no
|
|
|
|
effect. If this is false and the key is missing, this will raise
|
|
|
|
GitConfigUnsetMissingValue.
|
|
|
|
modify_all: If True, this will change a set operation to
|
|
|
|
`--replace-all`, and will change an unset operation to
|
|
|
|
`--unset-all`. Must not be specified with value_pattern.
|
|
|
|
scope: By default this is the `local` scope, but could be `global`
|
|
|
|
or `worktree`, depending on which config scope you want to affect.
|
|
|
|
Note that the `system` scope cannot be modified.
|
|
|
|
value_pattern: For use with `modify_all=True`, allows
|
|
|
|
further filtering of the set or unset operation based on
|
|
|
|
the currently configured value. Ignored for
|
|
|
|
`modify_all=False`.
|
|
|
|
"""
|
|
|
|
if scope not in GitScopeOrder:
|
|
|
|
raise GitConfigUnknownScope(scope)
|
|
|
|
if scope == 'system':
|
|
|
|
raise GitConfigUneditableScope(scope)
|
|
|
|
|
|
|
|
if value is None:
|
|
|
|
if modify_all:
|
|
|
|
self._impl.unset_config_multi(key,
|
|
|
|
value_pattern=value_pattern,
|
|
|
|
scope=scope,
|
|
|
|
missing_ok=missing_ok)
|
|
|
|
else:
|
|
|
|
self._impl.unset_config(key, scope=scope, missing_ok=missing_ok)
|
|
|
|
else:
|
|
|
|
if value_pattern:
|
|
|
|
if not modify_all:
|
|
|
|
raise ValueError(
|
|
|
|
'SetConfig with (value_pattern) and (not modify_all) is invalid.'
|
|
|
|
)
|
|
|
|
if append:
|
|
|
|
raise ValueError(
|
|
|
|
'SetConfig with (value_pattern) and (append) is invalid.'
|
|
|
|
)
|
|
|
|
|
|
|
|
self._impl.set_config_multi(key,
|
|
|
|
value,
|
|
|
|
value_pattern=value_pattern,
|
|
|
|
scope=scope)
|
|
|
|
else:
|
|
|
|
if modify_all:
|
|
|
|
self._impl.set_config_multi(key,
|
|
|
|
value,
|
|
|
|
value_pattern=None,
|
|
|
|
scope=scope)
|
|
|
|
|
|
|
|
self._impl.set_config(key, value, append=append, scope=scope)
|
|
|
|
|
|
|
|
# Once the underlying storage has set the value, we clear our cache so
|
|
|
|
# the next getter will reload it.
|
|
|
|
self.clear_cache()
|
|
|
|
|
|
|
|
|
|
|
|
class GitConfigStateReal(GitConfigStateBase):
|
|
|
|
"""GitConfigStateReal implements CachedGitConfigState by actually interacting with
|
|
|
|
the git configuration files on disk via GIT.Capture.
|
|
|
|
"""
|
|
|
|
|
|
|
|
_GLOBAL_LOCK = threading.Lock()
|
|
|
|
|
|
|
|
def __init__(self, root: pathlib.Path):
|
|
|
|
super().__init__()
|
|
|
|
self.root = root
|
|
|
|
|
|
|
|
def load_config(self) -> GitFlatConfigData:
|
|
|
|
# NOTE: `git config --list` already canonicalizes keys.
|
|
|
|
try:
|
|
|
|
gitConfigCmd = ['config', '--list', '-z', '--show-scope']
|
|
|
|
if git_common.get_git_version() <= (2, 25):
|
|
|
|
gitConfigCmd = ['config', '--list', '-z']
|
|
|
|
|
|
|
|
rawConfig = GIT.Capture(gitConfigCmd,
|
|
|
|
cwd=self.root,
|
|
|
|
strip_out=False)
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
return {}
|
|
|
|
|
|
|
|
assert isinstance(rawConfig, str)
|
|
|
|
cfg: Dict[str, Dict[str,
|
|
|
|
list[str]]] = defaultdict(lambda: defaultdict(list))
|
|
|
|
|
|
|
|
entries = rawConfig.split('\x00')[:-1]
|
|
|
|
|
|
|
|
def process_entry(entry: str, scope: str) -> None:
|
|
|
|
parts = entry.split('\n', 1)
|
|
|
|
key, value = parts if len(parts) == 2 else (parts[0], '')
|
|
|
|
key, value = key.strip(), value.strip()
|
|
|
|
cfg[scope][key].append(value)
|
|
|
|
if scope != "default":
|
|
|
|
cfg["default"][key].append(value)
|
|
|
|
|
|
|
|
i = 0
|
|
|
|
while i < len(entries):
|
|
|
|
if entries[i] in ['local', 'global', 'system']:
|
|
|
|
scope = entries[i]
|
|
|
|
i += 1
|
|
|
|
if i < len(entries):
|
|
|
|
process_entry(entries[i], scope)
|
|
|
|
else:
|
|
|
|
process_entry(entries[i], "default")
|
|
|
|
i += 1
|
|
|
|
|
|
|
|
return cfg
|
|
|
|
|
|
|
|
def set_config(self, key: str, value: str, *, append: bool,
|
|
|
|
scope: GitConfigScope):
|
|
|
|
# NOTE: `git config` already canonicalizes key.
|
|
|
|
args = ['config', f'--{scope}', key, value]
|
|
|
|
if append:
|
|
|
|
args.append('--add')
|
|
|
|
with self._scope_lock(scope):
|
|
|
|
GIT.Capture(args, cwd=self.root)
|
|
|
|
|
|
|
|
def set_config_multi(self, key: str, value: str, *,
|
|
|
|
value_pattern: Optional[str], scope: GitConfigScope):
|
|
|
|
# NOTE: `git config` already canonicalizes key.
|
|
|
|
args = ['config', f'--{scope}', '--replace-all', key, value]
|
|
|
|
if value_pattern is not None:
|
|
|
|
args.append(value_pattern)
|
|
|
|
with self._scope_lock(scope):
|
|
|
|
GIT.Capture(args, cwd=self.root)
|
|
|
|
|
|
|
|
def unset_config(self, key: str, *, scope: GitConfigScope,
|
|
|
|
missing_ok: bool):
|
|
|
|
# NOTE: `git config` already canonicalizes key.
|
|
|
|
accepted_retcodes = (0, 5) if missing_ok else (0, )
|
|
|
|
try:
|
|
|
|
with self._scope_lock(scope):
|
|
|
|
GIT.Capture(['config', f'--{scope}', '--unset', key],
|
|
|
|
cwd=self.root,
|
|
|
|
accepted_retcodes=accepted_retcodes)
|
|
|
|
except subprocess2.CalledProcessError as cpe:
|
|
|
|
if cpe.returncode == 5:
|
|
|
|
if b'multiple values' in cpe.stderr:
|
|
|
|
raise GitConfigUnsetMultipleValues(key, scope)
|
|
|
|
raise GitConfigUnsetMissingValue(key, scope)
|
|
|
|
raise
|
|
|
|
|
|
|
|
def unset_config_multi(self, key: str, *, value_pattern: Optional[str],
|
|
|
|
scope: GitConfigScope, missing_ok: bool):
|
|
|
|
# NOTE: `git config` already canonicalizes key.
|
|
|
|
accepted_retcodes = (0, 5) if missing_ok else (0, )
|
|
|
|
args = ['config', f'--{scope}', '--unset-all', key]
|
|
|
|
if value_pattern is not None:
|
|
|
|
args.append(value_pattern)
|
|
|
|
try:
|
|
|
|
with self._scope_lock(scope):
|
|
|
|
GIT.Capture(args,
|
|
|
|
cwd=self.root,
|
|
|
|
accepted_retcodes=accepted_retcodes)
|
|
|
|
except subprocess2.CalledProcessError as cpe:
|
|
|
|
if cpe.returncode == 5:
|
|
|
|
raise GitConfigUnsetMissingValue(key, scope)
|
|
|
|
raise
|
|
|
|
|
|
|
|
def _scope_lock(
|
|
|
|
self,
|
|
|
|
scope: GitConfigScope) -> contextlib.AbstractContextManager[Any]:
|
|
|
|
if scope == 'global':
|
|
|
|
return self._GLOBAL_LOCK
|
|
|
|
# TODO(ayatane): We should lock per local repo scope as well
|
|
|
|
# from a correctness perspective.
|
|
|
|
return contextlib.nullcontext()
|
|
|
|
|
|
|
|
|
|
|
|
class GitConfigStateTest(GitConfigStateBase):
|
|
|
|
"""A fake implementation of GitConfigStateBase for testing.
|
|
|
|
|
|
|
|
To properly initialize this, see tests/scm_mock.py.
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self,
|
|
|
|
global_state_lock: threading.Lock,
|
|
|
|
global_state: dict[str, list[str]],
|
|
|
|
*,
|
|
|
|
system_state: Optional[GitFlatConfigData] = None):
|
|
|
|
"""Initializes a new (local, worktree) config state, with a reference to
|
|
|
|
a single global `global` state and an optional immutable `system` state.
|
|
|
|
|
|
|
|
All keys in global_state and system_state MUST already be canonicalized
|
|
|
|
with canonicalize_key().
|
|
|
|
|
|
|
|
The caller must supply a single shared Lock, plus a mutable reference to
|
|
|
|
the global-state dictionary.
|
|
|
|
|
|
|
|
This implementation will hold global_state_lock during all read/write
|
|
|
|
operations on the 'global' scope.
|
|
|
|
"""
|
|
|
|
self.system_state: GitFlatConfigData = system_state or {}
|
|
|
|
|
|
|
|
self.global_state_lock = global_state_lock
|
|
|
|
self.global_state = global_state
|
|
|
|
|
|
|
|
self.worktree_state: dict[str, list[str]] = {}
|
|
|
|
self.local_state: dict[str, list[str]] = {}
|
|
|
|
|
|
|
|
super().__init__()
|
|
|
|
|
|
|
|
@contextlib.contextmanager
|
|
|
|
def _editable_scope(
|
|
|
|
self, scope: GitConfigScope) -> Iterator[dict[str, list[str]]]:
|
|
|
|
if scope == 'system':
|
|
|
|
# This is also checked in CachedGitConfigState.SetConfig, but double
|
|
|
|
# check here.
|
|
|
|
raise GitConfigUneditableScope(scope)
|
|
|
|
|
|
|
|
if scope == 'global':
|
|
|
|
with self.global_state_lock:
|
|
|
|
yield self.global_state
|
|
|
|
elif scope == 'local':
|
|
|
|
yield self.local_state
|
|
|
|
elif scope == 'worktree':
|
|
|
|
yield self.worktree_state
|
|
|
|
else:
|
|
|
|
# This is also checked in CachedGitConfigState.SetConfig, but double
|
|
|
|
# check here.
|
|
|
|
raise GitConfigUnknownScope(scope)
|
|
|
|
|
|
|
|
def load_config(self) -> GitFlatConfigData:
|
|
|
|
cfg: Mapping[str, Mapping[str, list[str]]] = defaultdict(
|
|
|
|
lambda: defaultdict(list))
|
|
|
|
|
|
|
|
for key, values in self.system_state.items():
|
|
|
|
cfg['system'][key].extend(values)
|
|
|
|
cfg['default'][key].extend(values)
|
|
|
|
for ordered_scope in GitScopeOrder:
|
|
|
|
if ordered_scope == 'system':
|
|
|
|
continue
|
|
|
|
with self._editable_scope(ordered_scope) as scope_cfg:
|
|
|
|
for key, values in scope_cfg.items():
|
|
|
|
cfg[ordered_scope][key].extend(values)
|
|
|
|
cfg['default'][key].extend(values)
|
|
|
|
return cfg
|
|
|
|
|
|
|
|
def set_config(self, key: str, value: str, *, append: bool,
|
|
|
|
scope: GitConfigScope):
|
|
|
|
key = canonicalize_git_config_key(key)
|
|
|
|
with self._editable_scope(scope) as cfg:
|
|
|
|
cur = cfg.get(key)
|
|
|
|
if cur is None:
|
|
|
|
cfg[key] = [value]
|
|
|
|
return
|
|
|
|
if append:
|
|
|
|
cfg[key] = cur + [value]
|
|
|
|
return
|
|
|
|
if len(cur) == 1:
|
|
|
|
cfg[key] = [value]
|
|
|
|
return
|
|
|
|
raise ValueError(f'GitConfigStateTest: Cannot set key {key} '
|
|
|
|
f'- current value {cur!r} is multiple.')
|
|
|
|
|
|
|
|
def set_config_multi(self, key: str, value: str, *,
|
|
|
|
value_pattern: Optional[str], scope: GitConfigScope):
|
|
|
|
key = canonicalize_git_config_key(key)
|
|
|
|
with self._editable_scope(scope) as cfg:
|
|
|
|
cur = cfg.get(key)
|
|
|
|
if value_pattern is None or cur is None:
|
|
|
|
cfg[key] = [value]
|
|
|
|
return
|
|
|
|
|
|
|
|
# We want to insert `value` in place of the first pattern match - if
|
|
|
|
# multiple values match, they will all be removed.
|
|
|
|
pat = re.compile(value_pattern)
|
|
|
|
newval = []
|
|
|
|
added = False
|
|
|
|
for val in cur:
|
|
|
|
if pat.match(val):
|
|
|
|
if not added:
|
|
|
|
newval.append(value)
|
|
|
|
added = True
|
|
|
|
else:
|
|
|
|
newval.append(val)
|
|
|
|
if not added:
|
|
|
|
newval.append(value)
|
|
|
|
cfg[key] = newval
|
|
|
|
|
|
|
|
def unset_config(self, key: str, *, scope: GitConfigScope,
|
|
|
|
missing_ok: bool):
|
|
|
|
key = canonicalize_git_config_key(key)
|
|
|
|
with self._editable_scope(scope) as cfg:
|
|
|
|
cur = cfg.get(key)
|
|
|
|
if cur is None:
|
|
|
|
if missing_ok:
|
|
|
|
return
|
|
|
|
raise GitConfigUnsetMissingValue(key, scope)
|
|
|
|
if len(cur) == 1:
|
|
|
|
del cfg[key]
|
|
|
|
return
|
|
|
|
raise GitConfigUnsetMultipleValues(key, scope)
|
|
|
|
|
|
|
|
def unset_config_multi(self, key: str, *, value_pattern: Optional[str],
|
|
|
|
scope: GitConfigScope, missing_ok: bool):
|
|
|
|
key = canonicalize_git_config_key(key)
|
|
|
|
with self._editable_scope(scope) as cfg:
|
|
|
|
cur = cfg.get(key)
|
|
|
|
if cur is None:
|
|
|
|
if not missing_ok:
|
|
|
|
raise GitConfigUnsetMissingValue(key, scope)
|
|
|
|
return
|
|
|
|
|
|
|
|
if value_pattern is None:
|
|
|
|
del cfg[key]
|
|
|
|
return
|
|
|
|
|
|
|
|
if cur is None:
|
|
|
|
del cfg[key]
|
|
|
|
return
|
|
|
|
|
|
|
|
pat = re.compile(value_pattern)
|
|
|
|
cfg[key] = [v for v in cur if not pat.match(v)]
|
|
|
|
|
|
|
|
|
|
|
|
class GIT(object):
|
|
|
|
current_version = None
|
|
|
|
rev_parse_cache = {}
|
|
|
|
|
|
|
|
# Maps cwd -> {config key, [config values]}
|
|
|
|
# This cache speeds up all `git config ...` operations by only running a
|
|
|
|
# single subcommand, which can greatly accelerate things like
|
|
|
|
# git-map-branches.
|
|
|
|
_CONFIG_CACHE: Dict[pathlib.Path, Optional[CachedGitConfigState]] = {}
|
|
|
|
_CONFIG_CACHE_LOCK = threading.Lock()
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def drop_config_cache(cls):
|
|
|
|
"""Completely purges all cached git config data.
|
|
|
|
|
|
|
|
This should always be safe to call (it will be lazily repopulated), but
|
|
|
|
really is only meant to be called from tests.
|
|
|
|
"""
|
|
|
|
with cls._CONFIG_CACHE_LOCK:
|
|
|
|
cls._CONFIG_CACHE = {}
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _new_config_state(root: pathlib.Path) -> GitConfigStateBase:
|
|
|
|
"""_new_config_state is mocked in tests/scm_mock to return
|
|
|
|
a GitConfigStateTest."""
|
|
|
|
return GitConfigStateReal(root)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def _get_config_state(cls, cwd: str) -> CachedGitConfigState:
|
|
|
|
key = pathlib.Path(cwd).absolute()
|
|
|
|
with cls._CONFIG_CACHE_LOCK:
|
|
|
|
cur = GIT._CONFIG_CACHE.get(key, None)
|
|
|
|
if cur is not None:
|
|
|
|
return cur
|
|
|
|
ret = CachedGitConfigState(cls._new_config_state(key))
|
|
|
|
cls._CONFIG_CACHE[key] = ret
|
|
|
|
return ret
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def _dump_config_state(cls) -> Dict[str, GitFlatConfigData]:
|
|
|
|
"""Dump internal config state.
|
|
|
|
|
|
|
|
Used for testing. This will NOT work properly in non-test
|
|
|
|
contexts as it relies on internal caches.
|
|
|
|
"""
|
|
|
|
with cls._CONFIG_CACHE_LOCK:
|
|
|
|
state = {}
|
|
|
|
for key, val in cls._CONFIG_CACHE.items():
|
|
|
|
if val is not None:
|
|
|
|
state[str(key)] = val._maybe_load_config().get(
|
|
|
|
'default', {})
|
|
|
|
return state
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def ApplyEnvVars(kwargs):
|
|
|
|
env = kwargs.pop('env', None) or os.environ.copy()
|
|
|
|
# Don't prompt for passwords; just fail quickly and noisily.
|
|
|
|
# By default, git will use an interactive terminal prompt when a
|
|
|
|
# username/ password is needed. That shouldn't happen in the chromium
|
|
|
|
# workflow, and if it does, then gclient may hide the prompt in the
|
|
|
|
# midst of a flood of terminal spew. The only indication that something
|
|
|
|
# has gone wrong will be when gclient hangs unresponsively. Instead, we
|
|
|
|
# disable the password prompt and simply allow git to fail noisily. The
|
|
|
|
# error message produced by git will be copied to gclient's output.
|
|
|
|
env.setdefault('GIT_ASKPASS', 'true')
|
|
|
|
env.setdefault('SSH_ASKPASS', 'true')
|
|
|
|
# 'cat' is a magical git string that disables pagers on all platforms.
|
|
|
|
env.setdefault('GIT_PAGER', 'cat')
|
|
|
|
return env
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def Capture(args, cwd=None, strip_out=True, **kwargs) -> str | bytes:
|
|
|
|
kwargs.setdefault('env', GIT.ApplyEnvVars(kwargs))
|
|
|
|
kwargs.setdefault('cwd', cwd)
|
|
|
|
kwargs.setdefault('autostrip', strip_out)
|
|
|
|
return git_common.run(*args, **kwargs)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def CaptureStatus(
|
|
|
|
cwd: str,
|
|
|
|
upstream_branch: str,
|
|
|
|
end_commit: Optional[str] = None,
|
|
|
|
ignore_submodules: bool = True) -> Sequence[tuple[str, str]]:
|
|
|
|
"""Returns git status.
|
|
|
|
|
|
|
|
Returns an array of (status, file) tuples."""
|
|
|
|
if end_commit is None:
|
|
|
|
end_commit = ''
|
|
|
|
if upstream_branch is None:
|
|
|
|
upstream_branch = GIT.GetUpstreamBranch(cwd)
|
|
|
|
if upstream_branch is None:
|
|
|
|
raise gclient_utils.Error('Cannot determine upstream branch')
|
|
|
|
|
|
|
|
command = [
|
|
|
|
'-c', 'core.quotePath=false', 'diff', '--name-status',
|
|
|
|
'--no-renames'
|
|
|
|
]
|
|
|
|
if ignore_submodules:
|
|
|
|
command.append('--ignore-submodules=all')
|
|
|
|
command.extend(['-r', '%s...%s' % (upstream_branch, end_commit)])
|
|
|
|
|
|
|
|
status = GIT.Capture(command, cwd)
|
|
|
|
assert isinstance(status, str)
|
|
|
|
results = []
|
|
|
|
if status:
|
|
|
|
for statusline in status.splitlines():
|
|
|
|
# 3-way merges can cause the status can be 'MMM' instead of 'M'.
|
|
|
|
# This can happen when the user has 2 local branches and he
|
|
|
|
# diffs between these 2 branches instead diffing to upstream.
|
|
|
|
m = re.match(r'^(\w)+\t(.+)$', statusline)
|
|
|
|
if not m:
|
|
|
|
raise gclient_utils.Error(
|
|
|
|
'status currently unsupported: %s' % statusline)
|
|
|
|
# Only grab the first letter.
|
|
|
|
results.append(('%s ' % m.group(1)[0], m.group(2)))
|
|
|
|
return results
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetConfig(cwd: str,
|
|
|
|
key: str,
|
|
|
|
default: Optional[str] = None,
|
|
|
|
scope: Optional[str] = None) -> Optional[str]:
|
|
|
|
"""Lazily loads all configration observable for this CachedGitConfigState,
|
|
|
|
then returns the last value for `key` as a string.
|
|
|
|
|
|
|
|
If `key` is missing, returns default.
|
|
|
|
"""
|
|
|
|
return GIT._get_config_state(cwd).GetConfig(key, default, scope)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetConfigBool(cwd: str, key: str) -> bool:
|
|
|
|
"""Returns the booleanized value of `key`.
|
|
|
|
|
|
|
|
This follows `git config` semantics (i.e. it normalizes the string value
|
|
|
|
of the config value to "true" - all other string values return False).
|
|
|
|
"""
|
|
|
|
return GIT._get_config_state(cwd).GetConfigBool(key)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetConfigList(cwd: str, key: str) -> list[str]:
|
|
|
|
"""Returns all values of `key` as a list of strings."""
|
|
|
|
return GIT._get_config_state(cwd).GetConfigList(key)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def YieldConfigRegexp(
|
|
|
|
cwd: str,
|
|
|
|
pattern: Optional[str] = None) -> Iterable[tuple[str, str]]:
|
|
|
|
"""Yields (key, value) pairs for any config keys matching `pattern`.
|
|
|
|
|
|
|
|
This use re.match, so `pattern` needs to be for the entire config key.
|
|
|
|
|
|
|
|
If pattern is None, this returns all config items.
|
|
|
|
"""
|
|
|
|
yield from GIT._get_config_state(cwd).YieldConfigRegexp(pattern)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetBranchConfig(cwd: str,
|
|
|
|
branch: str,
|
|
|
|
key: str,
|
|
|
|
default: Optional[str] = None) -> Optional[str]:
|
|
|
|
assert branch, 'A branch must be given'
|
|
|
|
key = 'branch.%s.%s' % (branch, key)
|
|
|
|
return GIT.GetConfig(cwd, key, default)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def SetConfig(cwd: str,
|
|
|
|
key: str,
|
|
|
|
value: Optional[str] = None,
|
|
|
|
*,
|
|
|
|
append: bool = False,
|
|
|
|
missing_ok: bool = True,
|
|
|
|
modify_all: bool = False,
|
|
|
|
scope: GitConfigScope = 'local',
|
|
|
|
value_pattern: Optional[str] = None):
|
|
|
|
"""Sets or unsets one or more config values.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
cwd: path to set `git config` for.
|
|
|
|
key: The specific config key to affect.
|
|
|
|
value: The value to set. If this is None, `key` will be unset.
|
|
|
|
append: If True and `value` is not None, this will append
|
|
|
|
the value instead of replacing an existing one. Must not be
|
|
|
|
specified with value_pattern.
|
|
|
|
missing_ok: If `value` is None (i.e. this is an unset operation),
|
|
|
|
ignore retcode=5 from `git config` (meaning that the value is
|
|
|
|
not present). If `value` is not None, then this option has no
|
|
|
|
effect. If this is false and the key is missing, this will raise
|
|
|
|
GitConfigUnsetMissingValue.
|
|
|
|
modify_all: If True, this will change a set operation to
|
|
|
|
`--replace-all`, and will change an unset operation to
|
|
|
|
`--unset-all`. Must not be specified with value_pattern.
|
|
|
|
scope: By default this is the `local` scope, but could be `global`
|
|
|
|
or `worktree`, depending on which config scope you want to affect.
|
|
|
|
Note that the `system` scope cannot be modified.
|
|
|
|
value_pattern: For use with `modify_all=True`, allows
|
|
|
|
further filtering of the set or unset operation based on
|
|
|
|
the currently configured value. Ignored for
|
|
|
|
`modify_all=False`.
|
|
|
|
"""
|
|
|
|
GIT._get_config_state(cwd).SetConfig(key,
|
|
|
|
value,
|
|
|
|
append=append,
|
|
|
|
missing_ok=missing_ok,
|
|
|
|
modify_all=modify_all,
|
|
|
|
scope=scope,
|
|
|
|
value_pattern=value_pattern)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def SetBranchConfig(cwd, branch, key, value=None):
|
|
|
|
assert branch, 'A branch must be given'
|
|
|
|
key = 'branch.%s.%s' % (branch, key)
|
|
|
|
GIT.SetConfig(cwd, key, value)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def ShortBranchName(branch):
|
|
|
|
"""Converts a name like 'refs/heads/foo' to just 'foo'."""
|
|
|
|
return branch.replace('refs/heads/', '')
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetBranchRef(cwd):
|
|
|
|
"""Returns the full branch reference, e.g. 'refs/heads/main'."""
|
|
|
|
try:
|
|
|
|
return GIT.Capture(['symbolic-ref', 'HEAD'], cwd=cwd)
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
return None
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetRemoteHeadRef(cwd, url, remote):
|
|
|
|
"""Returns the full default remote branch reference, e.g.
|
|
|
|
'refs/remotes/origin/main'."""
|
|
|
|
if os.path.exists(cwd):
|
|
|
|
ref = 'refs/remotes/%s/HEAD' % remote
|
|
|
|
try:
|
|
|
|
# Try using local git copy first
|
|
|
|
ref = GIT.Capture(['symbolic-ref', ref], cwd=cwd)
|
|
|
|
assert isinstance(ref, str)
|
|
|
|
if not ref.endswith('master'):
|
|
|
|
return ref
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
try:
|
|
|
|
# Check if there are changes in the default branch for this
|
|
|
|
# particular repository.
|
|
|
|
GIT.Capture(['remote', 'set-head', '-a', remote], cwd=cwd)
|
|
|
|
return GIT.Capture(['symbolic-ref', ref], cwd=cwd)
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
try:
|
|
|
|
# Fetch information from git server
|
|
|
|
resp = GIT.Capture(['ls-remote', '--symref', url, 'HEAD'])
|
|
|
|
assert isinstance(resp, str)
|
|
|
|
regex = r'^ref: (.*)\tHEAD$'
|
|
|
|
for line in resp.split('\n'):
|
|
|
|
m = re.match(regex, line)
|
|
|
|
if m:
|
|
|
|
refpair = GIT.RefToRemoteRef(m.group(1), remote)
|
|
|
|
assert isinstance(refpair, tuple)
|
|
|
|
return ''.join(refpair)
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
pass
|
|
|
|
# Return default branch
|
|
|
|
return 'refs/remotes/%s/main' % remote
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetBranch(cwd):
|
|
|
|
"""Returns the short branch name, e.g. 'main'."""
|
|
|
|
branchref = GIT.GetBranchRef(cwd)
|
|
|
|
if branchref:
|
|
|
|
return GIT.ShortBranchName(branchref)
|
|
|
|
return None
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetRemoteBranches(cwd):
|
|
|
|
return GIT.Capture(['branch', '-r'], cwd=cwd).split()
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def FetchUpstreamTuple(
|
|
|
|
cwd: str,
|
|
|
|
branch: Optional[str] = None
|
|
|
|
) -> tuple[Optional[str], Optional[str]]:
|
|
|
|
"""Returns a tuple containing remote and remote ref,
|
|
|
|
e.g. 'origin', 'refs/heads/main'
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
branch = branch or GIT.GetBranch(cwd)
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
pass
|
|
|
|
if branch:
|
|
|
|
upstream_branch = GIT.GetBranchConfig(cwd, branch, 'merge')
|
|
|
|
if upstream_branch:
|
|
|
|
remote = GIT.GetBranchConfig(cwd, branch, 'remote', '.')
|
|
|
|
return remote, upstream_branch
|
|
|
|
|
|
|
|
upstream_branch = GIT.GetConfig(cwd, 'rietveld.upstream-branch')
|
|
|
|
if upstream_branch:
|
|
|
|
remote = GIT.GetConfig(cwd, 'rietveld.upstream-remote', '.')
|
|
|
|
return remote, upstream_branch
|
|
|
|
|
|
|
|
# Else, try to guess the origin remote.
|
|
|
|
remote_branches = GIT.GetRemoteBranches(cwd)
|
|
|
|
if 'origin/main' in remote_branches:
|
|
|
|
# Fall back on origin/main if it exits.
|
|
|
|
return 'origin', 'refs/heads/main'
|
|
|
|
|
|
|
|
if 'origin/master' in remote_branches:
|
|
|
|
# Fall back on origin/master if it exits.
|
|
|
|
return 'origin', 'refs/heads/master'
|
|
|
|
|
|
|
|
return None, None
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def RefToRemoteRef(ref, remote) -> Optional[tuple[str, str]]:
|
|
|
|
"""Convert a checkout ref to the equivalent remote ref.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
A tuple of the remote ref's (common prefix, unique suffix), or None if it
|
|
|
|
doesn't appear to refer to a remote ref (e.g. it's a commit hash).
|
|
|
|
"""
|
|
|
|
# TODO(mmoss): This is just a brute-force mapping based of the expected
|
|
|
|
# git config. It's a bit better than the even more brute-force
|
|
|
|
# replace('heads', ...), but could still be smarter (like maybe actually
|
|
|
|
# using values gleaned from the git config).
|
|
|
|
m = re.match('^(refs/(remotes/)?)?branch-heads/', ref or '')
|
|
|
|
if m:
|
|
|
|
return ('refs/remotes/branch-heads/', ref.replace(m.group(0), ''))
|
|
|
|
|
|
|
|
m = re.match('^((refs/)?remotes/)?%s/|(refs/)?heads/' % remote, ref
|
|
|
|
or '')
|
|
|
|
if m:
|
|
|
|
return ('refs/remotes/%s/' % remote, ref.replace(m.group(0), ''))
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def RemoteRefToRef(ref, remote):
|
|
|
|
assert remote, 'A remote must be given'
|
|
|
|
if not ref or not ref.startswith('refs/'):
|
|
|
|
return None
|
|
|
|
if not ref.startswith('refs/remotes/'):
|
|
|
|
return ref
|
|
|
|
if ref.startswith('refs/remotes/branch-heads/'):
|
|
|
|
return 'refs' + ref[len('refs/remotes'):]
|
|
|
|
if ref.startswith('refs/remotes/%s/' % remote):
|
|
|
|
return 'refs/heads' + ref[len('refs/remotes/%s' % remote):]
|
|
|
|
return None
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetUpstreamBranch(cwd) -> Optional[str]:
|
|
|
|
"""Gets the current branch's upstream branch."""
|
|
|
|
remote, upstream_branch = GIT.FetchUpstreamTuple(cwd)
|
|
|
|
if remote != '.' and upstream_branch:
|
|
|
|
remote_ref = GIT.RefToRemoteRef(upstream_branch, remote)
|
|
|
|
if remote_ref:
|
|
|
|
upstream_branch = ''.join(remote_ref)
|
|
|
|
return upstream_branch
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def IsAncestor(maybe_ancestor: str,
|
|
|
|
ref: str,
|
|
|
|
cwd: Optional[str] = None) -> bool:
|
|
|
|
"""Verifies if |maybe_ancestor| is an ancestor of |ref|."""
|
|
|
|
try:
|
|
|
|
GIT.Capture(['merge-base', '--is-ancestor', maybe_ancestor, ref],
|
|
|
|
cwd=cwd)
|
|
|
|
return True
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
return False
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetOldContents(cwd, filename, branch=None):
|
|
|
|
if not branch:
|
|
|
|
branch = GIT.GetUpstreamBranch(cwd)
|
|
|
|
if platform.system() == 'Windows':
|
|
|
|
# git show <sha>:<path> wants a posix path.
|
|
|
|
filename = filename.replace('\\', '/')
|
|
|
|
command = ['show', '%s:%s' % (branch, filename)]
|
|
|
|
try:
|
|
|
|
return GIT.Capture(command, cwd=cwd, strip_out=False)
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
return ''
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GenerateDiff(cwd: str,
|
|
|
|
branch: Optional[str] = None,
|
|
|
|
branch_head: str = 'HEAD',
|
|
|
|
full_move: bool = False,
|
|
|
|
files: Optional[Iterable[str]] = None) -> str:
|
|
|
|
"""Diffs against the upstream branch or optionally another branch.
|
|
|
|
|
|
|
|
full_move means that move or copy operations should completely recreate the
|
|
|
|
files, usually in the prospect to apply the patch for a try job."""
|
|
|
|
if not branch:
|
|
|
|
branch = GIT.GetUpstreamBranch(cwd)
|
|
|
|
assert isinstance(branch, str)
|
|
|
|
command = [
|
|
|
|
'-c',
|
|
|
|
'core.quotePath=false',
|
|
|
|
'diff',
|
|
|
|
'-p',
|
|
|
|
'--no-color',
|
|
|
|
'--no-prefix',
|
|
|
|
'--no-ext-diff',
|
|
|
|
branch + "..." + branch_head,
|
|
|
|
]
|
|
|
|
if full_move:
|
|
|
|
command.append('--no-renames')
|
|
|
|
else:
|
|
|
|
command.append('-C')
|
|
|
|
# TODO(maruel): --binary support.
|
|
|
|
if files:
|
|
|
|
command.append('--')
|
|
|
|
command.extend(files)
|
|
|
|
output = GIT.Capture(command, cwd=cwd, strip_out=False)
|
|
|
|
assert isinstance(output, str)
|
|
|
|
diff = output.splitlines(True)
|
|
|
|
for i in range(len(diff)):
|
|
|
|
# In the case of added files, replace /dev/null with the path to the
|
|
|
|
# file being added.
|
|
|
|
if diff[i].startswith('--- /dev/null'):
|
|
|
|
diff[i] = '--- %s' % diff[i + 1][4:]
|
|
|
|
return ''.join(diff)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetAllFiles(cwd):
|
|
|
|
"""Returns the list of all files under revision control."""
|
|
|
|
command = ['-c', 'core.quotePath=false', 'ls-files', '--', '.']
|
|
|
|
return GIT.Capture(command, cwd=cwd).splitlines(False)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetSubmoduleCommits(cwd: str,
|
|
|
|
submodules: list[str]) -> Mapping[str, str]:
|
|
|
|
"""Returns a mapping of staged or committed new commits for submodules."""
|
|
|
|
if not submodules:
|
|
|
|
return {}
|
|
|
|
result = subprocess2.check_output(['git', 'ls-files', '-s', '--'] +
|
|
|
|
submodules,
|
|
|
|
cwd=cwd).decode('utf-8')
|
|
|
|
commit_hashes = {}
|
|
|
|
for r in result.splitlines():
|
|
|
|
# ['<mode>', '<commit_hash>', '<stage_number>', '<path>'].
|
|
|
|
record = r.strip().split(maxsplit=3) # path can contain spaces.
|
|
|
|
assert record[0] == '160000', 'file is not a gitlink: %s' % record
|
|
|
|
commit_hashes[record[3]] = record[1]
|
|
|
|
return commit_hashes
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetCheckoutRoot(cwd) -> str:
|
|
|
|
"""Returns the top level directory of a git checkout as an absolute path.
|
|
|
|
"""
|
|
|
|
root = GIT.Capture(['rev-parse', '--show-cdup'], cwd=cwd)
|
|
|
|
assert isinstance(root, str)
|
|
|
|
return os.path.abspath(os.path.join(cwd, root))
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def IsInsideWorkTree(cwd):
|
|
|
|
try:
|
|
|
|
return GIT.Capture(['rev-parse', '--is-inside-work-tree'], cwd=cwd)
|
|
|
|
except (OSError, subprocess2.CalledProcessError):
|
|
|
|
return False
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def IsVersioned(cwd: str, relative_dir: str) -> int:
|
|
|
|
"""Checks whether the given |relative_dir| is part of cwd's repo."""
|
|
|
|
output = GIT.Capture(['ls-tree', 'HEAD', '--', relative_dir], cwd=cwd)
|
|
|
|
assert isinstance(output, str)
|
|
|
|
if not output:
|
|
|
|
return VERSIONED_NO
|
|
|
|
if output.startswith('160000'):
|
|
|
|
return VERSIONED_SUBMODULE
|
|
|
|
return VERSIONED_DIR
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def ListSubmodules(repo_root: str) -> Collection[str]:
|
|
|
|
"""Returns the list of submodule paths for the given repo.
|
|
|
|
|
|
|
|
Path separators will be adjusted for the current OS.
|
|
|
|
"""
|
|
|
|
if not os.path.exists(os.path.join(repo_root, '.gitmodules')):
|
|
|
|
return []
|
|
|
|
try:
|
|
|
|
config_output = GIT.Capture(
|
|
|
|
['config', '--file', '.gitmodules', '--get-regexp', 'path'],
|
|
|
|
cwd=repo_root)
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
# Git exits with 1 if no config matches are found.
|
|
|
|
return []
|
|
|
|
assert isinstance(config_output, str)
|
|
|
|
return [
|
|
|
|
line.split()[-1].replace('/', os.path.sep)
|
|
|
|
for line in config_output.splitlines()
|
|
|
|
]
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def CleanupDir(cwd, relative_dir):
|
|
|
|
"""Cleans up untracked file inside |relative_dir|."""
|
|
|
|
return bool(GIT.Capture(['clean', '-df', relative_dir], cwd=cwd))
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def ResolveCommit(cwd, rev):
|
|
|
|
cache_key = None
|
|
|
|
# We do this instead of rev-parse --verify rev^{commit}, since on
|
|
|
|
# Windows git can be either an executable or batch script, each of which
|
|
|
|
# requires escaping the caret (^) a different way.
|
|
|
|
if gclient_utils.IsFullGitSha(rev):
|
|
|
|
# Only cache full SHAs
|
|
|
|
cache_key = hash(cwd + rev)
|
|
|
|
if val := GIT.rev_parse_cache.get(cache_key):
|
|
|
|
return val
|
|
|
|
|
|
|
|
# git-rev parse --verify FULL_GIT_SHA always succeeds, even if we
|
|
|
|
# don't have FULL_GIT_SHA locally. Removing the last character
|
|
|
|
# forces git to check if FULL_GIT_SHA refers to an object in the
|
|
|
|
# local database.
|
|
|
|
rev = rev[:-1]
|
|
|
|
res = GIT.Capture(['rev-parse', '--quiet', '--verify', rev], cwd=cwd)
|
|
|
|
if cache_key:
|
|
|
|
# We don't expect concurrent execution, so we don't lock anything.
|
|
|
|
GIT.rev_parse_cache[cache_key] = res
|
|
|
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def IsValidRevision(cwd, rev, sha_only=False):
|
|
|
|
"""Verifies the revision is a proper git revision.
|
|
|
|
|
|
|
|
sha_only: Fail unless rev is a sha hash.
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
sha = GIT.ResolveCommit(cwd, rev)
|
|
|
|
except subprocess2.CalledProcessError:
|
|
|
|
return None
|
|
|
|
|
|
|
|
if sha_only:
|
|
|
|
return sha == rev.lower()
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
class DIFF(object):
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def GetAllFiles(cwd):
|
|
|
|
"""Return all files under the repo at cwd.
|
|
|
|
|
|
|
|
If .gitmodules exists in cwd, use it to determine which folders are
|
|
|
|
submodules and don't recurse into them. Submodule paths are returned.
|
|
|
|
"""
|
|
|
|
# `git config --file` works outside of a git workspace.
|
|
|
|
submodules = GIT.ListSubmodules(cwd)
|
|
|
|
if not submodules:
|
|
|
|
return [
|
|
|
|
str(p.relative_to(cwd)) for p in pathlib.Path(cwd).rglob("*")
|
|
|
|
if p.is_file()
|
|
|
|
]
|
|
|
|
|
|
|
|
full_path_submodules = {os.path.join(cwd, s) for s in submodules}
|
|
|
|
|
|
|
|
def should_recurse(dirpath, dirname):
|
|
|
|
full_path = os.path.join(dirpath, dirname)
|
|
|
|
return full_path not in full_path_submodules
|
|
|
|
|
|
|
|
paths = list(full_path_submodules)
|
|
|
|
for dirpath, dirnames, filenames in os.walk(cwd):
|
|
|
|
paths.extend([os.path.join(dirpath, f) for f in filenames])
|
|
|
|
dirnames[:] = [d for d in dirnames if should_recurse(dirpath, d)]
|
|
|
|
|
|
|
|
return [os.path.relpath(p, cwd) for p in paths]
|
|
|
|
|
|
|
|
|
|
|
|
# vim: sts=4:ts=4:sw=4:tw=80:et:
|