[ssci] PEP8 formatting for metadata directory

All files in metadata/ are new, so they should follow the PEP-8 style.

Change-Id: I5d8424536c3d7b703e6b8087e0e2d70c06a1549c
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/4834909
Reviewed-by: Rachael Newitt <renewitt@google.com>
Commit-Queue: Rachael Newitt <renewitt@google.com>
changes/09/4834909/2
Anne Redulla 2 years ago committed by LUCI CQ
parent 693e0b3121
commit 6715758ed9

@ -0,0 +1,2 @@
[style]
based_on_style = pep8

@ -7,7 +7,7 @@ PRESUBMIT_VERSION = '2.0.0'
def CheckPythonUnitTests(input_api, output_api):
    """Presubmit check that runs the Python unit tests under tests/."""
    # Collect every *_test.py file in the tests directory and run it.
    unit_tests = input_api.canned_checks.GetUnitTestsInDirectory(
        input_api, output_api, "tests", files_to_check=[r'.+_test\.py$'])
    return input_api.RunTests(unit_tests)

@ -16,140 +16,146 @@ _ROOT_DIR = os.path.abspath(os.path.join(_THIS_DIR, ".."))
sys.path.insert(0, _ROOT_DIR)
import metadata.fields.field_types as field_types
import metadata.fields.custom.license
import metadata.fields.custom.version
import metadata.fields.custom.license as license_util
import metadata.fields.custom.version as version_util
import metadata.fields.known as known_fields
import metadata.fields.util as util
import metadata.validation_result as vr
class DependencyMetadata:
    """Stores and validates the metadata for a single dependency."""

    # Fields that must be present in every dependency's metadata.
    _MANDATORY_FIELDS = {
        known_fields.NAME,
        known_fields.URL,
        known_fields.VERSION,
        known_fields.LICENSE,
        known_fields.SECURITY_CRITICAL,
        known_fields.SHIPPED,
    }

    def __init__(self):
        # All entries added, in order, including repeated fields.
        self._entries: List[Tuple[str, str]] = []
        # The current (most recently added) value of each known field.
        self._metadata: Dict[field_types.MetadataField, str] = {}
        # How many times an entry was added for each known field.
        self._occurrences: Dict[field_types.MetadataField,
                                int] = defaultdict(int)

    def add_entry(self, field_name: str, field_value: str):
        """Records one field entry; for a known field, a later value
        overwrites an earlier one.
        """
        stripped = field_value.strip()
        self._entries.append((field_name, stripped))

        known = known_fields.get_field(field_name)
        if known:
            self._metadata[known] = stripped
            self._occurrences[known] += 1

    def has_entries(self) -> bool:
        """Returns whether any entry has been recorded."""
        return bool(self._entries)

    def get_entries(self) -> List[Tuple[str, str]]:
        """Returns a copy of all recorded (field name, value) entries."""
        return list(self._entries)

    def _assess_required_fields(self) -> Set[field_types.MetadataField]:
        """Returns the set of required fields, based on the current
        metadata.
        """
        required = set(self._MANDATORY_FIELDS)

        # Date and revision become required when no usable version was
        # specified.
        version_value = self._metadata.get(known_fields.VERSION)
        if version_value is None or version_util.is_unknown(version_value):
            required.update((known_fields.DATE, known_fields.REVISION))

        # Assume the dependency is shipped unless stated otherwise.
        shipped_value = self._metadata.get(known_fields.SHIPPED)
        is_shipped = (shipped_value is None
                      or util.infer_as_boolean(shipped_value, default=True))

        if is_shipped:
            # A license file is required if the dependency is shipped.
            required.add(known_fields.LICENSE_FILE)

            # Android license compatibility must be declared when the
            # package is shipped and no license is on the allowlist.
            has_allowlisted = False
            license_value = self._metadata.get(known_fields.LICENSE)
            if license_value:
                licenses = license_util.process_license_value(
                    license_value,
                    atomic_delimiter=known_fields.LICENSE.VALUE_DELIMITER)
                # any() mirrors the original early-exit search.
                has_allowlisted = any(allowed for _, allowed in licenses)
            if not has_allowlisted:
                required.add(known_fields.LICENSE_ANDROID_COMPATIBLE)

        return required

    def validate(self, source_file_dir: str,
                 repo_root_dir: str) -> List[vr.ValidationResult]:
        """Validates all the metadata.

        Args:
            source_file_dir: the directory of the file that the metadata
                             is from.
            repo_root_dir: the repository's root directory.

        Returns: the metadata's validation results.
        """
        results = []

        # Check for duplicate fields.
        repeat_descriptions = [
            f"{field.get_name()} ({count})"
            for field, count in self._occurrences.items() if count > 1
        ]
        if repeat_descriptions:
            repeated = ", ".join(repeat_descriptions)
            results.append(
                vr.ValidationError(reason="There is a repeated field.",
                                   additional=[
                                       f"Repeated fields: {repeated}",
                                   ]))

        # Check required fields are present.
        for field in self._assess_required_fields():
            if field not in self._metadata:
                field_name = field.get_name()
                results.append(
                    vr.ValidationError(
                        reason=f"Required field '{field_name}' is missing."))

        # Validate values for all present fields.
        for field, value in self._metadata.items():
            field_result = field.validate(value)
            if field_result:
                field_result.set_tag(tag="field", value=field.get_name())
                results.append(field_result)

        # Check existence of the license file(s) on disk.
        license_file_value = self._metadata.get(known_fields.LICENSE_FILE)
        if license_file_value is not None:
            result = known_fields.LICENSE_FILE.validate_on_disk(
                value=license_file_value,
                source_file_dir=source_file_dir,
                repo_root_dir=repo_root_dir,
            )
            if result:
                result.set_tag(tag="field",
                               value=known_fields.LICENSE_FILE.get_name())
                results.append(result)

        return results

