#!/usr/bin/env python3 # Copyright (c) 2018 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. from string import Template import argparse import io import json import os import re import shutil import sys ARCH_VAR = 'arch' OS_VAR = 'os' PLATFORM_VAR = 'platform' CIPD_SUBDIR_RE = '@Subdir (.*)' CIPD_DESCRIBE = 'describe' CIPD_ENSURE = 'ensure' CIPD_ENSURE_FILE_RESOLVE = 'ensure-file-resolve' CIPD_EXPAND_PKG = 'expand-package-name' CIPD_EXPORT = 'export' DESCRIBE_STDOUT_TEMPLATE = """\ Package: ${package} Instance ID: ${package}-fake-instance-id Registered by: user:fake-testing-support Registered at: 2023-05-10 18:53:55.078574 +0000 UTC Refs: ${package}-latest Tags: git_revision:${package}-fake-git-revision ${package}-fake-tag:1.0 ${package}-fake-tag:2.0 """ DESCRIBE_JSON_TEMPLATE = """{ "result": { "pin": { "package": "${package}", "instance_id": "${package}-fake-instance-id" }, "registered_by": "user:fake-testing-support", "registered_ts": 1683744835, "refs": [ { "ref": "${package}-latest", "instance_id": "${package}-fake-instance-id", "modified_by": "user:fake-testing-support", "modified_ts": 1683744835 } ], "tags": [ { "tag": "git_revision:${package}-fake-git-revision", "registered_by": "user:fake-testing-support", "registered_ts": 1683744835 }, { "tag": "${package}-fake-tag:1.0", "registered_by": "user:fake-testing-support", "registered_ts": 1683744835 }, { "tag": "${package}-fake-tag:2.0", "registered_by": "user:fake-testing-support", "registered_ts": 1683744835 } ] } }""" def parse_cipd(root, contents): tree = {} current_subdir = None for line in contents: line = line.strip() match = re.match(CIPD_SUBDIR_RE, line) if match: print('match') current_subdir = os.path.join(root, *match.group(1).split('/')) if not root: current_subdir = match.group(1) elif line and current_subdir: print('no match') tree.setdefault(current_subdir, []).append(line) return tree def expand_package_name_cmd(package_name): package_split = package_name.split("/") suffix = package_split[-1] # Any use of var equality should return empty for testing. if "=" in suffix: if suffix != "${platform=fake-platform-ok}": return "" package_name = "/".join(package_split[:-1] + ["${platform}"]) for v in [ARCH_VAR, OS_VAR, PLATFORM_VAR]: var = "${%s}" % v if package_name.endswith(var): package_name = package_name.replace(var, "%s-expanded-test-only" % v) return package_name def ensure_file_resolve(): resolved = {"result": {}} parser = argparse.ArgumentParser() parser.add_argument('-ensure-file', required=True) parser.add_argument('-json-output') args, _ = parser.parse_known_args() with io.open(args.ensure_file, 'r', encoding='utf-8') as f: new_content = parse_cipd("", f.readlines()) for path, packages in new_content.items(): resolved_packages = [] for package in packages: package_name = expand_package_name_cmd(package.split(" ")[0]) resolved_packages.append({ "package": package_name, "pin": { "package": package_name, "instance_id": package_name + "-fake-resolved-id", } }) resolved["result"][path] = resolved_packages with io.open(args.json_output, 'w', encoding='utf-8') as f: f.write(json.dumps(resolved, indent=4)) def describe_cmd(package_name): parser = argparse.ArgumentParser() parser.add_argument('-json-output') parser.add_argument('-version', required=True) args, _ = parser.parse_known_args() json_template = Template(DESCRIBE_JSON_TEMPLATE).substitute( package=package_name) cli_out = Template(DESCRIBE_STDOUT_TEMPLATE).substitute( package=package_name) json_out = json.loads(json_template) found = False for tag in json_out['result']['tags']: if tag['tag'] == args.version: found = True break for tag in json_out['result']['refs']: if tag['ref'] == args.version: found = True break if found: if args.json_output: with io.open(args.json_output, 'w', encoding='utf-8') as f: f.write(json.dumps(json_out, indent=4)) sys.stdout.write(cli_out) return 0 sys.stdout.write('Error: no such ref.\n') return 1 def main(): cmd = sys.argv[1] assert cmd in [ CIPD_DESCRIBE, CIPD_ENSURE, CIPD_ENSURE_FILE_RESOLVE, CIPD_EXPAND_PKG, CIPD_EXPORT ] # Handle cipd expand-package-name if cmd == CIPD_EXPAND_PKG: # Expecting argument after cmd assert len(sys.argv) == 3 # Write result to stdout sys.stdout.write(expand_package_name_cmd(sys.argv[2])) return 0 if cmd == CIPD_DESCRIBE: # Expecting argument after cmd assert len(sys.argv) >= 3 return describe_cmd(sys.argv[2]) if cmd == CIPD_ENSURE_FILE_RESOLVE: return ensure_file_resolve() parser = argparse.ArgumentParser() parser.add_argument('-ensure-file') parser.add_argument('-root') args, _ = parser.parse_known_args() with io.open(args.ensure_file, 'r', encoding='utf-8') as f: new_content = parse_cipd(args.root, f.readlines()) # Install new packages for path, packages in new_content.items(): if not os.path.exists(path): os.makedirs(path) with io.open(os.path.join(path, '_cipd'), 'w', encoding='utf-8') as f: f.write('\n'.join(packages)) # Save the ensure file that we got shutil.copy(args.ensure_file, os.path.join(args.root, '_cipd')) return 0 if __name__ == '__main__': sys.exit(main())