|
|
|
#!/usr/bin/env python3
|
|
|
|
# Copyright 2024 The Chromium Authors. All rights reserved.
|
|
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
|
|
# found in the LICENSE file.
|
|
|
|
"""This script copies DEPS package information from one source onto
|
|
|
|
destination.
|
|
|
|
|
|
|
|
If the destination doesn't have packages, the script errors out.
|
|
|
|
|
|
|
|
Example usage:
|
|
|
|
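roll_downstream_gcs_deps.py \
  --source-deps some/repo/DEPS \
  --destination-deps some/downstream/repo/DEPS \
  --source-package src/build/linux/debian_bullseye_amd64-sysroot \
  --destination-package src/build/linux/debian_bullseye_amd64-sysroot \
  --source-package src/build/linux/debian_bullseye_arm64-sysroot \
  --destination-package src/build/linux/debian_bullseye_arm64-sysroot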
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
import ast
|
|
|
|
import sys
|
|
|
|
from typing import Dict, List
|
|
|
|
|
|
|
|
|
|
|
|
def _get_deps(deps_ast: ast.Module) -> Dict[str, ast.Dict]:
    """Locates the top-level `deps = {...}` assignment in a DEPS file AST.

    Only a plain assignment with a single target named `deps` whose value is
    a dict literal is accepted; the first such statement wins.

    Args:
        deps_ast: AST of the DEPS file.

    Raises:
        Exception: If no matching deps assignment exists.

    Returns:
        Mapping from each constant dict key to the AST node of its value.
        Entries whose key is not a constant (e.g. `**spread`) are skipped.
    """
    for node in deps_ast.body:
        is_deps_assignment = (isinstance(node, ast.Assign)
                              and len(node.targets) == 1
                              and isinstance(node.targets[0], ast.Name)
                              and node.targets[0].id == 'deps'
                              and isinstance(node.value, ast.Dict))
        if not is_deps_assignment:
            continue
        literal = node.value
        return {
            entry_key.value: entry_value
            for entry_key, entry_value in zip(literal.keys, literal.values)
            if isinstance(entry_key, ast.Constant)
        }
    raise Exception('no deps found')
|
|
|
|
|
|
|
|
|
|
|
|
def _get_gcs_object_list_ast(package_ast: ast.Dict) -> ast.List:
    """Searches for the objects list in a GCS package AST.

    A package counts as a GCS package when its dict literal contains the
    constant entry `'dep_type': 'gcs'`.

    Args:
        package_ast: AST of the GCS package.

    Raises:
        AssertionError: If the package is not a GCS package (including when
            it is not a dict literal at all), or if it has no `objects`
            list.

    Returns:
        AST of the objects list.
    """
    # Deps that are not dict literals (e.g. a plain URL string) cannot be GCS
    # packages; report them with the documented error instead of failing on a
    # missing `keys` attribute.
    if not isinstance(package_ast, ast.Dict):
        raise AssertionError('Not a GCS dependency!')

    is_gcs = False
    result = None
    for key, value in zip(package_ast.keys, package_ast.values):
        if not isinstance(key, ast.Constant):
            continue
        if (key.value == 'dep_type' and isinstance(value, ast.Constant)
                and value.value == 'gcs'):
            is_gcs = True
        if key.value == 'objects' and isinstance(value, ast.List):
            result = value

    # Raise explicitly instead of using `assert`: assert statements are
    # removed when Python runs with -O, which would let None escape.
    if not is_gcs:
        raise AssertionError('Not a GCS dependency!')
    if result is None:
        raise AssertionError('No objects found!')
    return result
|
|
|
|
|
|
|
|
|
|
|
|
def _utf8_slice(line, start=None, end=None):
    """Slices `line` by UTF-8 byte offsets, the unit `ast` reports columns in."""
    return line.encode('utf-8')[start:end].decode('utf-8')


def _replace_ast(destination: str, dest_ast: ast.expr, source: str,
                 source_ast: ast.expr) -> str:
    """Replaces the text of dest_ast in destination with the text of
    source_ast taken from source.

    Both nodes must carry position information (lineno, col_offset,
    end_lineno, end_col_offset) relative to their respective file content.

    Args:
        destination: Destination DEPS file content.
        dest_ast: AST node in the destination DEPS file that will be
            replaced.
        source: Source DEPS file content.
        source_ast: AST node in the source DEPS file whose text replaces
            the destination node's text.

    Returns:
        Content of destination DEPS file with replaced content. Lines are
        joined with '\n', and a trailing newline is kept only if
        destination had one.
    """
    source_lines = source.splitlines()
    dest_lines = destination.splitlines()

    # Text of the source node, one list entry per line it spans. The first
    # and last lines are cut at the node's columns.
    first_source_line = source_lines[source_ast.lineno - 1]
    if source_ast.lineno == source_ast.end_lineno:
        replacement = [
            _utf8_slice(first_source_line, source_ast.col_offset,
                        source_ast.end_col_offset)
        ]
    else:
        replacement = [
            _utf8_slice(first_source_line, start=source_ast.col_offset),
            *source_lines[source_ast.lineno:source_ast.end_lineno - 1],
            _utf8_slice(source_lines[source_ast.end_lineno - 1],
                        end=source_ast.end_col_offset),
        ]

    # Keep whatever surrounds the destination node on its first and last
    # lines (e.g. the `'objects': ` prefix and the trailing comma).
    replacement[0] = _utf8_slice(dest_lines[dest_ast.lineno - 1],
                                 end=dest_ast.col_offset) + replacement[0]
    replacement[-1] += _utf8_slice(dest_lines[dest_ast.end_lineno - 1],
                                   start=dest_ast.end_col_offset)

    # Assemble as a list of lines and join once, so that no stray newline is
    # produced when there are no lines before, inside or after the node.
    new_lines = (dest_lines[:dest_ast.lineno - 1] + replacement +
                 dest_lines[dest_ast.end_lineno:])
    result = '\n'.join(new_lines)
    if destination.endswith('\n'):
        result += '\n'
    return result
|
|
|
|
|
|
|
|
|
|
|
|
def copy_packages(source_content: str, destination_content: str,
                  source_packages: List[str],
                  destination_packages: List[str]) -> str:
    """Copies GCS package objects from source to destination.

    Packages are paired by position: the objects list of
    `source_packages[i]` replaces the objects list of
    `destination_packages[i]`. Only the objects list is copied; the other
    fields of the destination package are left untouched.

    Args:
        source_content: Source DEPS file content.
        destination_content: Destination DEPS file content.
        source_packages: Names of the GCS packages to copy from, as they
            appear in the source `deps` dict.
        destination_packages: Names of the GCS packages to overwrite, as
            they appear in the destination `deps` dict. Must have the same
            length as `source_packages`.

    Raises:
        ValueError: If the two package lists differ in length.
        Exception: If a package is missing from its DEPS file.

    Returns:
        Destination DEPS file content with packages copied.
    """
    # Fail up front on mismatched lists instead of hitting an IndexError
    # midway or silently ignoring surplus destination packages.
    if len(source_packages) != len(destination_packages):
        raise ValueError(
            'Got %d source packages but %d destination packages' %
            (len(source_packages), len(destination_packages)))

    source_deps = _get_deps(ast.parse(source_content, mode='exec'))
    for source_package, destination_package in zip(source_packages,
                                                   destination_packages):
        if source_package not in source_deps:
            raise Exception('Package %s not found in source' % source_package)
        # Re-parse on every iteration: the previous replacement may have
        # shifted the line/column positions of the remaining packages.
        dest_deps = _get_deps(ast.parse(destination_content, mode='exec'))
        if destination_package not in dest_deps:
            raise Exception('Package %s not found in destination' %
                            destination_package)
        destination_content = _replace_ast(
            destination_content,
            _get_gcs_object_list_ast(dest_deps[destination_package]),
            source_content,
            _get_gcs_object_list_ast(source_deps[source_package]))

    return destination_content
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Parses command-line arguments and rolls the requested packages.

    Reads the source and destination DEPS files, copies the objects of each
    source package onto the destination package at the same position, and
    overwrites the destination DEPS file with the result.

    Returns:
        0 on success, for use as the process exit code.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--source-deps',
                        required=True,
                        help='Source DEPS file where content will be copied '
                        'from')
    parser.add_argument('--source-package',
                        action='append',
                        required=True,
                        help='List of DEPS packages to update')
    parser.add_argument('--destination-deps',
                        required=True,
                        help='Destination DEPS file, where content will be '
                        'saved')
    parser.add_argument('--destination-package',
                        action='append',
                        required=True,
                        help='List of DEPS packages to update')
    args = parser.parse_args()

    # `required=True` already guarantees at least one value for each
    # --*-package flag, so only the pairing needs to be validated here.
    if len(args.destination_package) != len(args.source_package):
        parser.error('Source and destination packages must be of the same '
                     'length, aborting...')

    # Read and write as UTF-8 explicitly: the AST column offsets used for the
    # replacement are UTF-8 based, and the platform default encoding may
    # differ.
    with open(args.source_deps, encoding='utf-8') as f:
        source_content = f.read()

    with open(args.destination_deps, encoding='utf-8') as f:
        destination_content = f.read()

    new_content = copy_packages(source_content, destination_content,
                                args.source_package, args.destination_package)

    with open(args.destination_deps, 'w', encoding='utf-8') as f:
        f.write(new_content)

    print('Run:')
    print('  Destination DEPS file updated. You still need to create and '
          'upload a change.')
    return 0
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit code.
if __name__ == '__main__':
    sys.exit(main())
|