@ -12,27 +12,27 @@ _METADATA_FILES = {
def is_metadata_file(path: str) -> bool:
    """Filter for metadata files."""
    basename = os.path.basename(path)
    return basename in _METADATA_FILES
def find_metadata_files(root: str) -> List[str]:
    """Finds all metadata files within the given root directory,
    including subdirectories.

    Args:
        root: the absolute path to the root directory within which to
              search.

    Returns: the absolute full paths for all the metadata files within
             the root directory.
    """
    found = []
    for entry in os.listdir(root):
        full_path = os.path.join(root, entry)
        if is_metadata_file(entry):
            found.append(full_path)
        elif os.path.isdir(full_path):
            # Recurse into subdirectories.
            found.extend(find_metadata_files(full_path))
    return found

@ -28,57 +28,60 @@ _PATTERN_CPE_FORMATTED_STRING = re.compile(r"^cpe:2\.3:[aho\-\*](:[^:]+){10}$")
def is_uri_cpe(value: str) -> bool:
    """Returns whether the value conforms to the CPE 2.3 URI binding
    (which is compatible with CPE 2.2), with the additional constraint
    that at least one component other than "part" has been specified.

    For reference, see section 6.1 of the CPE Naming Specification
    Version 2.3 at
    https://nvlpubs.nist.gov/nistpubs/Legacy/IR/nistir7695.pdf.
    """
    if not util.matches(_PATTERN_CPE_URI, value):
        return False

    components = value.split(":")
    if len(components) < 3:
        # At most, only part was provided.
        return False

    # At least one component other than "part" must be non-empty.
    return any(components[2:])
def is_formatted_string_cpe(value: str) -> bool:
    """Returns whether the value conforms to the CPE 2.3 formatted
    string binding.

    For reference, see section 6.2 of the CPE Naming Specification
    Version 2.3 at
    https://nvlpubs.nist.gov/nistpubs/Legacy/IR/nistir7695.pdf.
    """
    conforms = util.matches(_PATTERN_CPE_FORMATTED_STRING, value)
    return conforms
class CPEPrefixField(field_types.MetadataField):
    """Custom field for the package's CPE."""
    def __init__(self):
        super().__init__(name="CPEPrefix", one_liner=True)

    def validate(self, value: str) -> Union[vr.ValidationResult, None]:
        """Checks the given value is either 'unknown', or conforms to
        either the CPE 2.3 or 2.2 format.
        """
        is_acceptable = (util.is_unknown(value)
                         or is_formatted_string_cpe(value)
                         or is_uri_cpe(value))
        if is_acceptable:
            return None

        return vr.ValidationError(
            reason=f"{self._name} is invalid.",
            additional=[
                "This field should be a CPE (version 2.3 or 2.2), "
                "or 'unknown'.",
                "Search for a CPE tag for the package at "
                "https://nvd.nist.gov/products/cpe/search.",
                f"Current value: '{value}'.",
            ])

@ -23,17 +23,17 @@ _PATTERN_DATE = re.compile(r"^\d{4}-(0|1)\d-[0-3]\d$")
class DateField(field_types.MetadataField):
    """Custom field for the date when the package was updated."""
    def __init__(self):
        super().__init__(name="Date", one_liner=True)

    def validate(self, value: str) -> Union[vr.ValidationResult, None]:
        """Checks the given value is a YYYY-MM-DD date."""
        # Anything that does not match the date pattern is an error.
        if not util.matches(_PATTERN_DATE, value):
            return vr.ValidationError(reason=f"{self._name} is invalid.",
                                      additional=[
                                          "The correct format is YYYY-MM-DD.",
                                          f"Current value is '{value}'.",
                                      ])
        return None

@ -52,78 +52,85 @@ _PATTERN_VERBOSE_DELIMITER = re.compile(r" and | or | / ")
def process_license_value(value: str,
                          atomic_delimiter: str) -> List[Tuple[str, bool]]:
    """Process a license field value, which may list multiple licenses.

    Args:
        value: the value to process, which may include both verbose and
               atomic delimiters, e.g. "Apache, 2.0 and MIT and custom"
        atomic_delimiter: the delimiter to use as a final step; values
                          will not be further split after using this
                          delimiter.

    Returns: a list of the constituent licenses within the given value,
             and whether the constituent license is on the allowlist.
             e.g. [("Apache, 2.0", True), ("MIT", True),
             ("custom", False)]
    """
    # A value that is allowlisted as-is needs no further processing.
    if is_license_allowlisted(value):
        return [(value, True)]

    breakdown = []
    if re.search(_PATTERN_VERBOSE_DELIMITER, value):
        # Recurse on each part produced by the verbose delimiters.
        for part in re.split(_PATTERN_VERBOSE_DELIMITER, value):
            breakdown.extend(
                process_license_value(part.strip(), atomic_delimiter))
        return breakdown

    # Split using the standard value delimiter; the resulting values
    # are atomic and cannot be split any further.
    for raw_value in value.split(atomic_delimiter):
        atomic_value = raw_value.strip()
        breakdown.append(
            (atomic_value, is_license_allowlisted(atomic_value)))

    return breakdown
def is_license_allowlisted(value: str) -> bool:
    """Returns whether the value is in the allowlist for license
    types.
    """
    allowed = util.matches(_PATTERN_LICENSE_ALLOWED, value)
    return allowed
class LicenseField(field_types.MetadataField):
    """Custom field for the package's license type(s).

    e.g. Apache 2.0, MIT, BSD, Public Domain.
    """
    def __init__(self):
        super().__init__(name="License", one_liner=False)

    def validate(self, value: str) -> Union[vr.ValidationResult, None]:
        """Checks the given value consists of recognized license types.

        Note: this field supports multiple values.
        """
        licenses = process_license_value(
            value, atomic_delimiter=self.VALUE_DELIMITER)

        # `license_type` avoids shadowing the builtin `license`.
        not_allowlisted = []
        for license_type, allowed in licenses:
            if util.is_empty(license_type):
                return vr.ValidationError(
                    reason=f"{self._name} has an empty value.")
            if not allowed:
                not_allowlisted.append(license_type)

        if not_allowlisted:
            return vr.ValidationWarning(
                reason=f"{self._name} has a license not in the allowlist.",
                additional=[
                    f"Separate licenses using a '{self.VALUE_DELIMITER}'.",
                    "Licenses not allowlisted: "
                    f"{util.quoted(not_allowlisted)}.",
                ])

        # Suggest using the standard value delimiter when possible.
        if (re.search(_PATTERN_VERBOSE_DELIMITER, value)
                and self.VALUE_DELIMITER not in value):
            return vr.ValidationWarning(
                reason=f"Separate licenses using a '{self.VALUE_DELIMITER}'.")

        return None

@ -27,91 +27,98 @@ _NOT_SHIPPED = "NOT_SHIPPED"
class LicenseFileField(field_types.MetadataField):
    """Custom field for the paths to the package's license file(s)."""
    def __init__(self):
        super().__init__(name="License File", one_liner=True)

    def validate(self, value: str) -> Union[vr.ValidationResult, None]:
        """Checks the given value consists of non-empty paths with no
        backward directory navigation (i.e. no "../").

        This validation is rudimentary. To check if the license file(s)
        exist on disk, see the `LicenseFileField.validate_on_disk`
        method.

        Note: this field supports multiple values.
        """
        if value == _NOT_SHIPPED:
            return vr.ValidationWarning(
                reason=f"{self._name} uses deprecated value '{_NOT_SHIPPED}'.",
                additional=[
                    f"Remove this field and use 'Shipped: {util.NO}' instead.",
                ])

        invalid_values = []
        for path in value.split(self.VALUE_DELIMITER):
            path = path.strip()
            if util.is_empty(path) or util.matches(_PATTERN_PATH_BACKWARD,
                                                   path):
                invalid_values.append(path)

        if invalid_values:
            return vr.ValidationError(
                reason=f"{self._name} is invalid.",
                additional=[
                    "File paths cannot be empty, or include '../'.",
                    "Separate license files using a "
                    f"'{self.VALUE_DELIMITER}'.",
                    f"Invalid values: {util.quoted(invalid_values)}.",
                ])

        return None

    def validate_on_disk(
        self,
        value: str,
        source_file_dir: str,
        repo_root_dir: str,
    ) -> Union[vr.ValidationResult, None]:
        """Checks the given value consists of file paths which exist on
        disk.

        Note: this field supports multiple values.

        Args:
            value: the value to validate.
            source_file_dir: the directory of the metadata file that the
                             license file value is from; this is needed
                             to construct file paths to license files.
            repo_root_dir: the repository's root directory; this is
                           needed to construct file paths to
                           license files.

        Returns: a validation result based on the license file value,
                 and whether the license file(s) exist on disk,
                 otherwise None.
        """
        if value == _NOT_SHIPPED:
            return vr.ValidationWarning(
                reason=f"{self._name} uses deprecated value '{_NOT_SHIPPED}'.",
                additional=[
                    f"Remove this field and use 'Shipped: {util.NO}' instead.",
                ])

        invalid_values = []
        for license_filename in value.split(self.VALUE_DELIMITER):
            license_filename = license_filename.strip()
            if license_filename.startswith("/"):
                # A leading "/" means the path is relative to the
                # repository root, not the metadata file's directory.
                license_filepath = os.path.join(
                    repo_root_dir,
                    os.path.normpath(license_filename.lstrip("/")))
            else:
                license_filepath = os.path.join(
                    source_file_dir, os.path.normpath(license_filename))
            if not os.path.exists(license_filepath):
                invalid_values.append(license_filepath)

        if invalid_values:
            missing = ", ".join(invalid_values)
            return vr.ValidationError(
                reason=f"{self._name} is invalid.",
                additional=[
                    "Failed to find all license files on local disk.",
                    # Fixed: added the missing space after the colon so
                    # the message reads "Missing files: a, b" instead of
                    # "Missing files:a, b".
                    f"Missing files: {missing}.",
                ])

        return None

@ -25,31 +25,32 @@ _PATTERN_URL_CANONICAL_REPO = re.compile(
class URLField(field_types.MetadataField):
    """Custom field for the package URL(s)."""
    def __init__(self):
        super().__init__(name="URL", one_liner=False)

    def validate(self, value: str) -> Union[vr.ValidationResult, None]:
        """Checks the given value has acceptable URL values only.

        Note: this field supports multiple values.
        """
        # A canonical-repo statement is accepted as-is.
        if util.matches(_PATTERN_URL_CANONICAL_REPO, value):
            return None

        invalid_values = [
            url.strip() for url in value.split(self.VALUE_DELIMITER)
            if not util.matches(_PATTERN_URL_ALLOWED, url.strip())
        ]
        if invalid_values:
            return vr.ValidationError(
                reason=f"{self._name} is invalid.",
                additional=[
                    "URLs must use a protocol scheme in "
                    "[http, https, ftp, git].",
                    f"Separate URLs using a '{self.VALUE_DELIMITER}'.",
                    f"Invalid values: {util.quoted(invalid_values)}.",
                ])

        return None

@ -23,35 +23,35 @@ _PATTERN_NOT_APPLICABLE = re.compile(r"^N ?\/ ?A$", re.IGNORECASE)
def is_unknown(value: str) -> bool:
    """Returns whether the value denotes the version being unknown."""
    # "0" is treated as a placeholder for an unknown version.
    if value == "0":
        return True
    return (util.matches(_PATTERN_NOT_APPLICABLE, value)
            or util.is_unknown(value))
class VersionField(field_types.MetadataField):
    """Custom field for the package version."""
    def __init__(self):
        super().__init__(name="Version", one_liner=True)

    def validate(self, value: str) -> Union[vr.ValidationResult, None]:
        """Checks the given value is acceptable - there must be at least
        one non-whitespace character, and "N/A" is preferred over "0" if
        the version is unknown.
        """
        # Both failure modes share the same remediation advice.
        advice = [
            "Set this field to 'N/A' if this package does not version "
            "or is versioned by date or revision.",
        ]
        if value == "0" or util.is_unknown(value):
            return vr.ValidationWarning(reason=f"{self._name} is '{value}'.",
                                        additional=advice)
        if util.is_empty(value):
            return vr.ValidationError(reason=f"{self._name} is empty.",
                                      additional=advice)
        return None

@ -28,70 +28,73 @@ _PATTERN_STARTS_WITH_YES_OR_NO = re.compile(r"^(yes|no)", re.IGNORECASE)
class MetadataField:
    """Base class for all metadata fields."""

    # The delimiter used to separate multiple values.
    VALUE_DELIMITER = ","

    def __init__(self, name: str, one_liner: bool = True):
        self._name = name
        self._one_liner = one_liner

    def __eq__(self, other):
        # Fields compare equal by case-insensitive name.
        return (isinstance(other, MetadataField)
                and self._name.lower() == other._name.lower())

    def __hash__(self):
        # Consistent with __eq__: hash on the lowercased name.
        return hash(self._name.lower())

    def get_name(self):
        return self._name

    def is_one_liner(self):
        return self._one_liner

    def validate(self, value: str) -> Union[vr.ValidationResult, None]:
        """Checks the given value is acceptable for the field.

        Raises: NotImplementedError if called. This method must be
                overridden with the actual validation of the field.
        """
        raise NotImplementedError(
            f"{self._name} field validation not defined.")
class FreeformTextField(MetadataField):
    """Field where the value is freeform text."""
    def validate(self, value: str) -> Union[vr.ValidationResult, None]:
        """Checks the given value has at least one non-whitespace
        character.
        """
        return (vr.ValidationError(reason=f"{self._name} is empty.")
                if util.is_empty(value) else None)
class YesNoField(MetadataField):
    """Field where the value must be yes or no."""
    def __init__(self, name: str):
        super().__init__(name=name, one_liner=True)

    def validate(self, value: str) -> Union[vr.ValidationResult, None]:
        """Checks the given value is either yes or no."""
        if util.matches(_PATTERN_YES_OR_NO, value):
            return None

        # A value that merely starts with yes/no earns a warning rather
        # than an error; anything else is an outright error.
        if util.matches(_PATTERN_STARTS_WITH_YES_OR_NO, value):
            result_type = vr.ValidationWarning
            advice = f"This field should be only {util.YES} or {util.NO}."
        else:
            result_type = vr.ValidationError
            advice = f"This field must be {util.YES} or {util.NO}."

        return result_type(reason=f"{self._name} is invalid.",
                           additional=[
                               advice,
                               f"Current value is '{value}'.",
                           ])

@ -65,4 +65,4 @@ _FIELD_MAPPING = {field.get_name().lower(): field for field in ALL_FIELDS}
def get_field(label: str) -> Union[field_types.MetadataField, None]:
return _FIELD_MAPPING.get(label.lower())
return _FIELD_MAPPING.get(label.lower())

@ -10,49 +10,54 @@ from typing import List
YES = "yes"
NO = "no"
# Pattern used to check if the entire string is "unknown", case-insensitive.
# Pattern used to check if the entire string is "unknown",
# case-insensitive.
_PATTERN_UNKNOWN = re.compile(r"^unknown$", re.IGNORECASE)
# Pattern used to check if the entire string is functionally empty, i.e.
# empty string, or all characters are only whitespace.
_PATTERN_ONLY_WHITESPACE = re.compile(r"^\s*$")
# Pattern used to check if the string starts with "yes", case-insensitive.
# Pattern used to check if the string starts with "yes",
# case-insensitive.
_PATTERN_STARTS_WITH_YES = re.compile(r"^yes", re.IGNORECASE)
# Pattern used to check if the string starts with "no", case-insensitive.
# Pattern used to check if the string starts with "no",
# case-insensitive.
_PATTERN_STARTS_WITH_NO = re.compile(r"^no", re.IGNORECASE)
def matches(pattern: re.Pattern, value: str) -> bool:
"""Returns whether the value matches the pattern."""
return pattern.match(value) is not None
"""Returns whether the value matches the pattern."""
return pattern.match(value) is not None
def is_empty(value: str) -> bool:
"""Returns whether the value is functionally empty."""
return matches(_PATTERN_ONLY_WHITESPACE, value)
"""Returns whether the value is functionally empty."""
return matches(_PATTERN_ONLY_WHITESPACE, value)
def is_unknown(value: str) -> bool:
"""Returns whether the value is 'unknown' (case insensitive)."""
return matches(_PATTERN_UNKNOWN, value)
"""Returns whether the value is 'unknown' (case insensitive)."""
return matches(_PATTERN_UNKNOWN, value)
def quoted(values: List[str]) -> str:
"""Returns a string of the given values, each being individually quoted."""
return ", ".join([f"'{entry}'" for entry in values])
"""Returns a string of the given values, each being individually
quoted.
"""
return ", ".join([f"'{entry}'" for entry in values])
def infer_as_boolean(value: str, default: bool = True) -> bool:
"""Attempts to infer the value as a boolean, where:
- "yes"-ish values return True;
- "no"-ish values return False; and
- default is returned otherwise.
"""
if matches(_PATTERN_STARTS_WITH_YES, value):
return True
elif matches(_PATTERN_STARTS_WITH_NO, value):
return False
else:
return default
"""Attempts to infer the value as a boolean, where:
- "yes"-ish values return True;
- "no"-ish values return False; and
- default is returned otherwise.
"""
if matches(_PATTERN_STARTS_WITH_YES, value):
return True
elif matches(_PATTERN_STARTS_WITH_NO, value):
return False
else:
return default

@ -24,64 +24,69 @@ DEPENDENCY_DIVIDER = re.compile(r"^-{20} DEPENDENCY DIVIDER -{20}$")
# Delimiter used to separate a field's name from its value.
FIELD_DELIMITER = ":"
# Pattern used to check if a line from a metadata file declares a new field.
# Pattern used to check if a line from a metadata file declares a new
# field.
_PATTERN_FIELD_DECLARATION = re.compile(
"^({}){}".format("|".join(known_fields.ALL_FIELD_NAMES), FIELD_DELIMITER),
re.IGNORECASE,
)
re.IGNORECASE)
def parse_content(content: str) -> List[dm.DependencyMetadata]:
"""Reads and parses the metadata from the given string.
"""Reads and parses the metadata from the given string.
Args:
content: the string to parse metadata from.
Returns:
all the metadata, which may be for zero or more dependencies, from the
given string.
Returns: all the metadata, which may be for zero or more
dependencies, from the given string.
"""
dependencies = []
current_metadata = dm.DependencyMetadata()
current_field_name = None
current_field_value = ""
for line in content.splitlines(keepends=True):
# Check if a new dependency is being described.
if DEPENDENCY_DIVIDER.match(line):
if current_field_name:
# Save the field value for the previous dependency.
dependencies = []
current_metadata = dm.DependencyMetadata()
current_field_name = None
current_field_value = ""
for line in content.splitlines(keepends=True):
# Check if a new dependency is being described.
if DEPENDENCY_DIVIDER.match(line):
if current_field_name:
# Save the field value for the previous dependency.
current_metadata.add_entry(current_field_name,
current_field_value)
if current_metadata.has_entries():
# Add the previous dependency to the results.
dependencies.append(current_metadata)
# Reset for the new dependency's metadata,
# and reset the field state.
current_metadata = dm.DependencyMetadata()
current_field_name = None
current_field_value = ""
elif _PATTERN_FIELD_DECLARATION.match(line):
# Save the field value to the current dependency's metadata.
if current_field_name:
current_metadata.add_entry(current_field_name,
current_field_value)
current_field_name, current_field_value = line.split(
FIELD_DELIMITER, 1)
field = known_fields.get_field(current_field_name)
if field and field.is_one_liner():
# The field should be on one line, so add it now.
current_metadata.add_entry(current_field_name,
current_field_value)
# Reset the field state.
current_field_name = None
current_field_value = ""
elif current_field_name:
# The field is on multiple lines, so add this line to the
# field value.
current_field_value += line
# At this point, the end of the file has been reached.
# Save any remaining field data and metadata.
if current_field_name:
current_metadata.add_entry(current_field_name, current_field_value)
if current_metadata.has_entries():
# Add the previous dependency to the results.
if current_metadata.has_entries():
dependencies.append(current_metadata)
# Reset for the new dependency's metadata, and reset the field state.
current_metadata = dm.DependencyMetadata()
current_field_name = None
current_field_value = ""
elif _PATTERN_FIELD_DECLARATION.match(line):
# Save the field value to the current dependency's metadata.
if current_field_name:
current_metadata.add_entry(current_field_name, current_field_value)
current_field_name, current_field_value = line.split(FIELD_DELIMITER, 1)
field = known_fields.get_field(current_field_name)
if field and field.is_one_liner():
# The field should be on one line, so it can be added now.
current_metadata.add_entry(current_field_name, current_field_value)
# Reset the field state.
current_field_name = None
current_field_value = ""
elif current_field_name:
# The field is on multiple lines, so add this line to the field value.
current_field_value += line
# At this point, the end of the file has been reached. Save any remaining
# field data and metadata.
if current_field_name:
current_metadata.add_entry(current_field_name, current_field_value)
if current_metadata.has_entries():
dependencies.append(current_metadata)
return dependencies
return dependencies

@ -22,72 +22,73 @@ import metadata.validate
def parse_args() -> argparse.Namespace:
"""Helper to parse args to this script."""
parser = argparse.ArgumentParser()
repo_root_dir = parser.add_argument(
"repo_root_dir",
help=("The path to the repository's root directory, which will be "
"scanned for Chromium metadata files, e.g. '~/chromium/src'."),
)
args = parser.parse_args()
# Check the repo root directory exists.
src_dir = os.path.abspath(args.repo_root_dir)
if not os.path.exists(src_dir) or not os.path.isdir(src_dir):
raise argparse.ArgumentError(
repo_root_dir,
f"Invalid repository root directory '{src_dir}' - not found",
"""Helper to parse args to this script."""
parser = argparse.ArgumentParser()
repo_root_dir = parser.add_argument(
"repo_root_dir",
help=("The path to the repository's root directory, which will be "
"scanned for Chromium metadata files, e.g. '~/chromium/src'."),
)
return args
args = parser.parse_args()
# Check the repo root directory exists.
src_dir = os.path.abspath(args.repo_root_dir)
if not os.path.exists(src_dir) or not os.path.isdir(src_dir):
raise argparse.ArgumentError(
repo_root_dir,
f"Invalid repository root directory '{src_dir}' - not found",
)
return args
def main() -> None:
"""Runs validation on all metadata files within the directory specified by the
repo_root_dir arg.
"""
config = parse_args()
src_dir = os.path.abspath(config.repo_root_dir)
metadata_files = metadata.discover.find_metadata_files(src_dir)
file_count = len(metadata_files)
print(f"Found {file_count} metadata files.")
invalid_file_count = 0
# Key is constructed from the result severity and reason;
# Value is a list of files affected by that reason at that severity.
all_reasons = defaultdict(list)
for filepath in metadata_files:
file_results = metadata.validate.validate_file(filepath,
repo_root_dir=src_dir)
invalid = False
if file_results:
relpath = os.path.relpath(filepath, start=src_dir)
print(f"\n{len(file_results)} problem(s) in {relpath}:")
for result in file_results:
print(f" {result}")
summary_key = "{severity} - {reason}".format(
severity=result.get_severity_prefix(), reason=result.get_reason())
all_reasons[summary_key].append(relpath)
if result.is_fatal():
invalid = True
if invalid:
invalid_file_count += 1
print("\n\nDone.\nSummary:")
for summary_key, affected_files in all_reasons.items():
count = len(affected_files)
plural = "s" if count > 1 else ""
print(f"\n {count} file{plural}: {summary_key}")
for affected_file in affected_files:
print(f" {affected_file}")
print(f"\n\n{invalid_file_count} / {file_count} metadata files are invalid, "
"i.e. the file has at least one fatal validation issue.")
"""Runs validation on all metadata files within the directory
specified by the repo_root_dir arg.
"""
config = parse_args()
src_dir = os.path.abspath(config.repo_root_dir)
metadata_files = metadata.discover.find_metadata_files(src_dir)
file_count = len(metadata_files)
print(f"Found {file_count} metadata files.")
invalid_file_count = 0
# Key is constructed from the result severity and reason;
# Value is a list of files affected by that reason at that severity.
all_reasons = defaultdict(list)
for filepath in metadata_files:
file_results = metadata.validate.validate_file(filepath,
repo_root_dir=src_dir)
invalid = False
if file_results:
relpath = os.path.relpath(filepath, start=src_dir)
print(f"\n{len(file_results)} problem(s) in {relpath}:")
for result in file_results:
print(f" {result}")
summary_key = "{severity} - {reason}".format(
severity=result.get_severity_prefix(),
reason=result.get_reason())
all_reasons[summary_key].append(relpath)
if result.is_fatal():
invalid = True
if invalid:
invalid_file_count += 1
print("\n\nDone.\nSummary:")
for summary_key, affected_files in all_reasons.items():
count = len(affected_files)
plural = "s" if count > 1 else ""
print(f"\n {count} file{plural}: {summary_key}")
for affected_file in affected_files:
print(f" {affected_file}")
print(f"\n\n{invalid_file_count} / {file_count} metadata files are "
"invalid, i.e. the file has at least one fatal validation issue.")
if __name__ == "__main__":
main()
main()

@ -20,109 +20,120 @@ import metadata.validation_result as vr
class DependencyValidationTest(unittest.TestCase):
def test_repeated_field(self):
"""Check that a validation error is returned for a repeated field."""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.NAME.get_name(), "Test repeated field")
dependency.add_entry(known_fields.URL.get_name(), "https://www.example.com")
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public Domain")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(), "LICENSE")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "no")
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
dependency.add_entry(known_fields.NAME.get_name(), "again")
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 1)
self.assertTrue(isinstance(results[0], vr.ValidationError))
self.assertEqual(results[0].get_reason(), "There is a repeated field.")
def test_required_field(self):
"""Check that a validation error is returned for a missing field."""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "no")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(), "LICENSE")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public Domain")
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.NAME.get_name(), "Test missing field")
# Leave URL field unspecified.
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 1)
self.assertTrue(isinstance(results[0], vr.ValidationError))
self.assertEqual(results[0].get_reason(),
"Required field 'URL' is missing.")
def test_invalid_field(self):
"""Check field validation issues are returned."""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.URL.get_name(), "https://www.example.com")
dependency.add_entry(known_fields.NAME.get_name(), "Test invalid field")
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(), "LICENSE")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public domain")
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "test")
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 1)
self.assertTrue(isinstance(results[0], vr.ValidationError))
self.assertEqual(results[0].get_reason(), "Security Critical is invalid.")
def test_invalid_license_file_path(self):
"""Check license file path validation issues are returned."""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.NAME.get_name(), "Test license file path")
dependency.add_entry(known_fields.URL.get_name(), "https://www.example.com")
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public domain")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(),
"MISSING-LICENSE")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "no")
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 1)
self.assertTrue(isinstance(results[0], vr.ValidationError))
self.assertEqual(results[0].get_reason(), "License File is invalid.")
def test_multiple_validation_issues(self):
"""Check all validation issues are returned."""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.NAME.get_name(), "Test multiple errors")
# Leave URL field unspecified.
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public domain")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(),
"MISSING-LICENSE")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "test")
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
dependency.add_entry(known_fields.NAME.get_name(), "again")
# Check 4 validation results are returned, for:
# - missing field;
# - invalid license file path;
# - invalid yes/no field value; and
# - repeated field entry.
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 4)
def test_repeated_field(self):
"""Check that a validation error is returned for a repeated
field.
"""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.NAME.get_name(),
"Test repeated field")
dependency.add_entry(known_fields.URL.get_name(),
"https://www.example.com")
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public Domain")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(), "LICENSE")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "no")
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
dependency.add_entry(known_fields.NAME.get_name(), "again")
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 1)
self.assertTrue(isinstance(results[0], vr.ValidationError))
self.assertEqual(results[0].get_reason(), "There is a repeated field.")
def test_required_field(self):
"""Check that a validation error is returned for a missing field."""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "no")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(), "LICENSE")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public Domain")
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.NAME.get_name(),
"Test missing field")
# Leave URL field unspecified.
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 1)
self.assertTrue(isinstance(results[0], vr.ValidationError))
self.assertEqual(results[0].get_reason(),
"Required field 'URL' is missing.")
def test_invalid_field(self):
"""Check field validation issues are returned."""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.URL.get_name(),
"https://www.example.com")
dependency.add_entry(known_fields.NAME.get_name(),
"Test invalid field")
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(), "LICENSE")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public domain")
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "test")
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 1)
self.assertTrue(isinstance(results[0], vr.ValidationError))
self.assertEqual(results[0].get_reason(),
"Security Critical is invalid.")
def test_invalid_license_file_path(self):
"""Check license file path validation issues are returned."""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.NAME.get_name(),
"Test license file path")
dependency.add_entry(known_fields.URL.get_name(),
"https://www.example.com")
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public domain")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(),
"MISSING-LICENSE")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "no")
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 1)
self.assertTrue(isinstance(results[0], vr.ValidationError))
self.assertEqual(results[0].get_reason(), "License File is invalid.")
def test_multiple_validation_issues(self):
"""Check all validation issues are returned."""
dependency = dm.DependencyMetadata()
dependency.add_entry(known_fields.NAME.get_name(),
"Test multiple errors")
# Leave URL field unspecified.
dependency.add_entry(known_fields.VERSION.get_name(), "1.0.0")
dependency.add_entry(known_fields.LICENSE.get_name(), "Public domain")
dependency.add_entry(known_fields.LICENSE_FILE.get_name(),
"MISSING-LICENSE")
dependency.add_entry(known_fields.SECURITY_CRITICAL.get_name(), "test")
dependency.add_entry(known_fields.SHIPPED.get_name(), "no")
dependency.add_entry(known_fields.NAME.get_name(), "again")
# Check 4 validation results are returned, for:
# - missing field;
# - invalid license file path;
# - invalid yes/no field value; and
# - repeated field entry.
results = dependency.validate(
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertEqual(len(results), 4)
if __name__ == "__main__":
unittest.main()
unittest.main()

@ -21,167 +21,175 @@ import metadata.validation_result as vr
class FieldValidationTest(unittest.TestCase):
def _run_field_validation(self,
field: field_types.MetadataField,
valid_values: List[str],
error_values: List[str],
warning_values: List[str] = []):
"""Helper to run a field's validation for different values."""
for value in valid_values:
self.assertIsNone(field.validate(value), value)
for value in error_values:
self.assertIsInstance(field.validate(value), vr.ValidationError, value)
for value in warning_values:
self.assertIsInstance(field.validate(value), vr.ValidationWarning, value)
def test_freeform_text_validation(self):
# Check validation of a freeform text field that should be on one line.
self._run_field_validation(
field=field_types.FreeformTextField("Freeform single", one_liner=True),
valid_values=["Text on single line", "a", "1"],
error_values=["", "\n", " "],
)
# Check validation of a freeform text field that can span multiple lines.
self._run_field_validation(
field=field_types.FreeformTextField("Freeform multi", one_liner=False),
valid_values=[
"This is text spanning multiple lines:\n"
" * with this point\n"
" * and this other point",
"Text on single line",
"a",
"1",
],
error_values=["", "\n", " "],
)
def test_yes_no_field_validation(self):
self._run_field_validation(
field=field_types.YesNoField("Yes/No test"),
valid_values=["yes", "no", "No", "YES"],
error_values=["", "\n", "Probably yes"],
warning_values=["Yes?", "not"],
)
def test_cpe_prefix_validation(self):
self._run_field_validation(
field=known_fields.CPE_PREFIX,
valid_values=[
"unknown",
"cpe:2.3:a:sqlite:sqlite:3.0.0:*:*:*:*:*:*:*",
"cpe:2.3:a:sqlite:sqlite:*:*:*:*:*:*:*:*",
"cpe:/a:vendor:product:version:update:edition:lang",
"cpe:/a::product:",
"cpe:/:vendor::::edition",
"cpe:/:vendor",
],
error_values=[
"",
"\n",
"cpe:2.3:a:sqlite:sqlite:3.0.0",
"cpe:2.3:a:sqlite:sqlite::::::::",
"cpe:/",
"cpe:/a:vendor:product:version:update:edition:lang:",
],
)
def test_date_validation(self):
self._run_field_validation(
field=known_fields.DATE,
valid_values=["2012-03-04"],
error_values=["", "\n", "April 3, 2012", "2012/03/04"],
)
def test_license_validation(self):
self._run_field_validation(
field=known_fields.LICENSE,
valid_values=[
"Apache, 2.0 / MIT / MPL 2",
"LGPL 2.1",
"Apache, Version 2 and Public domain",
],
error_values=["", "\n", ",", "Apache 2.0 / MIT / "],
warning_values=[
"Custom license",
"Custom / MIT",
"Public domain or MPL 2",
"APSL 2 and the MIT license",
],
)
def test_license_file_validation(self):
self._run_field_validation(
field=known_fields.LICENSE_FILE,
valid_values=[
"LICENSE", "src/LICENSE.txt",
"LICENSE, //third_party_test/LICENSE-TEST", "src/MISSING_LICENSE"
],
error_values=["", "\n", ","],
warning_values=["NOT_SHIPPED"],
)
# Check relative path from README directory, and multiple license files.
result = known_fields.LICENSE_FILE.validate_on_disk(
value="LICENSE, src/LICENSE.txt",
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertIsNone(result)
# Check relative path from Chromium src directory.
result = known_fields.LICENSE_FILE.validate_on_disk(
value="//data/LICENSE",
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertIsNone(result)
# Check missing file.
result = known_fields.LICENSE_FILE.validate_on_disk(
value="MISSING_LICENSE",
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertIsInstance(result, vr.ValidationError)
# Check deprecated NOT_SHIPPED.
result = known_fields.LICENSE_FILE.validate_on_disk(
value="NOT_SHIPPED",
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertIsInstance(result, vr.ValidationWarning)
def test_url_validation(self):
self._run_field_validation(
field=known_fields.URL,
valid_values=[
"https://www.example.com/a",
"http://www.example.com/b",
"ftp://www.example.com/c,git://www.example.com/d",
"This is the canonical public repository",
],
error_values=[
"",
"\n",
"ghttps://www.example.com/e",
"https://www.example.com/ f",
"Https://www.example.com/g",
"This is an unrecognized message for the URL",
],
)
def test_version_validation(self):
self._run_field_validation(
field=known_fields.VERSION,
valid_values=["n / a", "123abc", "unknown forked version"],
error_values=["", "\n"],
warning_values=["0", "unknown"],
)
def _run_field_validation(self,
field: field_types.MetadataField,
valid_values: List[str],
error_values: List[str],
warning_values: List[str] = []):
"""Helper to run a field's validation for different values."""
for value in valid_values:
self.assertIsNone(field.validate(value), value)
for value in error_values:
self.assertIsInstance(field.validate(value), vr.ValidationError,
value)
for value in warning_values:
self.assertIsInstance(field.validate(value), vr.ValidationWarning,
value)
def test_freeform_text_validation(self):
# Check validation of a freeform text field that should be on
# one line.
self._run_field_validation(
field=field_types.FreeformTextField("Freeform single",
one_liner=True),
valid_values=["Text on single line", "a", "1"],
error_values=["", "\n", " "],
)
# Check validation of a freeform text field that can span
# multiple lines.
self._run_field_validation(
field=field_types.FreeformTextField("Freeform multi",
one_liner=False),
valid_values=[
"This is text spanning multiple lines:\n"
" * with this point\n"
" * and this other point",
"Text on single line",
"a",
"1",
],
error_values=["", "\n", " "],
)
def test_yes_no_field_validation(self):
self._run_field_validation(
field=field_types.YesNoField("Yes/No test"),
valid_values=["yes", "no", "No", "YES"],
error_values=["", "\n", "Probably yes"],
warning_values=["Yes?", "not"],
)
def test_cpe_prefix_validation(self):
self._run_field_validation(
field=known_fields.CPE_PREFIX,
valid_values=[
"unknown",
"cpe:2.3:a:sqlite:sqlite:3.0.0:*:*:*:*:*:*:*",
"cpe:2.3:a:sqlite:sqlite:*:*:*:*:*:*:*:*",
"cpe:/a:vendor:product:version:update:edition:lang",
"cpe:/a::product:",
"cpe:/:vendor::::edition",
"cpe:/:vendor",
],
error_values=[
"",
"\n",
"cpe:2.3:a:sqlite:sqlite:3.0.0",
"cpe:2.3:a:sqlite:sqlite::::::::",
"cpe:/",
"cpe:/a:vendor:product:version:update:edition:lang:",
],
)
def test_date_validation(self):
self._run_field_validation(
field=known_fields.DATE,
valid_values=["2012-03-04"],
error_values=["", "\n", "April 3, 2012", "2012/03/04"],
)
def test_license_validation(self):
self._run_field_validation(
field=known_fields.LICENSE,
valid_values=[
"Apache, 2.0 / MIT / MPL 2",
"LGPL 2.1",
"Apache, Version 2 and Public domain",
],
error_values=["", "\n", ",", "Apache 2.0 / MIT / "],
warning_values=[
"Custom license",
"Custom / MIT",
"Public domain or MPL 2",
"APSL 2 and the MIT license",
],
)
def test_license_file_validation(self):
self._run_field_validation(
field=known_fields.LICENSE_FILE,
valid_values=[
"LICENSE", "src/LICENSE.txt",
"LICENSE, //third_party_test/LICENSE-TEST",
"src/MISSING_LICENSE"
],
error_values=["", "\n", ","],
warning_values=["NOT_SHIPPED"],
)
# Check relative path from README directory, and multiple
# license files.
result = known_fields.LICENSE_FILE.validate_on_disk(
value="LICENSE, src/LICENSE.txt",
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertIsNone(result)
# Check relative path from Chromium src directory.
result = known_fields.LICENSE_FILE.validate_on_disk(
value="//data/LICENSE",
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertIsNone(result)
# Check missing file.
result = known_fields.LICENSE_FILE.validate_on_disk(
value="MISSING_LICENSE",
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertIsInstance(result, vr.ValidationError)
# Check deprecated NOT_SHIPPED.
result = known_fields.LICENSE_FILE.validate_on_disk(
value="NOT_SHIPPED",
source_file_dir=os.path.join(_THIS_DIR, "data"),
repo_root_dir=_THIS_DIR,
)
self.assertIsInstance(result, vr.ValidationWarning)
def test_url_validation(self):
self._run_field_validation(
field=known_fields.URL,
valid_values=[
"https://www.example.com/a",
"http://www.example.com/b",
"ftp://www.example.com/c,git://www.example.com/d",
"This is the canonical public repository",
],
error_values=[
"",
"\n",
"ghttps://www.example.com/e",
"https://www.example.com/ f",
"Https://www.example.com/g",
"This is an unrecognized message for the URL",
],
)
def test_version_validation(self):
self._run_field_validation(
field=known_fields.VERSION,
valid_values=["n / a", "123abc", "unknown forked version"],
error_values=["", "\n"],
warning_values=["0", "unknown"],
)
if __name__ == "__main__":
unittest.main()
unittest.main()

@ -19,102 +19,102 @@ import metadata.parse
class ParseTest(unittest.TestCase):
    """Tests for metadata.parse.parse_content.

    Reads test fixture files from the sibling "data" directory, so these
    tests require the repository checkout (not runnable in isolation).
    """

    def test_parse_single(self):
        """Check parsing works for a single dependency's metadata."""
        filepath = os.path.join(_THIS_DIR, "data",
                                "README.chromium.test.single-valid")
        content = gclient_utils.FileRead(filepath)
        all_metadata = metadata.parse.parse_content(content)

        self.assertEqual(len(all_metadata), 1)
        self.assertListEqual(
            all_metadata[0].get_entries(),
            [
                ("Name", "Test-A README for Chromium metadata"),
                ("Short Name", "metadata-test-valid"),
                ("URL", "https://www.example.com/metadata,\n"
                 " https://www.example.com/parser"),
                ("Version", "1.0.12"),
                ("Date", "2020-12-03"),
                ("License", "Apache, 2.0 and MIT"),
                ("License File", "LICENSE"),
                ("Security Critical", "yes"),
                ("Shipped", "yes"),
                ("CPEPrefix", "unknown"),
                ("Description", "A test metadata file, with a\n"
                 " multi-line description."),
                ("Local Modifications", "None,\nEXCEPT:\n* nothing."),
            ],
        )

    def test_parse_multiple(self):
        """Check parsing works for multiple dependencies' metadata."""
        filepath = os.path.join(_THIS_DIR, "data",
                                "README.chromium.test.multi-invalid")
        content = gclient_utils.FileRead(filepath)
        all_metadata = metadata.parse.parse_content(content)

        # Dependency metadata with no entries at all are ignored.
        self.assertEqual(len(all_metadata), 3)

        # Check entries are added according to fields being one-liners.
        self.assertListEqual(
            all_metadata[0].get_entries(),
            [
                ("Name",
                 "Test-A README for Chromium metadata (0 errors, 0 warnings)"),
                ("Short Name", "metadata-test-valid"),
                ("URL", "https://www.example.com/metadata,\n"
                 " https://www.example.com/parser"),
                ("Version", "1.0.12"),
                ("Date", "2020-12-03"),
                ("License", "Apache, 2.0 and MIT"),
                ("License File", "LICENSE"),
                ("Security Critical", "yes"),
                ("Shipped", "yes"),
                ("CPEPrefix", "unknown"),
                ("Description", "A test metadata file, with a\n"
                 " multi-line description."),
                ("Local Modifications", "None,\nEXCEPT:\n* nothing."),
            ],
        )

        # Check the parser handles different casing for field names, and
        # strips leading and trailing whitespace from values.
        self.assertListEqual(
            all_metadata[1].get_entries(),
            [
                ("Name",
                 "Test-B README for Chromium metadata (4 errors, 1 warning)"),
                ("SHORT NAME", "metadata-test-invalid"),
                ("URL", "file://home/drive/chromium/src/metadata"),
                ("Version", "0"),
                ("Date", "2020-12-03"),
                ("License", "MIT"),
                ("Security critical", "yes"),
                ("Shipped", "Yes"),
                ("Description", ""),
                ("Local Modifications", "None."),
            ],
        )

        # Check repeated fields persist in the metadata's entries.
        self.assertListEqual(
            all_metadata[2].get_entries(),
            [
                ("Name",
                 "Test-C README for Chromium metadata (5 errors, 1 warning)"),
                ("URL", "https://www.example.com/first"),
                ("URL", "https://www.example.com/second"),
                ("Version", "N/A"),
                ("Date", "2020-12-03"),
                ("License", "Custom license"),
                ("Security Critical", "yes"),
                ("Description", "Test metadata with multiple entries for one "
                 "field, and\nmissing a mandatory field."),
            ],
        )
# Script entry point: run this file's unit tests directly.
if __name__ == "__main__":
    unittest.main()

# Locations of the metadata test fixtures, relative to this test file.
_SOURCE_FILE_DIR = os.path.join(_THIS_DIR, "data")
_VALID_METADATA_FILEPATH = os.path.join(_THIS_DIR, "data",
                                        "README.chromium.test.multi-valid")
_INVALID_METADATA_FILEPATH = os.path.join(
    _THIS_DIR, "data", "README.chromium.test.multi-invalid")
class ValidateContentTest(unittest.TestCase):
    """Tests for the validate_content function."""

    def test_empty(self):
        # Validate empty content (should result in a validation error).
        results = metadata.validate.validate_content(
            content="",
            source_file_dir=_SOURCE_FILE_DIR,
            repo_root_dir=_THIS_DIR,
        )
        self.assertEqual(len(results), 1)
        self.assertTrue(results[0].is_fatal())

    def test_valid(self):
        # Validate valid file content (no errors or warnings).
        results = metadata.validate.validate_content(
            content=gclient_utils.FileRead(_VALID_METADATA_FILEPATH),
            source_file_dir=_SOURCE_FILE_DIR,
            repo_root_dir=_THIS_DIR,
        )
        self.assertEqual(len(results), 0)

    def test_invalid(self):
        # Validate invalid file content (both errors and warnings).
        results = metadata.validate.validate_content(
            content=gclient_utils.FileRead(_INVALID_METADATA_FILEPATH),
            source_file_dir=_SOURCE_FILE_DIR,
            repo_root_dir=_THIS_DIR,
        )
        self.assertEqual(len(results), 11)

        # Tally fatal vs. non-fatal results; the fixture is expected to
        # produce exactly 9 errors and 2 warnings.
        error_count = 0
        warning_count = 0
        for result in results:
            if result.is_fatal():
                error_count += 1
            else:
                warning_count += 1
        self.assertEqual(error_count, 9)
        self.assertEqual(warning_count, 2)
class ValidateFileTest(unittest.TestCase):
    """Tests for the validate_file function."""

    def test_missing(self):
        # Validate a file that does not exist.
        results = metadata.validate.validate_file(
            filepath=os.path.join(_THIS_DIR, "data", "MISSING.chromium"),
            repo_root_dir=_THIS_DIR,
        )
        # There should be exactly 1 error returned.
        self.assertEqual(len(results), 1)
        self.assertTrue(results[0].is_fatal())

    def test_valid(self):
        # Validate a valid file (no errors or warnings).
        results = metadata.validate.validate_file(
            filepath=_VALID_METADATA_FILEPATH,
            repo_root_dir=_THIS_DIR,
        )
        self.assertEqual(len(results), 0)

    def test_invalid(self):
        # Validate an invalid file (both errors and warnings).
        results = metadata.validate.validate_file(
            filepath=_INVALID_METADATA_FILEPATH,
            repo_root_dir=_THIS_DIR,
        )
        self.assertEqual(len(results), 11)

        # Tally fatal vs. non-fatal results; the fixture is expected to
        # produce exactly 9 errors and 2 warnings.
        error_count = 0
        warning_count = 0
        for result in results:
            if result.is_fatal():
                error_count += 1
            else:
                warning_count += 1
        self.assertEqual(error_count, 9)
        self.assertEqual(warning_count, 2)
class CheckFileTest(unittest.TestCase):
    """Tests for the check_file function."""

    def test_missing(self):
        # Check a file that does not exist.
        errors, warnings = metadata.validate.check_file(
            filepath=os.path.join(_THIS_DIR, "data", "MISSING.chromium"),
            repo_root_dir=_THIS_DIR,
        )
        # TODO(aredulla): update this test once validation errors can be
        # returned as errors. Bug: b/285453019.
        # self.assertEqual(len(errors), 1)
        # self.assertEqual(len(warnings), 0)
        self.assertEqual(len(errors), 0)
        self.assertEqual(len(warnings), 1)

    def test_valid(self):
        # Check file with valid content (no errors or warnings).
        errors, warnings = metadata.validate.check_file(
            filepath=_VALID_METADATA_FILEPATH,
            repo_root_dir=_THIS_DIR,
        )
        self.assertEqual(len(errors), 0)
        self.assertEqual(len(warnings), 0)

    def test_invalid(self):
        # Check file with invalid content (both errors and warnings).
        errors, warnings = metadata.validate.check_file(
            filepath=_INVALID_METADATA_FILEPATH,
            repo_root_dir=_THIS_DIR,
        )
        # TODO(aredulla): update this test once validation errors can be
        # returned as errors. Bug: b/285453019.
        # self.assertEqual(len(errors), 9)
        # self.assertEqual(len(warnings), 2)
        self.assertEqual(len(errors), 0)
        self.assertEqual(len(warnings), 11)
# Script entry point: run this file's unit tests directly.
if __name__ == "__main__":
    unittest.main()

def validate_content(content: str, source_file_dir: str,
                     repo_root_dir: str) -> List[vr.ValidationResult]:
    """Validate the content as a metadata file.

    Args:
        content: the entire content of a file to be validated as a
                 metadata file.
        source_file_dir: the directory of the metadata file that the
                         license file value is from; this is needed to
                         construct file paths to license files.
        repo_root_dir: the repository's root directory; this is needed
                       to construct file paths to license files.

    Returns: the validation results for the given content.
    """
    results = []
    dependencies = metadata.parse.parse_content(content)
    # An empty parse is itself a fatal validation failure.
    if not dependencies:
        result = vr.ValidationError(reason="No dependency metadata found.")
        return [result]

    # Each dependency is validated independently; collect all results.
    for dependency in dependencies:
        dependency_results = dependency.validate(
            source_file_dir=source_file_dir, repo_root_dir=repo_root_dir)
        results.extend(dependency_results)

    return results


def _construct_file_read_error(filepath: str,
                               cause: str) -> vr.ValidationError:
    """Helper function to create a validation error for a
    file reading issue.
    """
    result = vr.ValidationError(
        reason="Cannot read metadata file.",
        additional=[f"Attempted to read '{filepath}' but {cause}."])
    return result
def validate_file(
    filepath: str,
    repo_root_dir: str,
    reader: Callable[[str], Union[str, bytes]] = None,
) -> List[vr.ValidationResult]:
    """Validate the item located at the given filepath is a valid
    dependency metadata file.

    Args:
        filepath: the path to validate, e.g.
                  "/chromium/src/third_party/libname/README.chromium"
        repo_root_dir: the repository's root directory; this is needed
                       to construct file paths to license files.
        reader (optional): callable function/method to read the content
                           of the file.

    Returns: the validation results for the given filepath and its
             contents, if it exists.
    """
    if reader is None:
        reader = gclient_utils.FileRead

    # Any failure to read the file is reported as a single fatal
    # validation error rather than raised to the caller.
    try:
        content = reader(filepath)
    except FileNotFoundError:
        return [_construct_file_read_error(filepath, "file not found")]
    except PermissionError:
        return [_construct_file_read_error(filepath, "access denied")]
    except Exception as e:
        return [
            _construct_file_read_error(filepath, f"unexpected error: '{e}'")
        ]
    else:
        # A bytes result means the reader could not decode the file.
        if not isinstance(content, str):
            return [_construct_file_read_error(filepath, "decoding failed")]

        # Get the directory the metadata file is in.
        source_file_dir = os.path.dirname(filepath)
        return validate_content(content=content,
                                source_file_dir=source_file_dir,
                                repo_root_dir=repo_root_dir)
def check_file(
    filepath: str,
    repo_root_dir: str,
    reader: Callable[[str], Union[str, bytes]] = None,
) -> Tuple[List[str], List[str]]:
    """Run metadata validation on the given filepath, and return all
    validation errors and validation warnings.

    Args:
        filepath: the path to a metadata file, e.g.
                  "/chromium/src/third_party/libname/README.chromium"
        repo_root_dir: the repository's root directory; this is needed
                       to construct file paths to license files.
        reader (optional): callable function/method to read the content
                           of the file.

    Returns:
        error_messages: the fatal validation issues present in the file;
                        i.e. presubmit should fail.
        warning_messages: the non-fatal validation issues present in the
                          file; i.e. presubmit should still pass.
    """
    results = validate_file(filepath=filepath,
                            repo_root_dir=repo_root_dir,
                            reader=reader)

    error_messages = []
    warning_messages = []
    for result in results:
        # Width 60 keeps messages readable in presubmit output.
        message = result.get_message(width=60)

        # TODO(aredulla): Actually distinguish between validation errors
        # and warnings. The quality of metadata is currently being
        # uplifted, but is not yet guaranteed to pass validation. So for
        # now, all validation results will be returned as warnings so
        # CLs are not blocked by invalid metadata in presubmits yet.
        # Bug: b/285453019.
        # if result.is_fatal():
        #     error_messages.append(message)
        # else:
        warning_messages.append(message)

    return error_messages, warning_messages

@ -6,77 +6,78 @@
import textwrap
from typing import Dict, List, Union
_CHROMIUM_METADATA_PRESCRIPT = "Third party metadata issue:"
_CHROMIUM_METADATA_POSTSCRIPT = ("Check //third_party/README.chromium.template "
"for details.")
_CHROMIUM_METADATA_POSTSCRIPT = (
"Check //third_party/README.chromium.template "
"for details.")
class ValidationResult:
"""Base class for validation issues."""
def __init__(self, reason: str, fatal: bool, additional: List[str] = []):
"""Constructor for a validation issue.
"""Base class for validation issues."""
def __init__(self, reason: str, fatal: bool, additional: List[str] = []):
"""Constructor for a validation issue.
Args:
reason: the root cause of the issue.
fatal: whether the issue is fatal.
additional: details that should be included in the validation message,
e.g. advice on how to address the issue, or specific
problematic values.
"""
self._reason = reason
self._fatal = fatal
self._message = " ".join([reason] + additional)
self._tags = {}
Args:
reason: the root cause of the issue.
fatal: whether the issue is fatal.
additional: details that should be included in the
validation message, e.g. advice on how to
address the issue, or specific problematic
values.
"""
self._reason = reason
self._fatal = fatal
self._message = " ".join([reason] + additional)
self._tags = {}
def __str__(self) -> str:
prefix = self.get_severity_prefix()
return f"{prefix} - {self._message}"
def __str__(self) -> str:
prefix = self.get_severity_prefix()
return f"{prefix} - {self._message}"
def __repr__(self) -> str:
return str(self)
def __repr__(self) -> str:
return str(self)
def is_fatal(self) -> bool:
return self._fatal
def is_fatal(self) -> bool:
return self._fatal
def get_severity_prefix(self):
if self._fatal:
return "ERROR"
return "[non-fatal]"
def get_severity_prefix(self):
if self._fatal:
return "ERROR"
return "[non-fatal]"
def get_reason(self) -> str:
return self._reason
def get_reason(self) -> str:
return self._reason
def set_tag(self, tag: str, value: str) -> bool:
self._tags[tag] = value
def set_tag(self, tag: str, value: str) -> bool:
self._tags[tag] = value
def get_tag(self, tag: str) -> Union[str, None]:
return self._tags.get(tag)
def get_tag(self, tag: str) -> Union[str, None]:
return self._tags.get(tag)
def get_all_tags(self) -> Dict[str, str]:
return dict(self._tags)
def get_all_tags(self) -> Dict[str, str]:
return dict(self._tags)
def get_message(self,
prescript: str = _CHROMIUM_METADATA_PRESCRIPT,
postscript: str = _CHROMIUM_METADATA_POSTSCRIPT,
width: int = 0) -> str:
components = [prescript, self._message, postscript]
message = " ".join(
[component for component in components if len(component) > 0])
def get_message(self,
prescript: str = _CHROMIUM_METADATA_PRESCRIPT,
postscript: str = _CHROMIUM_METADATA_POSTSCRIPT,
width: int = 0) -> str:
components = [prescript, self._message, postscript]
message = " ".join(
[component for component in components if len(component) > 0])
if width > 0:
return textwrap.fill(text=message, width=width)
if width > 0:
return textwrap.fill(text=message, width=width)
return message
return message
class ValidationError(ValidationResult):
    """Fatal validation issue. Presubmit should fail."""

    # `additional` defaults to [] and is only read, never mutated, so the
    # mutable default is safe here.
    def __init__(self, reason: str, additional: List[str] = []):
        super().__init__(reason=reason, fatal=True, additional=additional)
class ValidationWarning(ValidationResult):
    """Non-fatal validation issue. Presubmit should pass."""

    # `additional` defaults to [] and is only read, never mutated, so the
    # mutable default is safe here.
    def __init__(self, reason: str, additional: List[str] = []):
        super().__init__(reason=reason, fatal=False, additional=additional)
