#!/usr/bin/env python3
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Splits a branch into smaller branches and uploads CLs."""

import collections
import dataclasses
import hashlib
import os
import re
import tempfile
from typing import List, Optional, Set, Tuple, Dict, Any

import gclient_utils
import git_footers
import scm

import git_common as git

# If a call to `git cl split` will generate more than this number of CLs, the
# command will prompt the user to make sure they know what they're doing. Large
# numbers of CLs generated by `git cl split` have caused infrastructure issues
# in the past.
CL_SPLIT_FORCE_LIMIT = 10

# The maximum number of top reviewers to list. `git cl split` may send many CLs
# to a single reviewer, so the top reviewers with the most CLs sent to them
# will be listed.
CL_SPLIT_TOP_REVIEWERS = 5


def Emit(msg: str):
    """Wrapper for easier mocking during tests"""
    print(msg)


def EmitWarning(*msg: str):
    """
    Print a warning.

    All positional arguments are forwarded to print() after a 'Warning: '
    prefix, so callers may pass a message followed by extra values (as
    ValidateSplitting does).
    """
    print("Warning: ", *msg)


def HashList(lst: List[Any]) -> str:
    """
    Hash a list of (action, file) string pairs, returning the first 10 hex
    digits of a SHA-1 digest.

    The list is sorted before hashing, so lists with identical elements have
    the same hash regardless of order.
    """
    # We need a bytes-like object for hashlib algorithms
    byts = b"".join((action + file).encode() for action, file in sorted(lst))
    # No security implication: we just need a deterministic output
    hashed = hashlib.sha1(byts)
    return hashed.hexdigest()[:10]


FilesAndOwnersDirectory = collections.namedtuple("FilesAndOwnersDirectory",
                                                 "files owners_directories")


@dataclasses.dataclass
class CLInfo:
    """
    Data structure representing a single CL. The script will split the large CL
    into a list of these.

    Fields:
    - reviewers: the reviewers the CL will be sent to.
    - files: a list of <action>, <file> pairs in the CL.
             Has the same format as `git status`.
    - description: a string describing the CL. Typically the list of affected
                   directories. Only used for replacing $description in
                   the user-provided CL description.
    """
    # Have to use default_factory because lists are mutable
    reviewers: Set[str] = dataclasses.field(default_factory=set)
    files: List[Tuple[str, str]] = dataclasses.field(default_factory=list)

    # This is only used for formatting in the CL description, so it just
    # has to be convertible to string.
    description: Any = ""

    def FormatForPrinting(self) -> str:
        """
        Format the CLInfo for printing to a file in a human-readable format.

        The output is a 'Reviewers: [...]' line, a 'Description: ...' line,
        and then one '<action>, <file>' line per file.
        """
        # Don't quote the reviewer emails in the output
        reviewers_str = ", ".join(self.reviewers)
        lines = [
            f"Reviewers: [{reviewers_str}]", f"Description: {self.description}"
        ] + [f"{action}, {file}" for (action, file) in self.files]
        return "\n".join(lines)


def CLInfoFromFilesAndOwnersDirectoriesDict(
        d: Dict[Tuple[str, ...], FilesAndOwnersDirectory]) -> List[CLInfo]:
    """
    Transform a dictionary mapping reviewer tuples to FilesAndOwnersDirectories
    into a list of CLInfo
    """
    return [
        CLInfo(set(reviewers), fod.files,
               FormatDirectoriesForPrinting(fod.owners_directories))
        for (reviewers, fod) in d.items()
    ]


def EnsureInGitRepository():
    """Throws an exception if the current directory is not a git repository."""
    git.run('rev-parse')


def CreateBranchName(prefix: str, files: List[Tuple[str, str]]) -> str:
    """
    Given a sub-CL as a list of (action, file) pairs, create a unique and
    deterministic branch name for it.
    The name has the format <prefix>_<hash>_<common_path>_split.
    """
    file_names = [file for _, file in files]
    if len(file_names) == 1:
        # Only one file, just use its directory as the common path
        common_path = os.path.dirname(file_names[0])
    else:
        common_path = os.path.commonpath(file_names)
    if not common_path:
        # Files have nothing in common at all. Unlikely but possible.
        common_path = "None"
    # Replace path delimiter with underscore in common_path.
    common_path = common_path.replace(os.path.sep, '_')
    return f"{prefix}_{HashList(files)}_{common_path}_split"


def CreateBranchForOneCL(prefix: str, files: List[Tuple[str, str]],
                         upstream: str) -> bool:
    """Creates a branch for one sub-CL, named by CreateBranchName().

    The name is |prefix| + "_" + |hash(files)| + "_" + |common path| +
    "_split".

    Return false if the branch already exists. |upstream| is used as upstream
    for the created branch.
    """
    branches_on_disk = set(git.branches(use_limit=False))
    branch_name = CreateBranchName(prefix, files)
    if branch_name in branches_on_disk:
        return False
    git.run('checkout', '-t', upstream, '-b', branch_name)
    return True


def ValidateExistingBranches(prefix: str, cl_infos: List[CLInfo]) -> bool:
    """
    Check if there are splitting branches left over from a previous run.
    We only allow branches to exist if we're resuming a previous upload,
    in which case we require that the existing branches are a subset of
    the branches we're going to generate.

    Returns True if it is safe to proceed, False (after printing the stale
    branches) otherwise.
    """
    branches_on_disk = set(
        branch for branch in git.branches(use_limit=False)
        if branch.startswith(prefix + "_") and branch.endswith("_split"))

    branches_to_be_made = set(
        CreateBranchName(prefix, info.files) for info in cl_infos)

    if not branches_on_disk.issubset(branches_to_be_made):
        Emit("It seems like you've already run `git cl split` on this branch.\n"
             "If you're resuming a previous upload, you must pass in the "
             "same splitting as before, using the --from-file option.\n"
             "If you're starting a new upload, please clean up existing split "
             f"branches (starting with '{prefix}_' and ending with '_split'), "
             "and re-run the tool.")
        Emit("The following branches need to be cleaned up:\n")
        for branch in branches_on_disk - branches_to_be_made:
            Emit(branch)
        return False
    return True


def FormatDirectoriesForPrinting(directories: List[str],
                                 prefix: Optional[str] = None) -> str:
    """Formats directory list for printing

    Uses dedicated format for single-item list: the item itself is returned
    rather than the string form of a one-element list."""

    prefixed = directories
    if prefix:
        prefixed = [(prefix + d) for d in directories]

    return str(prefixed[0]) if len(prefixed) == 1 else str(prefixed)


def FormatDescriptionOrComment(txt, desc):
    """Replaces $description with |desc| in |txt|.

    The deprecated $directory placeholder is replaced as well, with a warning.
    """
    # TODO(389069356): Remove support for $directory entirely once it's been
    # deprecated for a while.
    replaced_txt = txt.replace('$directory', desc)
    if txt != replaced_txt:
        EmitWarning('Usage of $directory is deprecated and will be removed '
                    'in a future update. Please use $description instead, '
                    'which has the same behavior by default.\n\n')
    replaced_txt = replaced_txt.replace('$description', desc)
    return replaced_txt


def AddUploadedByGitClSplitToDescription(description):
    """Adds a 'This CL was uploaded by git cl split.' line to |description|.

    The line is added before footers, or at the end of |description| if it has
    no footers.
    """
    split_footers = git_footers.split_footers(description)
    lines = split_footers[0]
    # Separate the new line from the existing body with a blank line. The
    # `lines and` guard avoids an IndexError if the body has no lines at all.
    if lines and lines[-1] and not lines[-1].isspace():
        lines = lines + ['']
    lines = lines + ['This CL was uploaded by git cl split.']
    if split_footers[1]:
        lines += [''] + split_footers[1]
    return '\n'.join(lines)


def UploadCl(refactor_branch, refactor_branch_upstream, cl_description, files,
             user_description, saved_splitting_file, comment, reviewers,
             changelist, cmd_upload, cq_dry_run, enable_auto_submit, topic,
             repository_root):
    """Uploads a CL with all changes to |files| in |refactor_branch|.

    Args:
        refactor_branch: Name of the branch that contains the changes to upload.
        refactor_branch_upstream: Name of the upstream of |refactor_branch|.
        cl_description: Description of this specific CL, e.g. the list of
            affected directories.
        files: List of (action, file) pairs to include in the uploaded CL.
        user_description: Description provided by user.
        saved_splitting_file: Path of the file the splitting was saved to;
            only used in the resume instructions printed if the upload fails.
        comment: Comment to post on the uploaded CL.
        reviewers: A set of reviewers for the CL.
        changelist: The Changelist class.
        cmd_upload: The function associated with the git cl upload command.
        cq_dry_run: If CL uploads should also do a cq dry run.
        enable_auto_submit: If CL uploads should also enable auto submit.
        topic: Topic to associate with uploaded CLs.
        repository_root: Directory that the paths in |files| are joined to.
    """
    # Create a branch.
    if not CreateBranchForOneCL(refactor_branch, files,
                                refactor_branch_upstream):
        Emit(
            f'Skipping existing branch for CL with description: {cl_description}'
        )
        return

    # Checkout all changes to files in |files|.
    deleted_files = []
    modified_files = []
    for action, f in files:
        abspath = os.path.abspath(os.path.join(repository_root, f))
        if action == 'D':
            deleted_files.append(abspath)
        else:
            modified_files.append(abspath)

    if deleted_files:
        git.run('rm', *deleted_files)
    if modified_files:
        git.run('checkout', refactor_branch, '--', *modified_files)

    # Commit changes. The temporary file is created with delete=False so that it
    # can be deleted manually after git has read it rather than automatically
    # when it is closed.
    with gclient_utils.temporary_file() as tmp_file:
        gclient_utils.FileWrite(
            tmp_file, FormatDescriptionOrComment(user_description,
                                                 cl_description))
        git.run('commit', '-F', tmp_file)

    # Upload a CL.
    upload_args = ['-f']
    if reviewers:
        upload_args.extend(['-r', ','.join(sorted(reviewers))])
    if cq_dry_run:
        upload_args.append('--cq-dry-run')
    if not comment:
        upload_args.append('--send-mail')
    if enable_auto_submit:
        upload_args.append('--enable-auto-submit')
    if topic:
        upload_args.append('--topic={}'.format(topic))
    Emit(f'Uploading CL with description: {cl_description} ...')

    ret = cmd_upload(upload_args)
    if ret != 0:
        Emit('Uploading failed.')
        Emit('Note: git cl split has built-in resume capabilities.')
        Emit(f'Delete {git.current_branch()} then run\n'
             f'git cl split --from-file={saved_splitting_file}\n'
             'to resume uploading.')

    if comment:
        changelist().AddComment(FormatDescriptionOrComment(
            comment, cl_description),
                                publish=True)


def GetFilesSplitByOwners(files, max_depth):
    """Returns a map of files split by OWNERS file.

    Args:
        files: List of (action, path) pairs.
        max_depth: The maximum directory depth to search for OWNERS files. A
            value less than 1 means no limit.

    Returns:
        A map where keys are paths to directories containing an OWNERS file and
        values are lists of (action, path) pairs sharing an OWNERS file. If no
        OWNERS file is found all the way up, the topmost directory reached is
        used as the key.
    """
    files_split_by_owners = {}
    for action, path in files:
        # normpath() is important to normalize separators here, in preparation
        # for str.split() below. It would be nicer to use something like
        # pathlib here but alas...
        dir_with_owners = os.path.normpath(os.path.dirname(path))
        if max_depth >= 1:
            dir_with_owners = os.path.join(
                *dir_with_owners.split(os.path.sep)[:max_depth])
        # Find the closest parent directory with an OWNERS file.
        while (dir_with_owners not in files_split_by_owners
               and not os.path.isfile(os.path.join(dir_with_owners, 'OWNERS'))):
            parent = os.path.dirname(dir_with_owners)
            if parent == dir_with_owners:
                # dirname() has reached its fixed point ('' or a filesystem
                # root); stop instead of looping forever when no OWNERS file
                # exists anywhere above the file.
                break
            dir_with_owners = parent
        files_split_by_owners.setdefault(dir_with_owners, []).append(
            (action, path))
    return files_split_by_owners


def PrintClInfo(cl_index, num_cls, cl_description, file_paths, user_description,
                reviewers, cq_dry_run, enable_auto_submit, topic):
    """Prints info about a CL.

    Args:
        cl_index: The index of this CL in the list of CLs to upload.
        num_cls: The total number of CLs that will be uploaded.
        cl_description: Description of this specific CL, e.g. the list of
            affected directories.
        file_paths: A list of files in this CL.
        user_description: Description provided by user.
        reviewers: A set of reviewers for this CL.
        cq_dry_run: If the CL should also be sent to CQ dry run.
        enable_auto_submit: If the CL should also have auto submit enabled.
        topic: Topic to set for this CL.
    """
    description_lines = FormatDescriptionOrComment(user_description,
                                                   cl_description).splitlines()
    indented_description = '\n'.join([' ' + l for l in description_lines])

    Emit('CL {}/{}'.format(cl_index, num_cls))
    Emit('Paths: {}'.format(cl_description))
    Emit('Reviewers: {}'.format(', '.join(reviewers)))
    Emit('Auto-Submit: {}'.format(enable_auto_submit))
    Emit('CQ Dry Run: {}'.format(cq_dry_run))
    Emit('Topic: {}'.format(topic))
    Emit('\n' + indented_description + '\n')
    Emit('\n'.join(file_paths))


def LoadDescription(description_file, dry_run):
    """Returns the user-provided CL description.

    Reads |description_file| if given. Without a file, a dummy description is
    returned during dry runs and ValueError is raised otherwise.
    """
    if not description_file:
        if not dry_run:
            # Parser checks this as well, so should be impossible
            raise ValueError(
                "Must provide a description file except during dry runs")
        return ('Dummy description for dry run.\n'
                'description = $description')

    return gclient_utils.FileRead(description_file)


def PrintSummary(cl_infos, refactor_branch):
    """Print a brief summary of the splitting so the user can review it before
    uploading.

    Args:
        cl_infos: The list of CLInfo describing the CLs to be created.
        refactor_branch: Name of the branch being split; only used in the
            printed message.
    """
    for info in cl_infos:
        Emit(f'Reviewers: {info.reviewers}, files: {len(info.files)}, '
             f'description: {info.description}')

    num_cls = len(cl_infos)
    Emit(f'\nWill split branch {refactor_branch} into {num_cls} CLs. '
         'Please quickly review them before proceeding.\n')

    if num_cls > CL_SPLIT_FORCE_LIMIT:
        EmitWarning(
            'Uploading this many CLs may potentially '
            'reach the limit of concurrent runs, imposed on you by the '
            'build infrastructure. Your runs may be throttled as a '
            'result.\n\nPlease email infra-dev@chromium.org if you '
            'have any questions. '
            'The infra team reserves the right to cancel '
            'your jobs if they are overloading the CQ.\n\n'
            '(Alternatively, you can reduce the number of CLs created by '
            'using the --max-depth option. Pass --dry-run to examine the '
            'CLs which will be created until you are happy with the '
            'results.)')


def SplitCl(description_file, comment_file, changelist, cmd_upload, dry_run,
            summarize, reviewers_override, cq_dry_run, enable_auto_submit,
            max_depth, topic, from_file, repository_root):
    """Splits a branch into smaller branches and uploads CLs.

    Args:
        description_file: File containing the description of uploaded CLs.
        comment_file: File containing the comment of uploaded CLs.
        changelist: The Changelist class.
        cmd_upload: The function associated with the git cl upload command.
        dry_run: Whether this is a dry run (no branches or CLs created).
        summarize: During a dry run, print only the summary instead of the
            per-CL details.
        reviewers_override: Either None or a (possibly empty) list of reviewers
            all CLs should be sent to.
        cq_dry_run: If CL uploads should also do a cq dry run.
        enable_auto_submit: If CL uploads should also enable auto submit.
        max_depth: The maximum directory depth to search for OWNERS files. A
            value less than 1 means no limit.
        topic: Topic to associate with split CLs.
        from_file: If set, path of a saved splitting to load instead of
            computing one from OWNERS files.
        repository_root: Root directory of the repository; passed to
            scm.GIT.CaptureStatus and used to resolve file paths on upload.

    Returns:
        0 in case of success. 1 in case of error.
    """
    description = LoadDescription(description_file, dry_run)
    description = AddUploadedByGitClSplitToDescription(description)

    comment = gclient_utils.FileRead(comment_file) if comment_file else None

    EnsureInGitRepository()

    cl = changelist()
    upstream = cl.GetCommonAncestorWithUpstream()
    files = [(action.strip(), f)
             for action, f in scm.GIT.CaptureStatus(repository_root, upstream)]

    if not files:
        Emit('Cannot split an empty CL.')
        return 1

    author = git.run('config', 'user.email').strip() or None
    refactor_branch = git.current_branch()
    assert refactor_branch, "Can't run from detached branch."
    refactor_branch_upstream = git.upstream(refactor_branch)
    assert refactor_branch_upstream, \
        "Branch %s must have an upstream." % refactor_branch

    if not dry_run and not CheckDescriptionBugLink(description):
        return 0

    if from_file:
        cl_infos = LoadSplittingFromFile(from_file, files_on_disk=files)
    else:
        files_split_by_reviewers = SelectReviewersForFiles(
            cl, author, files, max_depth)

        cl_infos = CLInfoFromFilesAndOwnersDirectoriesDict(
            files_split_by_reviewers)

    # Note that we do this override even if the list is empty (indicating that
    # the user requested CLs not be assigned to any reviewers).
    if reviewers_override is not None:
        for info in cl_infos:
            info.reviewers = set(reviewers_override)

    if not dry_run:
        PrintSummary(cl_infos, refactor_branch)
        answer = gclient_utils.AskForData(
            'Proceed? (y/N, or i to edit interactively): ')
        if answer.lower() == 'i':
            cl_infos, saved_splitting_file = EditSplittingInteractively(
                cl_infos, files_on_disk=files)
        else:
            # Save even if we're continuing, so the user can safely resume an
            # aborted upload with the same splitting
            saved_splitting_file = SaveSplittingToTempFile(cl_infos)
            if answer.lower() != 'y':
                return 0
        # Make sure there isn't any clutter left over from a previous run
        if not ValidateExistingBranches(refactor_branch, cl_infos):
            return 0

    elif summarize:
        PrintSummary(cl_infos, refactor_branch)

    cls_per_reviewer = collections.defaultdict(int)

    for cl_index, cl_info in enumerate(cl_infos, 1):
        if dry_run and summarize:
            pass
        elif dry_run:
            file_paths = [f for _, f in cl_info.files]
            PrintClInfo(cl_index, len(cl_infos), cl_info.description,
                        file_paths, description, cl_info.reviewers, cq_dry_run,
                        enable_auto_submit, topic)
        else:
            UploadCl(refactor_branch, refactor_branch_upstream,
                     cl_info.description, cl_info.files, description,
                     saved_splitting_file, comment, cl_info.reviewers,
                     changelist, cmd_upload, cq_dry_run, enable_auto_submit,
                     topic, repository_root)

        for reviewer in cl_info.reviewers:
            cls_per_reviewer[reviewer] += 1

    # List the top reviewers that will be sent the most CLs as a result of
    # the split.
    reviewer_rankings = sorted(cls_per_reviewer.items(),
                               key=lambda item: item[1],
                               reverse=True)
    Emit('The top reviewers are:')
    for reviewer, count in reviewer_rankings[:CL_SPLIT_TOP_REVIEWERS]:
        Emit(f' {reviewer}: {count} CLs')

    if dry_run:
        # Wait until now to save the splitting so the file name doesn't get
        # washed away by the flood of dry-run printing.
        SaveSplittingToTempFile(cl_infos)

    # Go back to the original branch.
    git.run('checkout', refactor_branch)
    return 0


def CheckDescriptionBugLink(description):
    """Verifies that the description contains a bug link.

    Examples:
        Bug: 123
        Bug: chromium:456

    Prompts user if the description does not contain a bug link.

    Returns:
        True if a bug link is present or the user answered 'y' to the prompt.
    """
    bug_pattern = re.compile(r"^Bug:\s*(?:[a-zA-Z]+:)?[0-9]+", re.MULTILINE)
    answer = 'y'
    if not bug_pattern.search(description):
        answer = gclient_utils.AskForData(
            'Description does not include a bug link. Proceed? (y/N):')
    return answer.lower() == 'y'


def SelectReviewersForFiles(cl, author, files, max_depth):
    """Selects reviewers for passed-in files

    Args:
        cl: Changelist class instance
        author: Email of person running 'git cl split'
        files: List of (action, file) pairs
        max_depth: The maximum directory depth to search for OWNERS files.
            A value less than 1 means no limit.

    Returns:
        A dictionary mapping each tuple of suggested reviewers to a
        FilesAndOwnersDirectory holding the files and OWNERS directories
        assigned to those reviewers.
    """
    info_split_by_owners = GetFilesSplitByOwners(files, max_depth)

    info_split_by_reviewers = {}

    for (directory, split_files) in info_split_by_owners.items():
        # Use '/' as a path separator in the branch name and the CL description
        # and comment.
        directory = directory.replace(os.path.sep, '/')
        file_paths = [f for _, f in split_files]
        # Convert reviewers list to tuple in order to use reviewers as key to
        # dictionary.
        reviewers = tuple(
            cl.owners_client.SuggestOwners(
                file_paths, exclude=[author, cl.owners_client.EVERYONE]))

        if reviewers not in info_split_by_reviewers:
            info_split_by_reviewers[reviewers] = FilesAndOwnersDirectory([], [])
        info_split_by_reviewers[reviewers].files.extend(split_files)
        info_split_by_reviewers[reviewers].owners_directories.append(directory)

    return info_split_by_reviewers


def SaveSplittingToFile(cl_infos: List[CLInfo], filename: str, silent=False):
    """
    Writes the listed CLs to the designated file, in a human-readable and
    editable format. Include an explanation of the file format at the top,
    as well as instructions for how to use it.

    Unless |silent| is set, the file name is printed afterwards.
    """
    preamble = (
        "# CLs in this file must have the following format:\n"
        "# A 'Reviewers: [...]' line, where '...' is a (possibly empty) list "
        "of reviewer emails.\n"
        "# A 'Description: ...' line, where '...' is any string (by default, "
        "the list of directories the files have been pulled from).\n"
        "# One or more file lines, consisting of an <action>, <file> pair, in "
        "the format output by `git status`.\n\n"
        "# Each 'Reviewers' line begins a new CL.\n"
        "# To use the splitting in this file, use the --from-file option.\n\n")

    cl_string = "\n\n".join([info.FormatForPrinting() for info in cl_infos])
    gclient_utils.FileWrite(filename, preamble + cl_string)
    if not silent:
        Emit(f"Saved splitting to {filename}")


def SaveSplittingToTempFile(cl_infos: List[CLInfo], silent=False):
    """
    Create a file in the user's temp directory, and save the splitting there.

    Returns the name of the created file.
    """
    # We can't use gclient_utils.temporary_file because it will be removed
    temp_file, temp_name = tempfile.mkstemp(prefix="split_cl_")
    os.close(temp_file)  # Necessary for windows
    SaveSplittingToFile(cl_infos, temp_name, silent)
    return temp_name


class ClSplitParseError(Exception):
    """Raised when a splitting file cannot be parsed or fails validation."""
    pass


# Matches 'Reviewers: [...]', extracts the ...
reviewers_re = re.compile(r'Reviewers:\s*\[([^\]]*)\]')
# Matches 'Description: ...', extracts the ...
description_re = re.compile(r'Description:\s*(.+)')
# Matches '<action>, <path>', and extracts both
# <action> must be a valid code (either 1 or 2 letters)
file_re = re.compile(r'([MTADRC]{1,2}),\s*(.+)')


# We use regex parsing instead of e.g. json because it lets us use a much more
# human-readable format, similar to the summary printed in dry runs
def ParseSplittings(lines: List[str]) -> List[CLInfo]:
    """
    Parse a splitting file. We expect to get a series of lines in the format
    of CLInfo.FormatForPrinting. In the following order, we expect to see
    - A 'Reviewers: ' line containing a list,
    - A 'Description: ' line containing anything, and
    - A list of <action>, <path> pairs, each on its own line

    Note that this function only transforms the file into a list of CLInfo
    (if possible). It does not validate the information; for that, see
    ValidateSplitting.

    Raises:
        ClSplitParseError: if a line appears before the first 'Reviewers: '
            line, a CL has two descriptions, or a line matches none of the
            expected formats.
    """

    cl_infos = []
    current_cl_info = None
    for line in lines:
        line = line.strip()

        # Skip empty or commented lines
        if not line or line.startswith('#'):
            continue

        # Start a new CL whenever we see a new Reviewers: line
        m = reviewers_re.fullmatch(line)
        if m:
            reviewers_str = m.group(1)
            # NOTE: reviewers are kept as a list here, although CLInfo
            # annotates the field as a set.
            reviewers = [r.strip() for r in reviewers_str.split(",")]
            # Account for empty list or trailing comma
            if not reviewers[-1]:
                reviewers = reviewers[:-1]

            if current_cl_info:
                cl_infos.append(current_cl_info)

            current_cl_info = CLInfo(reviewers=reviewers)
            continue

        if not current_cl_info:
            # Make sure no nonempty lines appear before the first CL
            raise ClSplitParseError(
                f"Error: Line appears before the first 'Reviewers: ' line:\n{line}"
            )

        # Description is just used as a description, so any string is fine
        m = description_re.fullmatch(line)
        if m:
            if current_cl_info.description:
                raise ClSplitParseError(
                    f"Error parsing line: CL already has a description entry\n{line}"
                )
            current_cl_info.description = m.group(1).strip()
            continue

        # Any other line is presumed to be an '<action>, <path>' pair
        m = file_re.fullmatch(line)
        if m:
            action, path = m.groups()
            current_cl_info.files.append((action, path))
            continue

        raise ClSplitParseError("Error parsing line: Does not look like\n"
                                "'Reviewers: [...]',\n"
                                "'Description: ...', or\n"
                                f"a pair of '<action>, <path>':\n{line}")

    if current_cl_info:
        cl_infos.append(current_cl_info)

    return cl_infos


def ValidateSplitting(cl_infos: List[CLInfo], filename: str,
                      files_on_disk: List[Tuple[str, str]]):
    """
    Ensure that the provided list of CLs is a valid splitting.

    Specifically, check that:
    - Each file is in at most one CL
    - Each file and action appear in the list of changed files reported by git
    - Warn if some files don't appear in any CL
    - Warn if a reviewer string looks wrong, or if a CL is empty

    CLs without files are removed from |cl_infos| in place, so that callers
    holding the same list do not go on to process them.

    Returns:
        An empty list if |cl_infos| is empty, otherwise None.

    Raises:
        ClSplitParseError: if a file appears in several CLs, or a listed file
            is not among |files_on_disk|.
    """
    # Validate the parsed information
    if not cl_infos:
        EmitWarning("No CLs listed in file. No action will be taken.")
        return []

    files_in_loaded_cls = set()
    # Collect all files, ensuring no duplicates
    # Warn on empty CLs or invalid reviewer strings
    for info in cl_infos:
        if not info.files:
            EmitWarning("CL has no files, and will be skipped:\n",
                        info.FormatForPrinting())
        for file_info in info.files:
            if file_info in files_in_loaded_cls:
                raise ClSplitParseError(
                    f"File appears in multiple CLs in {filename}:\n{file_info}")

            files_in_loaded_cls.add(file_info)
        for reviewer in info.reviewers:
            if not (re.fullmatch(r"[^@]+@[^.]+\..+", reviewer)):
                EmitWarning("reviewer does not look like an email address: ",
                            reviewer)

    # Strip empty CLs. Slice assignment mutates the caller's list; rebinding
    # the local name would leave the empty CLs in place for the caller, and
    # CreateBranchName() cannot handle a CL with no files.
    cl_infos[:] = [info for info in cl_infos if info.files]

    # Ensure the files in the user-provided CL splitting match the files
    # that git reports.
    # Warn if not all the files git reports appear.
    # Fail if the user mentions a file that isn't reported by git
    files_on_disk = set(files_on_disk)
    if not files_in_loaded_cls.issubset(files_on_disk):
        extra_files = files_in_loaded_cls.difference(files_on_disk)
        extra_files_str = "\n".join(f"{action}, {file}"
                                    for (action, file) in extra_files)
        raise ClSplitParseError(
            f"Some files are listed in {filename} but do not match any files "
            f"listed by git:\n{extra_files_str}")

    unmentioned_files = files_on_disk.difference(files_in_loaded_cls)
    if unmentioned_files:
        EmitWarning(
            f"the following files are not included in any CL in {filename}. "
            "They will not be uploaded:\n", unmentioned_files)


def LoadSplittingFromFile(filename: str,
                          files_on_disk: List[Tuple[str, str]]) -> List[CLInfo]:
    """
    Given a file and the list of <action>, <file> pairs reported by git,
    read the file and return the list of CLInfos it contains.

    The parsed splitting is validated against |files_on_disk| before it is
    returned.
    """
    lines = gclient_utils.FileRead(filename).splitlines()

    cl_infos = ParseSplittings(lines)
    ValidateSplitting(cl_infos, filename, files_on_disk)

    return cl_infos


def EditSplittingInteractively(
        cl_infos: List[CLInfo],
        files_on_disk: List[Tuple[str, str]]) -> Tuple[List[CLInfo], str]:
    """
    Allow the user to edit the generated splitting using their default editor.
    Make sure the edited splitting is saved so they can retrieve it if needed.

    Returns:
        A (cl_infos, filename) pair: the edited splitting and the path of the
        temporary file it was saved to.
    """

    tmp_file = SaveSplittingToTempFile(cl_infos, silent=True)
    splitting = gclient_utils.RunEditor(gclient_utils.FileRead(tmp_file), False)
    cl_infos = ParseSplittings(splitting.splitlines())

    # Save the edited splitting before validation, so the user can go back
    # and edit it if there are any typos
    SaveSplittingToFile(cl_infos, tmp_file)
    ValidateSplitting(cl_infos, "the provided splitting", files_on_disk)
    return cl_infos, tmp_file