You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
depot_tools/roll_downstream_gcs_deps.py

223 lines
7.6 KiB
Python

#!/usr/bin/env python3
# Copyright 2024 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This scripts copies DEPS package information from one source onto
destination.
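If the destination doesn't have the requested packages, the script errors out.
Source and destination packages are paired in the order they are given.
Example usage:
roll_downstream_gcs_deps.py \
--source-deps some/repo/DEPS \
--destination-deps some/downstream/repo/DEPS \
--source-package src/build/linux/debian_bullseye_amd64-sysroot \
--destination-package src/build/linux/debian_bullseye_amd64-sysroot \
--source-package src/build/linux/debian_bullseye_arm64-sysroot \
--destination-package src/build/linux/debian_bullseye_arm64-sysroot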
"""
import argparse
import ast
import sys
from typing import Dict, List
def _get_deps(deps_ast: ast.Module) -> Dict[str, ast.Dict]:
    """Locates the top-level `deps = {...}` assignment in a DEPS file AST.

    Only a single-target assignment of a dict literal to the plain name
    `deps` qualifies; the first such statement wins.

    Args:
        deps_ast: Parsed module AST of a DEPS file.

    Raises:
        Exception: If no qualifying `deps` assignment exists.

    Returns:
        Mapping from each constant key of the `deps` dict to the AST node of
        its value. Entries whose key is not a constant (e.g. `**spread`) are
        skipped.
    """
    for node in deps_ast.body:
        is_deps_dict = (isinstance(node, ast.Assign)
                        and len(node.targets) == 1
                        and isinstance(node.targets[0], ast.Name)
                        and node.targets[0].id == 'deps'
                        and isinstance(node.value, ast.Dict))
        if is_deps_dict:
            return {
                name.value: package
                for name, package in zip(node.value.keys, node.value.values)
                if isinstance(name, ast.Constant)
            }
    raise Exception('no deps found')
def _get_gcs_object_list_ast(package_ast: ast.Dict) -> ast.List:
    """Searches for the objects list in a GCS package AST.

    Args:
        package_ast: AST of the package entry taken from the `deps` dict.

    Raises:
        AssertionError: If the entry is not a dict with `'dep_type': 'gcs'`,
            or if it has no `'objects'` list.

    Returns:
        AST of the `objects` list (the last one, if the key is repeated).
    """
    # Errors are raised explicitly rather than with `assert` statements so
    # the validation is not stripped when Python runs with -O. The exception
    # type stays AssertionError to remain compatible with existing callers.
    if not isinstance(package_ast, ast.Dict):
        # E.g. a plain 'url@revision' string dependency.
        raise AssertionError('Not a GCS dependency!')

    is_gcs = False
    objects = None
    for key, value in zip(package_ast.keys, package_ast.values):
        if not isinstance(key, ast.Constant):
            continue
        if (key.value == 'dep_type' and isinstance(value, ast.Constant)
                and value.value == 'gcs'):
            is_gcs = True
        if key.value == 'objects' and isinstance(value, ast.List):
            objects = value

    if not is_gcs:
        raise AssertionError('Not a GCS dependency!')
    if objects is None:
        raise AssertionError('No objects found!')
    return objects
def _split_parser_lines(content: str) -> List[str]:
    """Splits content into lines the way the Python parser counts them.

    Only LF, CRLF and CR end a line for the parser. `str.splitlines()` would
    also split on form feeds and Unicode separators, which would shift AST
    line numbers and rewrite those characters. A trailing newline yields a
    final empty element, so joining with LF restores it.
    """
    return content.replace('\r\n', '\n').replace('\r', '\n').split('\n')


def _char_offset(line: str, byte_offset: int) -> int:
    """Converts an AST column (UTF-8 byte offset) into a str index."""
    return len(line.encode('utf-8')[:byte_offset].decode('utf-8'))


def _replace_ast(destination: str, dest_ast: ast.AST, source: str,
                 source_ast: ast.AST) -> str:
    """Replaces the text of dest_ast with the text of source_ast.

    Args:
        destination: Destination DEPS file content.
        dest_ast: Located AST node (with lineno/col_offset/end_lineno/
            end_col_offset) in `destination` whose text will be replaced.
        source: Source DEPS file content.
        source_ast: Located AST node in `source` whose text replaces the
            text of `dest_ast`.

    Returns:
        Content of the destination DEPS file with the node text replaced.
        Line endings are normalized to LF; a trailing newline is kept only
        if `destination` had one.
    """
    source_lines = _split_parser_lines(source)
    dest_lines = _split_parser_lines(destination)

    # Extract the exact text of the source node as a list of lines.
    src_first = source_ast.lineno - 1
    src_last = source_ast.end_lineno - 1
    src_start = _char_offset(source_lines[src_first], source_ast.col_offset)
    src_end = _char_offset(source_lines[src_last],
                           source_ast.end_col_offset)
    if src_first == src_last:
        replacement = [source_lines[src_first][src_start:src_end]]
    else:
        replacement = [source_lines[src_first][src_start:]]
        replacement.extend(source_lines[src_first + 1:src_last])
        replacement.append(source_lines[src_last][:src_end])

    # Keep whatever surrounds the destination node on its first/last line.
    dst_first = dest_ast.lineno - 1
    dst_last = dest_ast.end_lineno - 1
    dst_start = _char_offset(dest_lines[dst_first], dest_ast.col_offset)
    dst_end = _char_offset(dest_lines[dst_last], dest_ast.end_col_offset)
    replacement[0] = dest_lines[dst_first][:dst_start] + replacement[0]
    replacement[-1] += dest_lines[dst_last][dst_end:]

    # Splicing whole-line lists avoids the stray separators that piecewise
    # string concatenation produces for empty leading/middle/trailing parts.
    return '\n'.join(dest_lines[:dst_first] + replacement +
                     dest_lines[dst_last + 1:])
def copy_packages(source_content: str, destination_content: str,
                  source_packages: List[str],
                  destination_packages: List[str]) -> str:
    """Copies GCS package objects from source to destination.

    Packages are paired by position: the objects list of
    `source_packages[i]` replaces the objects list of
    `destination_packages[i]`. Only the `objects` list is copied.

    Args:
        source_content: Source DEPS file content.
        destination_content: Destination DEPS file content.
        source_packages: Names of the GCS packages to copy from.
        destination_packages: Names of the GCS packages to overwrite, in the
            same order as `source_packages`.

    Raises:
        ValueError: If the two package lists differ in length.
        Exception: If a package is missing from its DEPS file.

    Returns:
        Destination DEPS file content with packages copied.
    """
    # Reject mismatched lists up front instead of silently ignoring extra
    # destination packages or failing midway with an IndexError.
    if len(source_packages) != len(destination_packages):
        raise ValueError(
            'Source and destination packages must be of the same length')

    source_deps = _get_deps(ast.parse(source_content, mode='exec'))
    for source_package, destination_package in zip(source_packages,
                                                   destination_packages):
        if source_package not in source_deps:
            raise Exception('Package %s not found in source' % source_package)
        # The destination is re-parsed on every iteration because the
        # previous replacement may have shifted line/column positions.
        dest_deps = _get_deps(ast.parse(destination_content, mode='exec'))
        if destination_package not in dest_deps:
            raise Exception('Package %s not found in destination' %
                            destination_package)
        destination_content = _replace_ast(
            destination_content,
            _get_gcs_object_list_ast(dest_deps[destination_package]),
            source_content,
            _get_gcs_object_list_ast(source_deps[source_package]))
    return destination_content
def main():
    """Command-line entry point.

    Parses the arguments, copies the requested packages' objects from the
    source DEPS file into the destination DEPS file and rewrites the
    destination file in place.

    Returns:
        0 on success. Argument errors exit through `parser.error`.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--source-deps',
                        required=True,
                        help='Source DEPS file where content will be copied '
                        'from')
    parser.add_argument('--source-package',
                        action='append',
                        required=True,
                        help='List of DEPS packages to update')
    parser.add_argument('--destination-deps',
                        required=True,
                        help='Destination DEPS file, where content will be '
                        'saved')
    parser.add_argument('--destination-package',
                        action='append',
                        required=True,
                        help='List of DEPS packages to update')
    args = parser.parse_args()
    if not args.source_package:
        parser.error('No source packages specified to roll, aborting...')
    if not args.destination_package:
        parser.error('No destination packages specified to roll, aborting...')
    if len(args.destination_package) != len(args.source_package):
        parser.error('Source and destination packages must be of the same '
                     'length, aborting...')

    # DEPS files use Python syntax, whose default source encoding is UTF-8;
    # do not depend on the locale's preferred encoding.
    with open(args.source_deps, encoding='utf-8') as f:
        source_content = f.read()
    with open(args.destination_deps, encoding='utf-8') as f:
        destination_content = f.read()

    new_content = copy_packages(source_content, destination_content,
                                args.source_package, args.destination_package)

    with open(args.destination_deps, 'w', encoding='utf-8') as f:
        f.write(new_content)
    print('Run:')
    print('  Destination DEPS file updated. You still need to create and '
          'upload a change.')
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())