mirror of https://github.com/mongodb/mongo
1017 lines
38 KiB
Python
Executable File
1017 lines
38 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Generate a CycloneDX SBOM using scan results from Endor Labs.
|
|
Schema validation of output is not performed.
|
|
Use 'buildscripts/sbom_linter.py' for validation.
|
|
|
|
Invoke with ---help or -h for help message.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import urllib.parse
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from config import (
|
|
endor_components_remove,
|
|
endor_components_rename,
|
|
get_semver_from_release_version,
|
|
process_component_special_cases,
|
|
)
|
|
from endorctl_utils import EndorCtl
|
|
from git import Commit, Repo
|
|
|
|
# region init
|
|
|
|
|
|
class WarningListHandler(logging.Handler):
|
|
"""Collect warnings"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.warnings = []
|
|
|
|
def emit(self, record):
|
|
if record.levelno >= logging.WARNING:
|
|
self.warnings.append(record)
|
|
|
|
|
|
logging.basicConfig(stream=sys.stdout)
|
|
logger = logging.getLogger("generate_sbom")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
# Create an instance of the custom handler
|
|
warning_handler = WarningListHandler()
|
|
|
|
# Add the handler to the logger
|
|
logger.addHandler(warning_handler)
|
|
|
|
# Get the absolute path of the script file and directory
|
|
script_path = Path(__file__).resolve()
|
|
script_directory = script_path.parent
|
|
|
|
# Regex for validation
|
|
REGEX_COMMIT_SHA = r"^[0-9a-fA-F]{40}$"
|
|
REGEX_GIT_BRANCH = r"^[a-zA-Z0-9_.\-/]+$"
|
|
REGEX_GITHUB_URL = r"^(https://github.com/)([a-zA-Z0-9-]{1,39}/[a-zA-Z0-9-_.]{1,100})(\.git)$"
|
|
REGEX_RELEASE_BRANCH = r"^v\d\.\d$"
|
|
REGEX_RELEASE_TAG = r"^r\d\.\d.\d(-\w*)?$"
|
|
|
|
# ################ PURL Validation ################
|
|
REGEX_STR_PURL_OPTIONAL = ( # Optional Version (any chars except ? @ #)
|
|
r"(?:@[^?@#]*)?"
|
|
# Optional Qualifiers (any chars except @ #)
|
|
r"(?:\?[^@#]*)?"
|
|
# Optional Subpath (any chars)
|
|
r"(?:#.*)?$"
|
|
)
|
|
|
|
REGEX_PURL = {
|
|
# deb PURL. https://github.com/package-url/purl-spec/blob/main/types-doc/deb-definition.md
|
|
"deb": re.compile(
|
|
r"^pkg:deb/" # Scheme and type
|
|
# Namespace (organization/user), letters must be lowercase
|
|
r"(debian|ubuntu)+"
|
|
r"/"
|
|
r"[a-z0-9._-]+" + REGEX_STR_PURL_OPTIONAL # Name
|
|
),
|
|
# Generic PURL. https://github.com/package-url/purl-spec/blob/main/types-doc/generic-definition.md
|
|
"generic": re.compile(
|
|
r"^pkg:generic/" # Scheme and type
|
|
r"([a-zA-Z0-9._-]+/)?" # Optional namespace segment
|
|
r"[a-zA-Z0-9._-]+" + REGEX_STR_PURL_OPTIONAL # Name (required)
|
|
),
|
|
# GitHub PURL. https://github.com/package-url/purl-spec/blob/main/types-doc/github-definition.md
|
|
"github": re.compile(
|
|
r"^pkg:github/" # Scheme and type
|
|
# Namespace (organization/user), letters must be lowercase
|
|
r"[a-z0-9-]+"
|
|
r"/"
|
|
r"[a-z0-9._-]+" + REGEX_STR_PURL_OPTIONAL # Name (repository)
|
|
),
|
|
# PyPI PURL. https://github.com/package-url/purl-spec/blob/main/types-doc/pypi-definition.md
|
|
"pypi": re.compile(
|
|
r"^pkg:pypi/" # Scheme and type
|
|
r"[a-z0-9_-]+" # Name, letters must be lowercase, dashes, underscore
|
|
+ REGEX_STR_PURL_OPTIONAL
|
|
),
|
|
}
|
|
|
|
|
|
# Metadata SBOM requirements
|
|
METADATA_FIELDS_REQUIRED = [
|
|
"type",
|
|
"bom-ref",
|
|
"group",
|
|
"name",
|
|
"version",
|
|
"description",
|
|
"licenses",
|
|
"copyright",
|
|
"externalReferences",
|
|
"scope",
|
|
]
|
|
METADATA_FIELDS_ONE_OF = [
|
|
["author", "supplier"],
|
|
["purl", "cpe"],
|
|
]
|
|
|
|
# endregion init
|
|
|
|
|
|
# region functions and classes
|
|
|
|
|
|
class GitInfo:
|
|
"""Get, set, format git info"""
|
|
|
|
def __init__(self):
|
|
print_banner("Gathering git info")
|
|
try:
|
|
self.repo_root = Path(
|
|
subprocess.run(
|
|
"git rev-parse --show-toplevel",
|
|
shell=True,
|
|
text=True,
|
|
capture_output=True,
|
|
check=True,
|
|
).stdout.strip()
|
|
)
|
|
self._repo = Repo(self.repo_root)
|
|
except Exception as e:
|
|
logger.warning(
|
|
"Unable to read git repo information. All necessary script arguments must be provided."
|
|
)
|
|
logger.warning(e)
|
|
self._repo = None
|
|
else:
|
|
try:
|
|
self.project = self._repo.remotes.origin.config_reader.get("url")
|
|
if not self.project.endswith(".git"):
|
|
self.project += ".git"
|
|
org_repo = extract_repo_from_git_url(self.project)
|
|
self.org = org_repo["org"]
|
|
self.repo = org_repo["repo"]
|
|
self.commit = self._repo.head.commit.hexsha
|
|
self.branch = self._repo.active_branch.name
|
|
|
|
# filter tags for latest release e.g., r8.2.1
|
|
release_tags = []
|
|
filtered_tags = [
|
|
tag for tag in self._repo.tags if re.fullmatch(REGEX_RELEASE_TAG, tag.name)
|
|
]
|
|
logging.info(f"GIT: Parsing {len(filtered_tags)} release tags for match to commit")
|
|
for tag in filtered_tags:
|
|
if tag.commit == self.commit:
|
|
release_tags.append(tag.name)
|
|
if len(release_tags) > 0:
|
|
self.release_tag = release_tags[-1]
|
|
else:
|
|
self.release_tag = None
|
|
logging.debug(f"GitInfo->release_tag(): {self.release_tag}")
|
|
|
|
logging.debug(f"GitInfo->__init__: {self}")
|
|
except Exception as e:
|
|
logger.warning("Unable to fully parse git info.")
|
|
logger.warning(e)
|
|
|
|
def close(self):
|
|
"""Closes the underlying Git repo object to release resources."""
|
|
if self._repo:
|
|
logger.debug("Closing Git repo object.")
|
|
self._repo.close()
|
|
self._repo = None
|
|
|
|
def added_new_3p_folder(self, commit: Commit) -> bool:
|
|
"""
|
|
Checks if a given commit added a new third-party subfolder.
|
|
|
|
Args:
|
|
commit: The GitPython Commit object to analyze.
|
|
|
|
Returns:
|
|
True if the commit added a new subfolder, False otherwise.
|
|
"""
|
|
if not commit.parents:
|
|
# If it's the initial commit, all folders are "new"
|
|
# You might want to refine this logic based on your definition of "new"
|
|
# Check if there are any subfolders in the initial commit
|
|
return bool(commit.tree.trees)
|
|
|
|
parent_commit = commit.parents[0]
|
|
diff_index = commit.diff(parent_commit)
|
|
|
|
for diff in diff_index:
|
|
# Check for added items that are directories
|
|
if diff.change_type == "A" and diff.b_is_dir:
|
|
return True
|
|
return False
|
|
|
|
|
|
def print_banner(text: str) -> None:
|
|
"""print() a padded status message to stdout"""
|
|
print()
|
|
print(text.center(len(text) + 2, " ").center(120, "="))
|
|
|
|
|
|
def extract_repo_from_git_url(git_url: str) -> dict:
|
|
"""Determine org/repo for a given git url"""
|
|
git_org = git_url.split("/")[-2].replace(".git", "")
|
|
git_repo = git_url.split("/")[-1].replace(".git", "")
|
|
return {
|
|
"org": git_org,
|
|
"repo": git_repo,
|
|
}
|
|
|
|
|
|
def is_valid_purl(purl: str) -> bool:
|
|
"""Validate a GitHub or Generic PURL"""
|
|
for purl_type, regex in REGEX_PURL.items():
|
|
if regex.match(purl):
|
|
logger.debug(f"PURL: {purl} matched PURL type '{purl_type}' regex '{regex.pattern}'")
|
|
return True
|
|
return False
|
|
|
|
|
|
def sbom_components_to_dict(sbom: dict, with_version: bool = False) -> dict:
|
|
"""Create a dict of SBOM components with a version-less PURL as the key"""
|
|
components = sbom["components"]
|
|
if with_version:
|
|
components_dict = {
|
|
urllib.parse.unquote(component["bom-ref"]): component for component in components
|
|
}
|
|
else:
|
|
components_dict = {
|
|
urllib.parse.unquote(component["bom-ref"]).split("@")[0]: component
|
|
for component in components
|
|
}
|
|
return components_dict
|
|
|
|
|
|
def check_metadata_sbom(meta_bom: dict) -> None:
|
|
"""Run checks on SBOM component metadata for expected fields."""
|
|
for component in meta_bom["components"]:
|
|
for field in METADATA_FIELDS_REQUIRED:
|
|
if field not in component:
|
|
logger.warning(
|
|
f"METADATA: '{component['bom-ref'] or component['name']} is missing required field '{field}'."
|
|
)
|
|
for fields in METADATA_FIELDS_ONE_OF:
|
|
found = False
|
|
for field in fields:
|
|
found = found or field in component
|
|
if not found:
|
|
logger.warning(
|
|
f"METADATA: '{component['bom-ref'] or component['name']} is missing one of fields '{fields}'."
|
|
)
|
|
|
|
|
|
def read_sbom_json_file(file_path: str) -> dict:
|
|
"""Load a JSON SBOM file (schema is not validated)"""
|
|
try:
|
|
with open(file_path, "r", encoding="utf-8") as input_json:
|
|
sbom_json = input_json.read()
|
|
result = json.loads(sbom_json)
|
|
except Exception as e:
|
|
logger.error(f"Error loading SBOM file from {file_path}")
|
|
logger.error(e)
|
|
else:
|
|
logger.info(f"SBOM loaded from {file_path} with {len(result['components'])} components")
|
|
return result
|
|
|
|
|
|
def write_sbom_json_file(sbom_dict: dict, file_path: str) -> None:
|
|
"""Save a JSON SBOM file (schema is not validated)"""
|
|
try:
|
|
file_path = os.path.abspath(file_path)
|
|
with open(file_path, "w", encoding="utf-8") as output_json:
|
|
formatted_sbom = json.dumps(sbom_dict, indent=2) + "\n"
|
|
output_json.write(formatted_sbom)
|
|
except Exception as e:
|
|
logger.error(f"Error writing SBOM file to {file_path}")
|
|
logger.error(e)
|
|
else:
|
|
logger.info(f"SBOM file saved to {file_path}")
|
|
|
|
|
|
def write_list_to_text_file(str_list: list, file_path: str) -> None:
|
|
"""Save a list of strings to a text file"""
|
|
try:
|
|
file_path = os.path.abspath(file_path)
|
|
with open(file_path, "w", encoding="utf-8") as output_txt:
|
|
for item in str_list:
|
|
output_txt.write(f"{item}\n")
|
|
except Exception as e:
|
|
logger.error(f"Error writing text file to {file_path}")
|
|
logger.error(e)
|
|
else:
|
|
logger.info(f"Text file saved to {file_path}")
|
|
|
|
|
|
def set_component_version(
|
|
component: dict, version: str, purl_version: str = None, cpe_version: str = None
|
|
) -> None:
|
|
"""Update the appropriate version fields in a component from the metadata SBOM"""
|
|
if not purl_version:
|
|
purl_version = version
|
|
|
|
if not cpe_version:
|
|
cpe_version = version
|
|
|
|
component["bom-ref"] = component["bom-ref"].replace("{{VERSION}}", purl_version)
|
|
component["version"] = component["version"].replace("{{VERSION}}", version)
|
|
if component.get("purl"):
|
|
component["purl"] = component["purl"].replace("{{VERSION}}", purl_version)
|
|
if not is_valid_purl(component["purl"]):
|
|
logger.warning(f"PURL: Invalid PURL ({component['purl']})")
|
|
if component.get("cpe"):
|
|
component["cpe"] = component["cpe"].replace("{{VERSION}}", cpe_version)
|
|
|
|
|
|
def set_dependency_version(dependencies: list, meta_bom_ref: str, purl_version: str) -> None:
|
|
"""Update the appropriate dependency version fields in the metadata SBOM"""
|
|
r = 0
|
|
d = 0
|
|
for dependency in dependencies:
|
|
if "{{VERSION}}" in dependency["ref"] and dependency["ref"] == meta_bom_ref:
|
|
dependency["ref"] = dependency["ref"].replace("{{VERSION}}", purl_version)
|
|
r += 1
|
|
for i in range(len(dependency["dependsOn"])):
|
|
if dependency["dependsOn"][i] == meta_bom_ref:
|
|
dependency["dependsOn"][i] = dependency["dependsOn"][i].replace(
|
|
"{{VERSION}}", purl_version
|
|
)
|
|
d += 1
|
|
|
|
logger.debug(f"set_dependency_version: '{meta_bom_ref}' updated {r} refs and {d} dependsOn")
|
|
|
|
|
|
def get_subfolders_dict(folder_path: str = ".") -> dict:
|
|
"""Get list of all directories in the specified path"""
|
|
subfolders = []
|
|
try:
|
|
# Get all entries (files and directories) in the specified path
|
|
entries = os.listdir(folder_path)
|
|
|
|
# Filter for directories
|
|
for entry in entries:
|
|
full_path = os.path.join(folder_path, entry)
|
|
if os.path.isdir(full_path):
|
|
subfolders.append(entry)
|
|
except FileNotFoundError:
|
|
logger.error(f"Error: Directory '{folder_path}' not found.")
|
|
except Exception as e:
|
|
logger.error(f"An error occurred: {e}")
|
|
|
|
subfolders.sort()
|
|
return {key: 0 for key in subfolders}
|
|
|
|
|
|
def get_component_import_script_path(component: dict) -> str:
|
|
"""Extract the path to a third-party library import script as defined in component 'properties' as 'import_script_path'"""
|
|
import_script_path = [
|
|
p.get("value")
|
|
for p in component.get("properties", [])
|
|
if p.get("name") == "import_script_path"
|
|
]
|
|
if len(import_script_path):
|
|
# There should only be 1 result, if any
|
|
return import_script_path[0]
|
|
else:
|
|
return None
|
|
|
|
|
|
def get_component_priority_version_source(component: dict) -> str:
|
|
"""Get the priority version source, if defined in metadata file."""
|
|
priority_version_source = [
|
|
p.get("value")
|
|
for p in component.get("properties", [])
|
|
if p.get("name") == "generate_sbom:priority_version_source"
|
|
]
|
|
if len(priority_version_source):
|
|
# There should only be 1 result, if any
|
|
return priority_version_source[0]
|
|
else:
|
|
return None
|
|
|
|
|
|
def del_component_priority_version_source(component: dict) -> None:
|
|
"""Delete all priority version source properties."""
|
|
|
|
# Reverse iterate properties list to safely modify in situ
|
|
if "properties" in component:
|
|
for i in range(len(component["properties"]) - 1, -1, -1):
|
|
if component["properties"][i].get("name") == "generate_sbom:priority_version_source":
|
|
logger.debug(
|
|
f"PRIORITY VERSION SOURCE: {component['bom-ref']}: Removing priority version source from SBOM metadata."
|
|
)
|
|
del component["properties"][i]
|
|
|
|
|
|
def get_version_from_import_script(file_path: str) -> str:
|
|
"""A rudimentary parse of a shell script file to extract the static value defined for the VERSION variable"""
|
|
try:
|
|
with open(file_path, "r", encoding="utf-8") as file:
|
|
for line in file:
|
|
if line.strip().startswith("VERSION="):
|
|
return re.sub(
|
|
r"^VERSION=(?P<quote>[\"']?)(?P<content>\S+)(?P=quote).*$",
|
|
r"\g<content>",
|
|
line.strip(),
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Unable to load {file_path}")
|
|
logger.warning(e)
|
|
else:
|
|
return None
|
|
|
|
|
|
# endregion functions and classes
|
|
|
|
|
|
def main() -> None:
|
|
# region define args
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="""Generate a CycloneDX v1.5 JSON SBOM file using a combination of scan results from Endor Labs, pre-defined SBOM metadata, and the existing SBOM.
|
|
Requires endorctl to be installed and configured, which can be done using 'buildscripts/sbom/install_endorctl.sh'.
|
|
For use in CI, script may be run with no arguments.""",
|
|
epilog="Note: The git-related default values are dynamically generated.",
|
|
formatter_class=argparse.MetavarTypeHelpFormatter,
|
|
)
|
|
|
|
endor = parser.add_argument_group("Endor Labs API (via 'endorctl')")
|
|
endor.add_argument(
|
|
"--endorctl-path",
|
|
help="Path to endorctl, the Endor Labs CLI (Default: 'endorctl')",
|
|
default="endorctl",
|
|
type=str,
|
|
)
|
|
endor.add_argument(
|
|
"--config-path",
|
|
help="Path to endor config directory containing config.yaml (Default: '$HOME/.endorctl')",
|
|
default=None,
|
|
type=str,
|
|
)
|
|
endor.add_argument(
|
|
"--namespace", help="Endor Labs namespace (Default: mongodb.{git org})", type=str
|
|
)
|
|
endor.add_argument(
|
|
"--target",
|
|
help="Target for generated SBOM. Commit: results from running/completed PR scan, Branch: results from latest monitoring scan, Project: results from latest monitoring scan of the 'default' branch (default: commit)",
|
|
choices=["commit", "branch", "project"],
|
|
default="commit",
|
|
type=str,
|
|
)
|
|
endor.add_argument(
|
|
"--project",
|
|
help="Full GitHub git URL [e.g., https://github.com/10gen/mongo.git] (Default: current git URL)",
|
|
type=str,
|
|
)
|
|
|
|
target = parser.add_argument_group("Target values. Apply only if --target is not 'project'")
|
|
exclusive_target = target.add_mutually_exclusive_group()
|
|
exclusive_target.add_argument(
|
|
"--commit",
|
|
help="PR commit SHA [40-character hex string] (Default: current git commit)",
|
|
type=str,
|
|
)
|
|
exclusive_target.add_argument(
|
|
"--branch",
|
|
help="Git repo branch monitored by Endor Labs [e.g., v8.0] (Default: current git org/repo)",
|
|
type=str,
|
|
)
|
|
|
|
files = parser.add_argument_group("SBOM files")
|
|
files.add_argument(
|
|
"--sbom-metadata",
|
|
help="Input path for template SBOM file with metadata (Default: './buildscripts/sbom/metadata.cdx.json')",
|
|
default="./buildscripts/sbom/metadata.cdx.json",
|
|
type=str,
|
|
)
|
|
files.add_argument(
|
|
"--sbom-in",
|
|
help="Input path for previous SBOM file (Default: './sbom.json')",
|
|
default="./sbom.json",
|
|
type=str,
|
|
)
|
|
files.add_argument(
|
|
"--sbom-out",
|
|
help="Output path for SBOM file (Default: './sbom.json')",
|
|
default="./sbom.json",
|
|
type=str,
|
|
)
|
|
parser.add_argument(
|
|
"--retry-limit",
|
|
help="Maximum number of times to retry when a target PR scan has not started (Default: 5)",
|
|
default=5,
|
|
type=int,
|
|
)
|
|
parser.add_argument(
|
|
"--sleep-duration",
|
|
help="Number of seconds to wait between retries (Default: 30)",
|
|
default=30,
|
|
type=int,
|
|
)
|
|
parser.add_argument(
|
|
"--save-warnings",
|
|
help="Save warning messages to a specified file (Default: None)",
|
|
default=None,
|
|
type=str,
|
|
)
|
|
parser.add_argument("--debug", help="Set logging level to DEBUG", action="store_true")
|
|
|
|
# endregion define args
|
|
|
|
# region parse args
|
|
|
|
args = parser.parse_args()
|
|
|
|
git_info = GitInfo()
|
|
|
|
# endor
|
|
endorctl_path = args.endorctl_path
|
|
config_path = args.config_path
|
|
namespace = args.namespace if args.namespace else f"mongodb.{git_info.org}"
|
|
target = args.target
|
|
|
|
# project
|
|
if args.project and args.project != git_info.project:
|
|
if not re.fullmatch(REGEX_GITHUB_URL, args.project):
|
|
parser.error(f"Invalid Git URL: {args.project}.")
|
|
git_info.project = args.project
|
|
git_info.org, git_info.repo = map(
|
|
extract_repo_from_git_url(git_info.project).get, ("org", "repo")
|
|
)
|
|
git_info.release_tag = None
|
|
|
|
# targets
|
|
# commit
|
|
if args.commit and args.commit != git_info.commit:
|
|
if not re.fullmatch(REGEX_COMMIT_SHA, args.commit):
|
|
parser.error(
|
|
f"Invalid Git commit SHA: {args.commit}. Must be a 40-character hexadecimal string (SHA-1)."
|
|
)
|
|
git_info.commit = args.commit
|
|
|
|
# branch
|
|
if args.branch and args.branch != git_info.branch:
|
|
if len(args.branch.encode("utf-8")) > 244 or not re.fullmatch(
|
|
REGEX_GIT_BRANCH, args.branch
|
|
):
|
|
parser.error(
|
|
f"Invalid Git branch name: {args.branch}. Limit is 244 bytes with allowed characters: [a-zA-Z0-9_.-/]"
|
|
)
|
|
git_info.branch = args.branch
|
|
|
|
# files
|
|
sbom_out_path = args.sbom_out
|
|
sbom_in_path = args.sbom_in
|
|
sbom_metadata_path = args.sbom_metadata
|
|
save_warnings = args.save_warnings
|
|
|
|
# environment
|
|
retry_limit = args.retry_limit
|
|
sleep_duration = args.sleep_duration
|
|
|
|
if args.debug:
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
# endregion parse args
|
|
|
|
# region export Endor Labs SBOM
|
|
|
|
print_banner(f"Exporting Endor Labs SBOM for {target} {getattr(git_info, target)}")
|
|
endorctl = EndorCtl(namespace, retry_limit, sleep_duration, endorctl_path, config_path)
|
|
if target == "commit":
|
|
endor_bom = endorctl.get_sbom_for_commit(git_info.project, git_info.commit)
|
|
elif target == "branch":
|
|
endor_bom = endorctl.get_sbom_for_branch(git_info.project, git_info.branch)
|
|
elif target == "project":
|
|
endor_bom = endorctl.get_sbom_for_project(git_info.project)
|
|
else:
|
|
endor_bom = None
|
|
|
|
if not endor_bom:
|
|
logger.error("Empty result for Endor SBOM!")
|
|
if target == "commit":
|
|
logger.error("Check Endor Labs for any unanticipated issues with the target PR scan.")
|
|
else:
|
|
logger.error("Check Endor Labs for status of the target monitoring scan.")
|
|
sys.exit(1)
|
|
|
|
# endregion export Endor Labs SBOM
|
|
|
|
# region Pre-process Endor Labs SBOM
|
|
|
|
print_banner("Pre-Processing Endor Labs SBOM")
|
|
|
|
## remove uneeded components ##
|
|
# [list]endor_components_remove is defined in config.py
|
|
|
|
# Reverse iterate the SBOM components list to safely modify in situ
|
|
for i in range(len(endor_bom["components"]) - 1, -1, -1):
|
|
component = endor_bom["components"][i]
|
|
removed = False
|
|
for remove in endor_components_remove:
|
|
if component["bom-ref"].startswith(remove):
|
|
logger.info("ENDOR SBOM PRE-PROCESS: removing " + component["bom-ref"])
|
|
del endor_bom["components"][i]
|
|
removed = True
|
|
break
|
|
if not removed:
|
|
for rename in endor_components_rename:
|
|
old = rename[0]
|
|
new = rename[1]
|
|
component["bom-ref"] = component["bom-ref"].replace(old, new)
|
|
component["purl"] = component["purl"].replace(old, new)
|
|
|
|
logger.info(f"Endor Labs SBOM pre-processed with {len(endor_bom['components'])} components")
|
|
|
|
# endregion Pre-process Endor Labs SBOM
|
|
|
|
# region load metadata and previous SBOMs
|
|
|
|
print_banner("Loading metadata SBOM and previous SBOM")
|
|
|
|
meta_bom = read_sbom_json_file(sbom_metadata_path)
|
|
if not meta_bom:
|
|
logger.error("No SBOM metadata. This is fatal.")
|
|
sys.exit(1)
|
|
|
|
prev_bom = read_sbom_json_file(sbom_in_path)
|
|
if not prev_bom:
|
|
logger.warning(
|
|
"Unable to load previous SBOM data. The new SBOM will be generated without any previous context. This is unexpected, but not fatal."
|
|
)
|
|
# Create empty prev_bom to avoid downstream processing errors
|
|
prev_bom = {
|
|
"bom-ref": None,
|
|
"metadata": {
|
|
"timestamp": endor_bom["metadata"]["timestamp"],
|
|
"component": {
|
|
"version": None,
|
|
},
|
|
},
|
|
"components": [],
|
|
}
|
|
|
|
# endregion load metadata and previous SBOMs
|
|
|
|
# region Build composite SBOM
|
|
# Note: No exception handling here. The most likely reason for an exception is missing data elements
|
|
# in SBOM files, which is fatal if it happens. Code is in place to handle the situation
|
|
# where there is no previous SBOM to include, but we want to fail if required data is absent.
|
|
print_banner("Building composite SBOM (metadata + endor + previous)")
|
|
|
|
# Sort components by bom-ref
|
|
endor_bom["components"].sort(key=lambda c: c["bom-ref"])
|
|
meta_bom["components"].sort(key=lambda c: c["bom-ref"])
|
|
prev_bom["components"].sort(key=lambda c: c["bom-ref"])
|
|
|
|
# Check metadata SBOM for completeness
|
|
check_metadata_sbom(meta_bom)
|
|
|
|
# Create SBOM component lookup dicts
|
|
endor_components = sbom_components_to_dict(endor_bom)
|
|
prev_components = sbom_components_to_dict(prev_bom)
|
|
|
|
# region MongoDB primary component
|
|
|
|
# Attempt to determine the MongoDB Version being scanned
|
|
logger.debug(
|
|
f"Available MongoDB version options, tag: {git_info.release_tag}, branch: {git_info.branch}, previous SBOM: {prev_bom['metadata']['component']['version']}"
|
|
)
|
|
meta_bom_ref = meta_bom["metadata"]["component"]["bom-ref"]
|
|
|
|
# Project scan always set to 'master' or if using 'master' branch
|
|
if target == "project" or git_info.branch == "master":
|
|
version = "master"
|
|
purl_version = "master"
|
|
cpe_version = "master"
|
|
logger.info("Using branch 'master' as MongoDB version")
|
|
|
|
# tagged release. e.g., r8.1.0, r8.2.1-rc0
|
|
elif git_info.release_tag:
|
|
version = git_info.release_tag[1:] # remove leading 'r'
|
|
purl_version = git_info.release_tag
|
|
cpe_version = version # without leading 'r'
|
|
logger.info(f"Using release_tag '{git_info.release_tag}' as MongoDB version")
|
|
|
|
# Release branch e.g., v7.0 or v8.2
|
|
elif target == "branch" and re.fullmatch(REGEX_RELEASE_BRANCH, git_info.branch):
|
|
version = git_info.branch
|
|
purl_version = git_info.branch
|
|
# remove leading 'v', add wildcard. e.g. 8.2.*
|
|
cpe_version = version[1:] + ".*"
|
|
logger.info(f"Using release branch '{git_info.branch}' as MongoDB version")
|
|
|
|
# Previous SBOM app version, if all needed specifiers exist
|
|
elif (
|
|
prev_bom.get("metadata", {}).get("component", {}).get("version")
|
|
and prev_bom.get("metadata", {}).get("component", {}).get("purl")
|
|
and prev_bom.get("metadata", {}).get("component", {}).get("cpe")
|
|
):
|
|
version = prev_bom["metadata"]["component"]["version"]
|
|
purl_version = prev_bom["metadata"]["component"]["purl"].split("@")[-1]
|
|
cpe_version = prev_bom["metadata"]["component"]["cpe"].split(":")[5]
|
|
logger.info(f"Using previous SBOM version '{version}' as MongoDB version")
|
|
|
|
else:
|
|
# Fall back to the version specified in the Endor SBOM
|
|
# This is unlikely to be accurate
|
|
version = endor_bom["metadata"]["component"]["version"]
|
|
purl_version = version
|
|
cpe_version = version
|
|
logger.warning(
|
|
f"Using SBOM version '{version}' from Endor Labs scan. This is unlikely to be accurate and may specify a PR #."
|
|
)
|
|
|
|
# Set main component version
|
|
set_component_version(meta_bom["metadata"]["component"], version, purl_version, cpe_version)
|
|
# Run through 'dependency' objects to set main component version
|
|
set_dependency_version(meta_bom["dependencies"], meta_bom_ref, purl_version)
|
|
|
|
# endregion MongoDB primary component
|
|
|
|
# region SBOM components
|
|
|
|
# region Parse metadata SBOM components
|
|
|
|
third_party_folders = get_subfolders_dict(git_info.repo_root.as_posix() + "/src/third_party")
|
|
# pre-exclude 'scripts' folder
|
|
del third_party_folders["scripts"]
|
|
|
|
for component in meta_bom["components"]:
|
|
versions = {
|
|
"endor": None,
|
|
"import_script": None,
|
|
"metadata": None,
|
|
"priority_version_source": None,
|
|
}
|
|
|
|
component_key = component["bom-ref"].split("@")[0]
|
|
|
|
print_banner("Component: " + component_key)
|
|
|
|
############## Priority Version Source ###############
|
|
# Priority version source, if exists
|
|
priority_version_source = get_component_priority_version_source(component)
|
|
if priority_version_source:
|
|
versions["priority_version_source"] = priority_version_source
|
|
logger.info(
|
|
f"PRIORITY VERSION SOURCE: {component_key}: Set priority version source to '{priority_version_source}'"
|
|
)
|
|
del_component_priority_version_source(component)
|
|
|
|
################ Endor Labs ################
|
|
if component_key in endor_components:
|
|
# Pop component from dict so we are left with only unmatched components
|
|
endor_component = endor_components.pop(component_key)
|
|
versions["endor"] = endor_component.get("version")
|
|
logger.debug(
|
|
f"VERSION ENDOR: {component_key}: Found version '{versions['endor']}' in Endor Labs results"
|
|
)
|
|
|
|
############## Import Script ###############
|
|
# Import script version, if exists
|
|
import_script_path = get_component_import_script_path(component)
|
|
if import_script_path:
|
|
import_script = Path(import_script_path)
|
|
if import_script.exists():
|
|
versions["import_script"] = get_version_from_import_script(import_script_path)
|
|
if versions["import_script"]:
|
|
versions["import_script"] = versions["import_script"].replace("release-", "")
|
|
if versions["import_script"]:
|
|
logger.debug(
|
|
f"VERSION IMPORT SCRIPT: {component_key}: Found version '{versions['import_script']}' in import script '{import_script_path}'"
|
|
)
|
|
else:
|
|
logger.debug(
|
|
f"VERSION IMPORT SCRIPT: {component_key}: Import script not found! '{import_script_path}'"
|
|
)
|
|
|
|
############## Metadata ###############
|
|
# Hard-coded metadata version, if exists
|
|
if "{{VERSION}}" not in component["version"]:
|
|
versions["metadata"] = component.get("version")
|
|
|
|
logger.info(f"VERSIONS: {component_key}: " + str(versions))
|
|
|
|
############## Component Special Cases ###############
|
|
process_component_special_cases(
|
|
component_key, component, versions, git_info.repo_root.as_posix()
|
|
)
|
|
|
|
# Log a warning if Endor and import scripts versions do not match
|
|
if (
|
|
versions["endor"]
|
|
and versions["import_script"]
|
|
and get_semver_from_release_version(versions["endor"])
|
|
!= get_semver_from_release_version(versions["import_script"])
|
|
):
|
|
logger.debug(
|
|
",".join(
|
|
[
|
|
"endor:",
|
|
str(versions["endor"]),
|
|
"semver(endor):",
|
|
get_semver_from_release_version(versions["endor"]),
|
|
"import_script:",
|
|
str(versions["import_script"]),
|
|
"semver(import_script):",
|
|
get_semver_from_release_version(versions["import_script"]),
|
|
"priority_version_source:",
|
|
str(versions["priority_version_source"]),
|
|
]
|
|
)
|
|
)
|
|
logger.warning(
|
|
f"VERSION MISMATCH: {component_key}: Endor version {versions['endor']} does not match import script version {versions['import_script']}. 'priority_version_source' from metadata: {versions['priority_version_source']}"
|
|
)
|
|
|
|
# For the standard workflow, we favor the pre-set priority version source,
|
|
# followed by Endor Labs version, followed by import script, followed by hard coded
|
|
if versions["priority_version_source"] and versions["priority_version_source"] in versions:
|
|
version = versions[versions["priority_version_source"]]
|
|
logger.info(
|
|
f"VERSION: {component_key}: Using priority_version_source '{priority_version_source}' from metadata file."
|
|
)
|
|
else:
|
|
version = versions["endor"] or versions["import_script"] or versions["metadata"]
|
|
|
|
############## Assign Version ###############
|
|
if version:
|
|
meta_bom_ref = component["bom-ref"]
|
|
|
|
## Special case for FireFox ##
|
|
# The CPE for FireFox ESR needs the 'esr' removed from the version, as it is specified in another section
|
|
if component["bom-ref"].startswith("pkg:deb/debian/firefox-esr@"):
|
|
set_component_version(component, version, cpe_version=version.replace("esr", ""))
|
|
else:
|
|
semver = get_semver_from_release_version(version)
|
|
set_component_version(component, semver, version, semver)
|
|
|
|
set_dependency_version(meta_bom["dependencies"], meta_bom_ref, version)
|
|
|
|
# check against third_party folders
|
|
component_defines_location = False
|
|
for occurrence in component.get("evidence", {}).get("occurrences", []):
|
|
location = occurrence.get("location")
|
|
if location:
|
|
component_defines_location = True
|
|
if location.startswith("src/third_party/"):
|
|
location = location.replace("src/third_party/", "")
|
|
if location in third_party_folders:
|
|
third_party_folders[location] += 1
|
|
logger.debug(
|
|
f"THIRD_PARTY FOLDER: {component_key} matched folder {location} specified in SBOM"
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"THIRD_PARTY FOLDER: {component_key} lists third-party location folder as {location}, which does not exist!"
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"THIRD_PARTY FOLDER: {component_key} lists a location as '{location}'. Ideally, all third-party components are located under 'src/third_party/'."
|
|
)
|
|
if not component_defines_location:
|
|
logger.warning(
|
|
f"THIRD_PARTY FOLDER: {component_key} does not define a location in '.evidence.occurrences[]'"
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"VERSION NOT FOUND: Could not find a version for {component_key}! Removing from SBOM. Component may need to be removed from the {sbom_metadata_path} file."
|
|
)
|
|
del component
|
|
|
|
print_banner("Third Party Folders")
|
|
third_party_folders_missed = {
|
|
key: value for key, value in third_party_folders.items() if value == 0
|
|
}
|
|
if third_party_folders_missed:
|
|
logger.warning(
|
|
"THIRD_PARTY FOLDERS: 'src/third_party' folders not matched with a component: "
|
|
+ ",".join(third_party_folders_missed.keys())
|
|
)
|
|
else:
|
|
logger.info(
|
|
"THIRD_PARTY FOLDERS: All 'src/third_party' folders successfully matched with one or more components."
|
|
)
|
|
|
|
# explicit cleanup to avoid gc race condition on script temination
|
|
git_info.close()
|
|
del git_info
|
|
|
|
# endregion Parse metadata SBOM components
|
|
|
|
# region Parse unmatched Endor Labs components
|
|
|
|
print_banner("New Endor Labs components")
|
|
if endor_components:
|
|
logger.info(
|
|
f"ENDOR SBOM: There are {len(endor_components)} unmatched components in the Endor Labs SBOM. Adding as-is. The applicable metadata should be added to the metadata SBOM for the next run."
|
|
)
|
|
for component in endor_components:
|
|
# set scope to excluded by default until the component is evaluated
|
|
endor_components[component]["scope"] = "excluded"
|
|
meta_bom["components"].append(endor_components[component])
|
|
meta_bom["dependencies"].append(
|
|
{"ref": endor_components[component]["bom-ref"], "dependsOn": []}
|
|
)
|
|
logger.info(f"SBOM AS-IS COMPONENT: Added {component}")
|
|
|
|
# endregion Parse unmatched Endor Labs components
|
|
|
|
# region Finalize SBOM
|
|
|
|
# Have the SBOM app version changed?
|
|
sbom_app_version_changed = (
|
|
prev_bom["metadata"]["component"]["version"] != meta_bom["metadata"]["component"]["version"]
|
|
)
|
|
logger.info(f"SUMMARY: MongoDB version changed: {sbom_app_version_changed}")
|
|
|
|
# Have the components changed?
|
|
prev_components = sbom_components_to_dict(prev_bom, with_version=True)
|
|
meta_components = sbom_components_to_dict(meta_bom, with_version=True)
|
|
sbom_components_changed = prev_components.keys() != meta_components.keys()
|
|
logger.info(
|
|
f"SBOM_DIFF: SBOM components changed (added, removed, or version): {sbom_components_changed}. Previous SBOM has {len(prev_components)} components; New SBOM has {len(meta_components)} components"
|
|
)
|
|
|
|
# Components in prev SBOM but not in generated SBOM
|
|
prev_components = sbom_components_to_dict(prev_bom, with_version=False)
|
|
meta_components = sbom_components_to_dict(meta_bom, with_version=False)
|
|
prev_components_diff = list(set(prev_components.keys()) - set(meta_components.keys()))
|
|
if prev_components_diff:
|
|
logger.info(
|
|
"SBOM_DIFF: Components in previous SBOM and not in generated SBOM: "
|
|
+ ",".join(prev_components_diff)
|
|
)
|
|
|
|
# Components in generated SBOM but not in prev SBOM
|
|
meta_components_diff = list(set(meta_components.keys()) - set(prev_components.keys()))
|
|
if meta_components_diff:
|
|
logger.info(
|
|
"SBOM_DIFF: Components in generated SBOM and not in previous SBOM: "
|
|
+ ",".join(meta_components_diff)
|
|
)
|
|
|
|
# serialNumber https://cyclonedx.org/docs/1.5/json/#serialNumber
|
|
# version (SBOM version) https://cyclonedx.org/docs/1.5/json/#version
|
|
if sbom_app_version_changed:
|
|
# New MongoDB version requires a unique serial number and version 1
|
|
meta_bom["serialNumber"] = uuid.uuid4().urn
|
|
meta_bom["version"] = 1
|
|
else:
|
|
# MongoDB version is the same, so reuse the serial number and SBOM version
|
|
meta_bom["serialNumber"] = prev_bom["serialNumber"]
|
|
meta_bom["version"] = prev_bom["version"]
|
|
# If the components have changed, bump the SBOM version
|
|
if sbom_components_changed:
|
|
meta_bom["version"] += 1
|
|
|
|
# metadata.timestamp https://cyclonedx.org/docs/1.5/json/#metadata_timestamp
|
|
# Only update the timestamp if something has changed
|
|
if sbom_app_version_changed or sbom_components_changed:
|
|
meta_bom["metadata"]["timestamp"] = (
|
|
datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
|
|
)
|
|
else:
|
|
meta_bom["metadata"]["timestamp"] = prev_bom["metadata"]["timestamp"]
|
|
|
|
# metadata.tools https://cyclonedx.org/docs/1.5/json/#metadata_tools
|
|
meta_bom["metadata"]["tools"] = endor_bom["metadata"]["tools"]
|
|
|
|
write_sbom_json_file(meta_bom, sbom_out_path)
|
|
|
|
# Access the collected warnings
|
|
print_banner("CONSOLIDATED WARNINGS")
|
|
warnings = []
|
|
for record in warning_handler.warnings:
|
|
warnings.append(record.getMessage())
|
|
|
|
print("\n".join(warnings))
|
|
|
|
if save_warnings:
|
|
write_list_to_text_file(warnings, save_warnings)
|
|
|
|
print_banner("COMPLETED")
|
|
if not os.getenv("CI"):
|
|
print("Be sure to add the SBOM to your next commit if the file content has changed.")
|
|
|
|
# endregion Finalize SBOM
|
|
|
|
# endregion Build composite SBOM
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|