mongo/evergreen/functions/upload_sbom_via_silkbomb.py

261 lines
9.3 KiB
Python

import pathlib
import subprocess
import sys
import typer
from git import Repo
from typing_extensions import Annotated
app = typer.Typer(
help="Checks for SBOM file changes in a PR and uploads it to Kondukto if changed.",
add_completion=False,
)
def get_changed_files_from_latest_commit(local_repo_path: str, branch_name: str = "master") -> dict:
try:
repo = Repo(local_repo_path)
if branch_name not in repo.heads:
raise ValueError(f"Branch '{branch_name}' does not exist in the repository.")
last_commit = repo.heads[branch_name].commit
title = last_commit.summary
commit_hash = last_commit.hexsha
# If the last commit has no parents, it means it's the first commit in the repo
if not last_commit.parents:
files = [item.path for item in last_commit.tree.traverse()]
else:
# Comparing the last commit with its parent to find changed files
files = [file.a_path for file in last_commit.diff(last_commit.parents[0])]
return {"title": title, "hash": commit_hash, "files": files}
except Exception as e:
print(f"Error retrieving changed files: {e}")
raise e
def upload_sbom_via_silkbomb(
sbom_repo_path: str,
workdir: str,
local_repo_path: str,
repo_name: str,
branch_name: str,
creds_file_path: pathlib.Path,
container_command: str,
container_image: str,
timeout_seconds: int = 60 * 5,
):
container_options = ["--pull=always", "--platform=linux/amd64", "--rm"]
container_env_files = ["--env-file", str(creds_file_path.resolve())]
container_volumes = ["-v", f"{workdir}:/workdir"]
silkbomb_command = "augment" # it augment first and uses upload command
silkbomb_args = [
"--sbom-in",
f"/workdir/{local_repo_path}/{sbom_repo_path}",
"--branch",
branch_name,
"--repo",
repo_name,
]
command = [
container_command,
"run",
*container_options,
*container_env_files,
*container_volumes,
container_image,
silkbomb_command,
*silkbomb_args,
]
aws_region = "us-east-1"
ecr_registry_url = (
"901841024863.dkr.ecr.us-east-1.amazonaws.com/release-infrastructure/silkbomb"
)
print(f"Attempting to authenticate to AWS ECR registry '{ecr_registry_url}'...")
try:
login_cmd = f"aws ecr get-login-password --region {aws_region} | {container_command} login --username AWS --password-stdin {ecr_registry_url}"
subprocess.run(
login_cmd,
shell=True,
check=True,
text=True,
capture_output=True,
timeout=timeout_seconds,
)
print("ECR authentication successful.")
except FileNotFoundError:
print(
f"Error: A required command was not found. Please ensure AWS CLI and '{container_command}' are installed and in your PATH."
)
raise
except subprocess.TimeoutExpired as e:
print(
f"Error: Command timed out after {timeout_seconds} seconds. Please check Evergreen network state and try again."
)
raise e
except subprocess.CalledProcessError as e:
print(f"Error during ECR authentication:\n--- STDERR ---\n{e.stderr}")
raise
try:
print(f"Running command: {' '.join(command)}")
subprocess.run(command, check=True, text=True, capture_output=True, timeout=timeout_seconds)
print("Updated sbom.json file upload via Silkbomb successful!")
except FileNotFoundError as e:
print(f"Error: '{container_command}' command not found.")
raise e
except subprocess.TimeoutExpired as e:
print(
f"Error: Command timed out after {timeout_seconds} seconds. Please check Evergreen network state and try again."
)
raise e
except subprocess.CalledProcessError as e:
print(
f"Error during container execution:\n--- STDOUT ---\n{e.stdout}\n--- STDERR ---\n{e.stderr}"
)
raise e
# TODO (SERVER-109205): Add Slack Alerts for failures
@app.command()
def run(
github_org: Annotated[
str,
typer.Option(..., envvar="GITHUB_ORG", help="Name of the github organization (e.g. 10gen)"),
],
github_repo: Annotated[
str, typer.Option(..., envvar="GITHUB_REPO", help="Repo name in 'owner/repo' format.")
],
local_repo_path: Annotated[
str,
typer.Option(..., envvar="LOCAL_REPO_PATH", help="Path to the local git repository."),
],
branch_name: Annotated[
str,
typer.Option(..., envvar="BRANCH_NAME", help="The head branch (e.g., the PR branch name)."),
],
sbom_repo_path: Annotated[
str,
typer.Option(
...,
"--sbom-in",
envvar="SBOM_REPO_PATH",
help="Path to the SBOM file to check and upload.",
),
] = "sbom.json",
requester: Annotated[
str,
typer.Option(
...,
envvar="REQUESTER",
help="The entity requesting the run (e.g., 'github_merge_queue').",
),
] = "",
container_command: Annotated[
str,
typer.Option(
..., envvar="CONTAINER_COMMAND", help="Container engine to use ('podman' or 'docker')."
),
] = "podman",
container_image: Annotated[
str, typer.Option(..., envvar="CONTAINER_IMAGE", help="Silkbomb container image.")
] = "901841024863.dkr.ecr.us-east-1.amazonaws.com/release-infrastructure/silkbomb:2.0",
creds_file: Annotated[
pathlib.Path,
typer.Option(
..., envvar="CONTAINER_ENV_FILES", help="Path for the temporary credentials file."
),
] = pathlib.Path("kondukto_credentials.env"),
workdir: Annotated[
str, typer.Option(..., envvar="WORKING_DIR", help="Path for the container volumes.")
] = "/workdir",
dry_run: Annotated[
bool, typer.Option("--dry-run/--run", help="Check for changes without uploading.")
] = True,
check_sbom_file_change: Annotated[
bool, typer.Option("--check-sbom-file-change", help="Check for changes to the SBOM file.")
] = False,
):
if requester != "commit" and not dry_run:
print(f"Skipping: Run can only be triggered for 'commit', but requester was '{requester}'.")
sys.exit(0)
major_branches = ["v7.0", "v8.0", "v8.1", "master"] # Only major branches that MongoDB supports
if False and branch_name not in major_branches:
print(f"Skipping: Branch '{branch_name}' is not a major branch. Exiting.")
sys.exit(0)
repo_path = pathlib.Path(f"{workdir}/{local_repo_path}")
sbom_path = pathlib.Path(f"{repo_path}/{sbom_repo_path}")
if not sbom_path.resolve().exists():
print(f"Error: SBOM file not found at path: {str(sbom_path.resolve())}")
sys.exit(1)
try:
sbom_file_changed = True
if check_sbom_file_change:
commit_changed_files = get_changed_files_from_latest_commit(repo_path, branch_name)
if commit_changed_files:
print(
f"Latest commit '{commit_changed_files['title']}' ({commit_changed_files['hash']}) in branch '{branch_name}' has the following changed files:"
)
print(f"{commit_changed_files['files']}")
else:
print(
f"No changed files found in the commit '{commit_changed_files['title']}' ({commit_changed_files['hash']}) in branch '{branch_name}'. Exiting without upload."
)
sys.exit(0)
print(f"Checking for changes to file: {sbom_path} ({sbom_repo_path})")
sbom_file_changed = sbom_repo_path in commit_changed_files["files"]
if sbom_file_changed:
print(f"File '{sbom_path}' was modified. Initiating upload...")
else:
print(f"File '{sbom_repo_path}' was not modified. Nothing to upload.")
if not dry_run and sbom_file_changed:
upload_sbom_via_silkbomb(
sbom_repo_path=sbom_repo_path,
workdir=workdir,
local_repo_path=local_repo_path,
repo_name=f"{github_org}/{github_repo}",
branch_name=branch_name,
creds_file_path=creds_file,
container_command=container_command,
container_image=container_image,
)
else:
print("--dry-run enabled, skipping upload.")
print(
f"File '{sbom_repo_path}'"
+ (" was modified." if sbom_file_changed else " was not modified.")
)
if dry_run:
print("Upload metadata:")
print(f" SBOM Path: {sbom_path}")
print(f" Repo Name: '{github_org}/{github_repo}'")
print(f" Repo Branch: '{branch_name}'")
print(f" Container Command: {container_command}")
print(f" Container Image: {container_image}")
print(f" Workdir: {workdir}")
if check_sbom_file_change:
print(
f"Latest commit '{commit_changed_files['title']}' ({commit_changed_files['hash']})"
)
except Exception as e:
print(f"Exception during script execution: {e}")
sys.exit(1)
if __name__ == "__main__":
app()