mongo/buildscripts/sbom/endorctl_utils.py

487 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Utility functions for the Endor Labs API via endorctl
"""
import json
import logging
import subprocess
import time
from datetime import datetime
from enum import Enum
# Module logger. Its own level is left at NOTSET so the effective level is
# inherited from whatever the parent/root logger is configured with.
logger = logging.getLogger("generate_sbom")
logger.setLevel(logging.NOTSET)

# Default field masks, keyed by resource kind. A kind that is absent from this
# mapping has no default mask.
default_field_masks = dict(
    PackageVersion=[
        "context",
        "meta",
        "processing_status",
        "spec.package_name",
        "spec.resolved_dependencies.dependencies",
        "spec.source_code_reference",
    ],
    ScanResult=[
        "context",
        "meta",
        "spec.end_time",
        "spec.logs",
        "spec.refs",
        "spec.start_time",
        "spec.status",
        "spec.versions",
    ],
)
def _get_default_field_mask(kind):
    """Return the default field mask for resource *kind* as a comma-separated string.

    Kinds without an entry in ``default_field_masks`` yield an empty string.
    """
    if kind in default_field_masks:
        return ",".join(default_field_masks[kind])
    return ""
class EndorResourceKind(Enum):
    """Endor Labs API resource kinds used by this module.

    Each member's value is the resource name string handed to endorctl.
    """

    PROJECT = "Project"
    REPOSITORY_VERSION = "RepositoryVersion"
    SCAN_RESULT = "ScanResult"
    PACKAGE_VERSION = "PackageVersion"
class EndorContextType(Enum):
    """Context types of Endor Labs objects.

    Most objects carry a common nested ``context`` object; contexts keep
    objects that originate from different scans separated.
    https://docs.endorlabs.com/rest-api/using-the-rest-api/data-model/common-fields/#context
    """

    # Scan of the default branch. Every object in the OSS namespace lives in
    # the main context, and the context ID is always "default".
    MAIN = "CONTEXT_TYPE_MAIN"

    # Scan of a specific branch; the context ID is the branch reference name.
    REF = "CONTEXT_TYPE_REF"

    # PR scan; the context ID is the PR UUID. Objects in this context are
    # deleted after 30 days.
    CI_RUN = "CONTEXT_TYPE_CI_RUN"
class EndorFilter:
    """Provide standard filters for Endor Labs API resource kinds.

    An instance may carry a default context (``context_id`` and/or
    ``context_type``). When set, the corresponding ``context.id`` /
    ``context.type`` clauses are placed at the front of every filter the
    instance builds. All clauses are joined with ``" and "``; a filter with no
    clauses is the empty string.
    """

    def __init__(self, context_id=None, context_type=None):
        """Store the optional default context.

        ``context_type`` is interpolated into the filter as-is, so it should be
        the raw string value (e.g. ``EndorContextType.CI_RUN.value``) rather
        than the Enum member itself.
        """
        self.context_id = context_id
        self.context_type = context_type

    def _base_filters(self):
        """Return the list of context clauses derived from the instance defaults."""
        base_filters = []
        if self.context_id:
            base_filters.append(f"context.id=={self.context_id}")
        if self.context_type:
            base_filters.append(f"context.type=={self.context_type}")
        return base_filters

    def repository_version(self, project_uuid=None, sha=None, ref=None):
        """Build a RepositoryVersion filter from the parent project, commit sha and/or ref."""
        filters = self._base_filters()
        if project_uuid:
            filters.append(f"meta.parent_uuid=={project_uuid}")
        if sha:
            filters.append(f"spec.version.sha=={sha}")
        if ref:
            filters.append(f"spec.version.ref=={ref}")
        return " and ".join(filters)

    def package_version(
        self,
        context_type: "EndorContextType" = None,
        context_id=None,
        project_uuid=None,
        name=None,
        package_name=None,
    ):
        """Build a PackageVersion filter.

        ``context_type`` must be an ``EndorContextType`` member (its ``.value``
        is used). ``context_type`` and ``context_id`` are independent: each one
        adds its own clause only when it is supplied.
        """
        filters = self._base_filters()
        if context_type:
            filters.append(f"context.type=={context_type.value}")
        # Guard on context_id itself; guarding on context_type would drop a
        # lone context_id and would emit "context.id==None" for a lone type.
        if context_id:
            filters.append(f"context.id=={context_id}")
        if project_uuid:
            filters.append(f"spec.project_uuid=={project_uuid}")
        # NOTE(review): `name` filters on spec.package_name while `package_name`
        # filters on meta.name - this looks swapped; confirm against callers
        # before changing.
        if name:
            filters.append(f"spec.package_name=={name}")
        if package_name:
            filters.append(f"meta.name=={package_name}")
        return " and ".join(filters)

    def scan_result(
        self,
        context_type: "EndorContextType" = None,
        project_uuid=None,
        ref=None,
        sha=None,
        status=None,
    ):
        """Build a ScanResult filter.

        ``context_type`` must be an ``EndorContextType`` member (its ``.value``
        is used). ``ref`` and ``sha`` are matched with ``contains`` against the
        ``spec.versions`` fields.
        """
        filters = self._base_filters()
        if context_type:
            filters.append(f"context.type=={context_type.value}")
        if project_uuid:
            filters.append(f"meta.parent_uuid=={project_uuid}")
        if ref:
            filters.append(f"spec.versions.ref contains '{ref}'")
        if sha:
            filters.append(f"spec.versions.sha contains '{sha}'")
        if status:
            filters.append(f"spec.status=={status}")
        return " and ".join(filters)
class EndorCtl:
    """Interact with endorctl (Endor Labs CLI).

    Every call shells out to the ``endorctl`` executable and parses its JSON
    output. Low-level failures (non-zero exit, invalid JSON, missing binary)
    are logged and surface as ``None`` results rather than exceptions.
    """

    # region internal functions

    def __init__(
        self,
        namespace,
        retry_limit=5,
        sleep_duration=30,
        endorctl_path="endorctl",
        config_path=None,
    ):
        """Configure the wrapper.

        namespace: value passed as ``--namespace`` on every call.
        retry_limit: number of retries for list calls that return nothing.
        sleep_duration: seconds to sleep between retries / status polls.
        endorctl_path: executable name or path of endorctl.
        config_path: optional value passed as ``--config-path``.
        """
        self.namespace = namespace
        self.retry_limit = retry_limit
        self.sleep_duration = sleep_duration
        self.endorctl_path = endorctl_path
        self.config_path = config_path

    def _call_endorctl(self, command, subcommand, **kwargs):
        """Run ``endorctl <command> <subcommand>`` and return the parsed JSON output.

        Keyword arguments become ``--flag=value`` options (underscores are
        converted to hyphens); arguments with a falsy value are omitted.
        Returns ``None`` after logging if the command fails, the output is not
        valid JSON, or the executable cannot be found.

        https://docs.endorlabs.com/endorctl/
        """
        try:
            argv = [self.endorctl_path, command, subcommand, f"--namespace={self.namespace}"]
            if self.config_path:
                argv.append(f"--config-path={self.config_path}")

            # parse args into flags
            for key, value in kwargs.items():
                # Handle endorctl flags with hyphens that are defined in the script with underscores
                flag = key.replace("_", "-")
                if value:
                    argv.append(f"--{flag}={value}")

            logger.info("Running: %s", " ".join(argv))
            result = subprocess.run(argv, capture_output=True, text=True, check=True)
            return json.loads(result.stdout)
        except subprocess.CalledProcessError as e:
            logger.error(f"Error executing command: {e}")
            logger.error(e.stderr)
        except json.JSONDecodeError as e:
            # `result` is always bound here: json.loads only runs after subprocess.run succeeded.
            logger.error(f"Error decoding JSON: {e}")
            logger.error(f"Stdout: {result.stdout}")
        except FileNotFoundError as e:
            logger.error(f"FileNotFoundError: {e}")
            logger.error(
                f"'endorctl' not found in path '{self.endorctl_path}'. Supply the correct path, run 'buildscripts/install_endorctl.sh' or visit https://docs.endorlabs.com/endorctl/install-and-configure/"
            )
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")
        return None

    def _api_get(self, resource, **kwargs):
        """Run ``endorctl api get`` for *resource*.

        https://docs.endorlabs.com/endorctl/commands/api/
        """
        return self._call_endorctl("api", "get", resource=resource, **kwargs)

    def _api_list(self, resource, filter=None, retry=True, **kwargs):
        """Run ``endorctl api list`` and return the list of objects found.

        Returns ``[None]`` when nothing is found, so callers can index ``[0]``
        without handling IndexError. With ``retry`` enabled, an empty result is
        retried up to ``self.retry_limit`` times, sleeping
        ``self.sleep_duration`` seconds between attempts.

        https://docs.endorlabs.com/endorctl/commands/api/
        """
        # If this script is run immediately after making a commit, Endor Labs will likely not yet
        # have created the associated ScanResult object. The wait/retry logic below handles this scenario.
        tries = 0
        while True:
            tries += 1
            result = self._call_endorctl("api", "list", resource=resource, filter=filter, **kwargs)

            # The expected output of 'endorctl api list' is: { "list": { "objects": [...] } }
            # We want to just return the objects. Tolerate payloads that lack the
            # "list" mapping instead of raising KeyError/AttributeError.
            listing = result.get("list") if isinstance(result, dict) else None
            objects = listing.get("objects") if isinstance(listing, dict) else None
            if objects:
                return objects

            if not retry:
                return [None]

            logger.info(
                f"API LIST: Resource not found: {resource} with filter '{filter}' in namespace '{self.namespace}'"
            )
            if tries > self.retry_limit:
                logger.warning(
                    f"API LIST: Maximum number of allowed retries {self.retry_limit} attempted with no {resource} found using filter '{filter}'"
                )
                return [None]
            logger.info(
                f"API LIST: Waiting for {self.sleep_duration} seconds before retry attempt {tries} of {self.retry_limit}"
            )
            time.sleep(self.sleep_duration)

    def _check_resource(self, resource, resource_description) -> None:
        """Raise LookupError if *resource* is falsy; otherwise log that it was retrieved."""
        if not resource:
            raise LookupError(f"Resource not found: {resource_description}")
        logger.info(f"Retrieved: {resource_description}")

    # endregion internal functions

    # region resource functions

    def get_resource(self, resource, uuid=None, name=None, field_mask=None, **kwargs):
        """Get a single resource by uuid or name, using the kind's default field mask if none is given.

        https://docs.endorlabs.com/rest-api/using-the-rest-api/data-model/resource-kinds/
        """
        if not field_mask:
            field_mask = _get_default_field_mask(resource)
        return self._api_get(
            resource=resource, uuid=uuid, name=name, field_mask=field_mask, **kwargs
        )

    def get_resources(
        self,
        resource,
        filter=None,
        field_mask=None,
        sort_path="meta.create_time",
        sort_order="descending",
        retry=True,
        **kwargs,
    ):
        """List resources matching *filter*; by default sorted by ``meta.create_time``, descending.

        Returns ``[None]`` when nothing matches (see ``_api_list``).

        https://docs.endorlabs.com/rest-api/using-the-rest-api/data-model/resource-kinds/
        """
        if not field_mask:
            field_mask = _get_default_field_mask(resource)
        return self._api_list(
            resource=resource,
            filter=filter,
            field_mask=field_mask,
            sort_path=sort_path,
            sort_order=sort_order,
            retry=retry,
            **kwargs,
        )

    def get_project(self, git_url):
        """Return the Project named *git_url*; raise LookupError if it is not found."""
        resource_kind = EndorResourceKind.PROJECT.value
        resource_description = (
            f"{resource_kind} with name '{git_url}' in namespace '{self.namespace}'"
        )
        project = self.get_resource(resource_kind, name=git_url)
        self._check_resource(project, resource_description)
        return project

    def get_repository_version(self, filter=None, retry=True):
        """Return the first RepositoryVersion matching *filter*; raise LookupError if none."""
        resource_kind = EndorResourceKind.REPOSITORY_VERSION.value
        resource_description = (
            f"{resource_kind} with filter '{filter}' in namespace '{self.namespace}'"
        )
        repository_version = self.get_resources(
            resource_kind, filter=filter, retry=retry, page_size=1
        )[0]
        self._check_resource(repository_version, resource_description)
        return repository_version

    def get_scan_result(self, filter=None, retry=True):
        """Return the first ScanResult matching *filter* once its scan has succeeded.

        While the status is ``STATUS_RUNNING`` the result is re-fetched every
        ``self.sleep_duration`` seconds.

        Raises:
            LookupError: no ScanResult matches (initially or while polling).
            RuntimeError: the scan ended with ``STATUS_PARTIAL_SUCCESS`` or
                ``STATUS_FAILURE``, or reports a status this method does not
                know how to handle.
        """
        resource_kind = EndorResourceKind.SCAN_RESULT.value
        resource_description = (
            f"{resource_kind} with filter '{filter}' in namespace '{self.namespace}'"
        )
        scan_result = self.get_resources(resource_kind, filter=filter, retry=retry, page_size=1)[0]
        self._check_resource(scan_result, resource_description)

        polling_start_time = datetime.now()
        while True:
            # Re-read on every pass so the log lines describe the object currently inspected.
            uuid = scan_result.get("uuid")
            start_time = scan_result["spec"].get("start_time")
            refs = scan_result["spec"].get("refs")
            status = scan_result["spec"].get("status")
            end_time = scan_result["spec"].get("end_time")

            if status == "STATUS_SUCCESS":
                logger.info(
                    f" Scan completed successfully. ScanResult uuid {uuid} for refs {refs} started at {start_time}, ended at {end_time}."
                )
                return scan_result
            elif status == "STATUS_RUNNING":
                logger.info(
                    f" Scan is running. ScanResult uuid {uuid} for refs {refs} started at {start_time}."
                )
                logger.info(
                    f" Waiting {self.sleep_duration} seconds before checking status. Total wait time: {(datetime.now() - polling_start_time).total_seconds()/60:.2f} minutes"
                )
                time.sleep(self.sleep_duration)
                scan_result = self.get_resources(
                    resource_kind, filter=filter, retry=retry, page_size=1
                )[0]
                # get_resources yields [None] when nothing is found; fail clearly
                # instead of hitting a TypeError on the next pass.
                if not scan_result:
                    raise LookupError(f"Resource not found: {resource_description}")
            elif status == "STATUS_PARTIAL_SUCCESS":
                scan_logs = scan_result["spec"].get("logs")
                raise RuntimeError(
                    f" Scan completed, but with critical warnings or errors. ScanResult uuid {uuid} for refs {refs} started at {start_time}, ended at {end_time}. Scan logs: {scan_logs}"
                )
            elif status == "STATUS_FAILURE":
                scan_logs = scan_result["spec"].get("logs")
                raise RuntimeError(
                    f" Scan failed. ScanResult uuid {uuid} for refs {refs} started at {start_time}, ended at {end_time}. Scan logs: {scan_logs}"
                )
            else:
                # Any other status would otherwise spin this loop forever
                # without sleeping or re-fetching.
                raise RuntimeError(
                    f" Scan has unexpected status '{status}'. ScanResult uuid {uuid} for refs {refs} started at {start_time}."
                )

    def get_package_versions(self, filter):
        """Return all PackageVersions matching *filter*; raise LookupError if none."""
        resource_kind = EndorResourceKind.PACKAGE_VERSION.value
        resource_description = (
            f"{resource_kind} with filter '{filter}' in namespace '{self.namespace}'"
        )
        package_versions = self.get_resources(resource_kind, filter=filter)
        # "Not found" is reported as [None], which is a truthy list, so the
        # first element has to be checked rather than the list itself.
        first = package_versions[0] if package_versions else None
        self._check_resource(first, resource_description)
        return package_versions

    def export_sbom(
        self,
        package_version_uuid=None,
        package_version_uuids=None,
        package_version_name=None,
        app_name=None,
        project_name=None,
        project_uuid=None,
    ):
        """Export an SBOM from Endor Labs.

        Valid parameter sets (other combinations result in an error from 'endorctl'):
            Single-Package SBOM:
                package_version_uuid
                package_version_name
            Multi-Package SBOM:
                package_version_uuids,app_name
                project_uuid,app_name
                project_name,app_name

        ``package_version_uuids`` is an iterable of uuid strings; it is passed
        to endorctl as a single comma-separated value. Returns the parsed JSON
        output, or ``None`` if the endorctl call failed.

        https://docs.endorlabs.com/endorctl/commands/sbom/export/
        """
        if package_version_uuids:
            package_version_uuids = ",".join(package_version_uuids)
        return self._call_endorctl(
            "sbom",
            "export",
            package_version_uuid=package_version_uuid,
            package_version_uuids=package_version_uuids,
            package_version_name=package_version_name,
            app_name=app_name,
            project_name=project_name,
            project_uuid=project_uuid,
        )

    # endregion resource functions

    # region workflow functions

    def get_sbom_for_commit(self, git_url: str, commit_sha: str) -> dict:
        """Export SBOM for the PR commit (sha).

        Returns the SBOM, or ``None`` if any step raised (the error is logged).
        """
        endor_filter = EndorFilter()
        try:
            # Project: get uuid
            project = self.get_project(git_url)
            project_uuid = project["uuid"]
            app_name = project["spec"]["git"]["full_name"]

            # RepositoryVersion: get the context for the PR scan
            endor_filter.context_type = EndorContextType.CI_RUN.value
            filter_str = endor_filter.repository_version(
                project_uuid=project_uuid, sha=commit_sha
            )
            repository_version = self.get_repository_version(filter_str)
            context_id = repository_version["context"]["id"]

            # ScanResult: wait for a completed scan
            endor_filter.context_id = context_id
            # project_uuid must go by keyword: the first positional parameter
            # of scan_result()/package_version() is context_type. The context
            # itself comes from the filter instance's defaults set above.
            filter_str = endor_filter.scan_result(project_uuid=project_uuid)
            self.get_scan_result(filter_str)

            # PackageVersions: get package versions for SBOM
            filter_str = endor_filter.package_version(project_uuid=project_uuid)
            package_versions = self.get_package_versions(filter_str)
            package_version_uuids = [
                package_version["uuid"] for package_version in package_versions
            ]
            package_version_names = [
                package_version["meta"]["name"] for package_version in package_versions
            ]

            # Export SBOM
            sbom = self.export_sbom(package_version_uuids=package_version_uuids, app_name=app_name)
            logger.info(
                f"Retrieved: CycloneDX SBOM for PackageVersion(s), name: {package_version_names}, uuid: {package_version_uuids}"
            )
            return sbom
        except Exception as e:
            logger.error(f"Exception: {e}")
            return None

    def get_sbom_for_branch(self, git_url: str, branch: str) -> dict:
        """Export latest SBOM for a monitored branch/ref.

        Returns the SBOM, or ``None`` if any step raised (the error is logged).
        """
        endor_filter = EndorFilter()
        try:
            # Project: get uuid
            project = self.get_project(git_url)
            project_uuid = project["uuid"]
            app_name = project["spec"]["git"]["full_name"]

            # RepositoryVersion: get the context for the latest branch scan
            filter_str = endor_filter.repository_version(project_uuid, ref=branch)
            repository_version = self.get_repository_version(filter_str)
            repository_version_uuid = repository_version["uuid"]
            repository_version_ref = repository_version["spec"]["version"]["ref"]
            repository_version_sha = repository_version["spec"]["version"]["sha"]
            repository_version_scan_object_status = repository_version["scan_object"]["status"]
            if repository_version_scan_object_status != "STATUS_SCANNED":
                logger.warning(
                    f"RepositoryVersion (uuid: {repository_version_uuid}, ref: {repository_version_ref}, sha: {repository_version_sha}) scan status is '{repository_version_scan_object_status}' (expected 'STATUS_SCANNED')"
                )

            # ScanResult: search for a completed scan
            filter_str = endor_filter.scan_result(
                context_type=EndorContextType.MAIN,
                project_uuid=project_uuid,
                ref=repository_version_ref,
                sha=repository_version_sha,
            )
            scan_result = self.get_scan_result(filter_str, retry=False)
            project_uuid = scan_result["meta"]["parent_uuid"]

            # PackageVersions: get package versions for SBOM
            if branch == "master":
                context_type = EndorContextType.MAIN
                context_id = "default"
            else:
                context_type = EndorContextType.REF
                context_id = branch
            filter_str = endor_filter.package_version(
                context_type=context_type, context_id=context_id, project_uuid=project_uuid
            )
            package_version = self.get_package_versions(filter_str)[0]
            package_version_name = package_version["meta"]["name"]
            package_version_uuid = package_version["uuid"]

            # Export SBOM
            sbom = self.export_sbom(package_version_uuid=package_version_uuid, app_name=app_name)
            logger.info(
                f"SBOM: Retrieved CycloneDX SBOM for PackageVersion, name: {package_version_name}, uuid {package_version_uuid}"
            )
            return sbom
        except Exception as e:
            logger.error(f"Exception: {e}")
            return None

    def get_sbom_for_project(self, git_url: str) -> dict:
        """Export latest SBOM for EndorCtl project default branch.

        Returns the SBOM, or ``None`` if any step raised (the error is logged).
        """
        try:
            # Project: get uuid
            project = self.get_project(git_url)
            project_uuid = project["uuid"]
            app_name = project["spec"]["git"]["full_name"]

            # Export SBOM
            sbom = self.export_sbom(project_uuid=project_uuid, app_name=app_name)
            logger.info(f"Retrieved: CycloneDX SBOM for Project {app_name}")
            return sbom
        except Exception as e:
            logger.error(f"Exception: {e}")
            return None

    # endregion workflow functions