mirror of https://github.com/mongodb/mongo
487 lines
19 KiB
Python
487 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Utility functions for the Endor Labs API via endorctl
|
|
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import subprocess
|
|
import time
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
|
|
logger = logging.getLogger("generate_sbom")
|
|
logger.setLevel(logging.NOTSET)
|
|
|
|
default_field_masks = {
|
|
"PackageVersion": [
|
|
"context",
|
|
"meta",
|
|
"processing_status",
|
|
"spec.package_name",
|
|
"spec.resolved_dependencies.dependencies",
|
|
"spec.source_code_reference",
|
|
],
|
|
"ScanResult": [
|
|
"context",
|
|
"meta",
|
|
"spec.end_time",
|
|
"spec.logs",
|
|
"spec.refs",
|
|
"spec.start_time",
|
|
"spec.status",
|
|
"spec.versions",
|
|
],
|
|
}
|
|
|
|
|
|
def _get_default_field_mask(kind):
|
|
default_field_mask = default_field_masks.get(kind, [])
|
|
return ",".join(default_field_mask)
|
|
|
|
|
|
class EndorResourceKind(Enum):
|
|
"""Enumeration for Endor Labs API resource kinds"""
|
|
|
|
PROJECT = "Project"
|
|
REPOSITORY_VERSION = "RepositoryVersion"
|
|
SCAN_RESULT = "ScanResult"
|
|
PACKAGE_VERSION = "PackageVersion"
|
|
|
|
|
|
class EndorContextType(Enum):
|
|
"""Most objects include a common nested object called Context. Contexts keep objects from different scans separated.
|
|
https://docs.endorlabs.com/rest-api/using-the-rest-api/data-model/common-fields/#context"""
|
|
|
|
# Objects from a scan of the default branch. All objects in the OSS namespace are in the main context. The context ID is always default.
|
|
MAIN = "CONTEXT_TYPE_MAIN"
|
|
# Objects from a scan of a specific branch. The context ID is the branch reference name.
|
|
REF = "CONTEXT_TYPE_REF"
|
|
# Objects from a PR scan. The context ID is the PR UUID. Objects in this context are deleted after 30 days.
|
|
CI_RUN = "CONTEXT_TYPE_CI_RUN"
|
|
|
|
|
|
class EndorFilter:
|
|
"""Provide standard filters for Endor Labs API resource kinds"""
|
|
|
|
def __init__(self, context_id=None, context_type=None):
|
|
self.context_id = context_id
|
|
self.context_type = context_type
|
|
|
|
def _base_filters(self):
|
|
base_filters = []
|
|
if self.context_id:
|
|
base_filters.append(f"context.id=={self.context_id}")
|
|
if self.context_type:
|
|
base_filters.append(f"context.type=={self.context_type}")
|
|
|
|
return base_filters
|
|
|
|
def repository_version(self, project_uuid=None, sha=None, ref=None):
|
|
filters = self._base_filters()
|
|
if project_uuid:
|
|
filters.append(f"meta.parent_uuid=={project_uuid}")
|
|
if sha:
|
|
filters.append(f"spec.version.sha=={sha}")
|
|
if ref:
|
|
filters.append(f"spec.version.ref=={ref}")
|
|
|
|
return " and ".join(filters)
|
|
|
|
def package_version(
|
|
self,
|
|
context_type: EndorContextType = None,
|
|
context_id=None,
|
|
project_uuid=None,
|
|
name=None,
|
|
package_name=None,
|
|
):
|
|
filters = self._base_filters()
|
|
if context_type:
|
|
filters.append(f"context.type=={context_type.value}")
|
|
if context_type:
|
|
filters.append(f"context.id=={context_id}")
|
|
if project_uuid:
|
|
filters.append(f"spec.project_uuid=={project_uuid}")
|
|
if name:
|
|
filters.append(f"spec.package_name=={name}")
|
|
if package_name:
|
|
filters.append(f"meta.name=={package_name}")
|
|
|
|
return " and ".join(filters)
|
|
|
|
def scan_result(
|
|
self,
|
|
context_type: EndorContextType = None,
|
|
project_uuid=None,
|
|
ref=None,
|
|
sha=None,
|
|
status=None,
|
|
):
|
|
filters = self._base_filters()
|
|
if context_type:
|
|
filters.append(f"context.type=={context_type.value}")
|
|
if project_uuid:
|
|
filters.append(f"meta.parent_uuid=={project_uuid}")
|
|
if ref:
|
|
filters.append(f"spec.versions.ref contains '{ref}'")
|
|
if sha:
|
|
filters.append(f"spec.versions.sha contains '{sha}'")
|
|
if status:
|
|
filters.append(f"spec.status=={status}")
|
|
|
|
return " and ".join(filters)
|
|
|
|
|
|
class EndorCtl:
|
|
"""Interact with endorctl (Endor Labs CLI)"""
|
|
|
|
# region internal functions
|
|
def __init__(
|
|
self,
|
|
namespace,
|
|
retry_limit=5,
|
|
sleep_duration=30,
|
|
endorctl_path="endorctl",
|
|
config_path=None,
|
|
):
|
|
self.namespace = namespace
|
|
self.retry_limit = retry_limit
|
|
self.sleep_duration = sleep_duration
|
|
self.endorctl_path = endorctl_path
|
|
self.config_path = config_path
|
|
|
|
def _call_endorctl(self, command, subcommand, **kwargs):
|
|
"""https://docs.endorlabs.com/endorctl/"""
|
|
|
|
try:
|
|
command = [self.endorctl_path, command, subcommand, f"--namespace={self.namespace}"]
|
|
if self.config_path:
|
|
command.append(f"--config-path={self.config_path}")
|
|
|
|
# parse args into flags
|
|
for key, value in kwargs.items():
|
|
# Handle endorctl flags with hyphens that are defined in the script with underscores
|
|
flag = key.replace("_", "-")
|
|
if value:
|
|
command.append(f"--{flag}={value}")
|
|
logger.info("Running: %s", " ".join(command))
|
|
|
|
result = subprocess.run(command, capture_output=True, text=True, check=True)
|
|
|
|
resource = json.loads(result.stdout)
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
logger.error(f"Error executing command: {e}")
|
|
logger.error(e.stderr)
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Error decoding JSON: {e}")
|
|
logger.error(f"Stdout: {result.stdout}")
|
|
except FileNotFoundError as e:
|
|
logger.error(f"FileNotFoundError: {e}")
|
|
logger.error(
|
|
f"'endorctl' not found in path '{self.endorctl_path}'. Supply the correct path, run 'buildscripts/install_endorctl.sh' or visit https://docs.endorlabs.com/endorctl/install-and-configure/"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"An unexpected error occurred: {e}")
|
|
else:
|
|
return resource
|
|
|
|
def _api_get(self, resource, **kwargs):
|
|
"""https://docs.endorlabs.com/endorctl/commands/api/"""
|
|
return self._call_endorctl("api", "get", resource=resource, **kwargs)
|
|
|
|
def _api_list(self, resource, filter=None, retry=True, **kwargs):
|
|
"""https://docs.endorlabs.com/endorctl/commands/api/"""
|
|
# If this script is run immediately after making a commit, Endor Labs will likely not yet have created the assocaited ScanResult object. The wait/retry logic below handles this scenario.
|
|
tries = 0
|
|
while True:
|
|
tries += 1
|
|
result = self._call_endorctl("api", "list", resource=resource, filter=filter, **kwargs)
|
|
|
|
# The expected output of 'endorctl api list' is: { "list": { "objects": [...] } }
|
|
# We want to just return the objects. In case we get an empty list, return a list
|
|
# with a single None to avoid having to handle index errors downstream.
|
|
if result and result["list"].get("objects") and len(result["list"]["objects"]) > 0:
|
|
return result["list"]["objects"]
|
|
elif retry:
|
|
logger.info(
|
|
f"API LIST: Resource not found: {resource} with filter '{filter}' in namespace '{self.namespace}'"
|
|
)
|
|
if tries <= self.retry_limit:
|
|
logger.info(
|
|
f"API LIST: Waiting for {self.sleep_duration} seconds before retry attempt {tries} of {self.retry_limit}"
|
|
)
|
|
time.sleep(self.sleep_duration)
|
|
else:
|
|
logger.warning(
|
|
f"API LIST: Maximum number of allowed retries {self.retry_limit} attempted with no {resource} found using filter '{filter}'"
|
|
)
|
|
return [None]
|
|
else:
|
|
return [None]
|
|
|
|
def _check_resource(self, resource, resource_description) -> None:
|
|
if not resource:
|
|
raise LookupError(f"Resource not found: {resource_description}")
|
|
logger.info(f"Retrieved: {resource_description}")
|
|
|
|
# endregion internal functions
|
|
|
|
# region resource functions
|
|
def get_resource(self, resource, uuid=None, name=None, field_mask=None, **kwargs):
|
|
"""https://docs.endorlabs.com/rest-api/using-the-rest-api/data-model/resource-kinds/"""
|
|
if not field_mask:
|
|
field_mask = _get_default_field_mask(resource)
|
|
return self._api_get(
|
|
resource=resource, uuid=uuid, name=name, field_mask=field_mask, **kwargs
|
|
)
|
|
|
|
def get_resources(
|
|
self,
|
|
resource,
|
|
filter=None,
|
|
field_mask=None,
|
|
sort_path="meta.create_time",
|
|
sort_order="descending",
|
|
retry=True,
|
|
**kwargs,
|
|
):
|
|
"""https://docs.endorlabs.com/rest-api/using-the-rest-api/data-model/resource-kinds/"""
|
|
if not field_mask:
|
|
field_mask = _get_default_field_mask(resource)
|
|
return self._api_list(
|
|
resource=resource,
|
|
filter=filter,
|
|
field_mask=field_mask,
|
|
sort_path=sort_path,
|
|
sort_order=sort_order,
|
|
retry=retry,
|
|
**kwargs,
|
|
)
|
|
|
|
def get_project(self, git_url):
|
|
resource_kind = EndorResourceKind.PROJECT.value
|
|
resource_description = (
|
|
f"{resource_kind} with name '{git_url}' in namespace '{self.namespace}'"
|
|
)
|
|
project = self.get_resource(resource_kind, name=git_url)
|
|
self._check_resource(project, resource_description)
|
|
return project
|
|
|
|
def get_repository_version(self, filter=None, retry=True):
|
|
resource_kind = EndorResourceKind.REPOSITORY_VERSION.value
|
|
resource_description = (
|
|
f"{resource_kind} with filter '{filter}' in namespace '{self.namespace}'"
|
|
)
|
|
repository_version = self.get_resources(
|
|
resource_kind, filter=filter, retry=retry, page_size=1
|
|
)[0]
|
|
self._check_resource(repository_version, resource_description)
|
|
return repository_version
|
|
|
|
def get_scan_result(self, filter=None, retry=True):
|
|
resource_kind = EndorResourceKind.SCAN_RESULT.value
|
|
resource_description = (
|
|
f"{resource_kind} with filter '{filter}' in namespace '{self.namespace}'"
|
|
)
|
|
scan_result = self.get_resources(resource_kind, filter=filter, retry=retry, page_size=1)[0]
|
|
self._check_resource(scan_result, resource_description)
|
|
uuid = scan_result.get("uuid")
|
|
start_time = scan_result["spec"].get("start_time")
|
|
refs = scan_result["spec"].get("refs")
|
|
polling_start_time = datetime.now()
|
|
while True:
|
|
status = scan_result["spec"].get("status")
|
|
end_time = scan_result["spec"].get("end_time")
|
|
if status == "STATUS_SUCCESS":
|
|
logger.info(
|
|
f" Scan completed successfully. ScanResult uuid {uuid} for refs {refs} started at {start_time}, ended at {end_time}."
|
|
)
|
|
return scan_result
|
|
elif status == "STATUS_RUNNING":
|
|
logger.info(
|
|
f" Scan is running. ScanResult uuid {uuid} for refs {refs} started at {start_time}."
|
|
)
|
|
logger.info(
|
|
f" Waiting {self.sleep_duration} seconds before checking status. Total wait time: {(datetime.now() - polling_start_time).total_seconds()/60:.2f} minutes"
|
|
)
|
|
time.sleep(self.sleep_duration)
|
|
scan_result = self.get_resources(
|
|
resource_kind, filter=filter, retry=retry, page_size=1
|
|
)[0]
|
|
elif status == "STATUS_PARTIAL_SUCCESS":
|
|
scan_logs = scan_result["spec"].get("logs")
|
|
raise RuntimeError(
|
|
f" Scan completed, but with critical warnings or errors. ScanResult uuid {uuid} for refs {refs} started at {start_time}, ended at {end_time}. Scan logs: {scan_logs}"
|
|
)
|
|
elif status == "STATUS_FAILURE":
|
|
scan_logs = scan_result["spec"].get("logs")
|
|
raise RuntimeError(
|
|
f" Scan failed. ScanResult uuid {uuid} for refs {refs} started at {start_time}, ended at {end_time}. Scan logs: {scan_logs}"
|
|
)
|
|
|
|
def get_package_versions(self, filter):
|
|
resource_kind = EndorResourceKind.PACKAGE_VERSION.value
|
|
resource_description = (
|
|
f"{resource_kind} with filter '{filter}' in namespace '{self.namespace}'"
|
|
)
|
|
package_versions = self.get_resources(resource_kind, filter=filter)
|
|
self._check_resource(package_versions, resource_description)
|
|
return package_versions
|
|
|
|
def export_sbom(
|
|
self,
|
|
package_version_uuid=None,
|
|
package_version_uuids=None,
|
|
package_version_name=None,
|
|
app_name=None,
|
|
project_name=None,
|
|
project_uuid=None,
|
|
):
|
|
"""Export an SBOM from Endor Labs
|
|
|
|
Valid parameter sets (other combinations result in an error from 'endorctl'):
|
|
Single-Package SBOM:
|
|
package_version_uuid
|
|
package_version_name
|
|
Multi-Package SBOM:
|
|
package_version_uuids,app_name
|
|
project_uuid,app_name,app_name
|
|
project_name,app_name,app_name
|
|
|
|
https://docs.endorlabs.com/endorctl/commands/sbom/export/
|
|
"""
|
|
if package_version_uuids:
|
|
package_version_uuids = ",".join(package_version_uuids)
|
|
return self._call_endorctl(
|
|
"sbom",
|
|
"export",
|
|
package_version_uuid=package_version_uuid,
|
|
package_version_uuids=package_version_uuids,
|
|
package_version_name=package_version_name,
|
|
app_name=app_name,
|
|
project_name=project_name,
|
|
project_uuid=project_uuid,
|
|
)
|
|
|
|
# endregion resource functions
|
|
|
|
# region workflow functions
|
|
def get_sbom_for_commit(self, git_url: str, commit_sha: str) -> dict:
|
|
"""Export SBOM for the PR commit (sha)"""
|
|
|
|
endor_filter = EndorFilter()
|
|
|
|
try:
|
|
# Project: get uuid
|
|
project = self.get_project(git_url)
|
|
project_uuid = project["uuid"]
|
|
app_name = project["spec"]["git"]["full_name"]
|
|
|
|
# RepositoryVersion: get the context for the PR scan
|
|
endor_filter.context_type = EndorContextType.CI_RUN.value
|
|
filter_str = endor_filter.repository_version(project_uuid, commit_sha)
|
|
repository_version = self.get_repository_version(filter_str)
|
|
context_id = repository_version["context"]["id"]
|
|
|
|
# ScanResult: wait for a completed scan
|
|
endor_filter.context_id = context_id
|
|
filter_str = endor_filter.scan_result(project_uuid)
|
|
self.get_scan_result(filter_str)
|
|
|
|
# PackageVersions: get package versions for SBOM
|
|
filter_str = endor_filter.package_version(project_uuid)
|
|
package_versions = self.get_package_versions(filter_str)
|
|
package_version_uuids = [
|
|
package_version["uuid"] for package_version in package_versions
|
|
]
|
|
package_version_names = [
|
|
package_version["meta"]["name"] for package_version in package_versions
|
|
]
|
|
|
|
# Export SBOM
|
|
sbom = self.export_sbom(package_version_uuids=package_version_uuids, app_name=app_name)
|
|
print(
|
|
f"Retrieved: CycloneDX SBOM for PackageVersion(s), name: {package_version_names}, uuid: {package_version_uuids}"
|
|
)
|
|
return sbom
|
|
|
|
except Exception as e:
|
|
print(f"Exception: {e}")
|
|
return
|
|
|
|
def get_sbom_for_branch(self, git_url: str, branch: str) -> dict:
|
|
"""Export lastest SBOM for a monitored branch/ref"""
|
|
|
|
endor_filter = EndorFilter()
|
|
|
|
try:
|
|
# Project: get uuid
|
|
project = self.get_project(git_url)
|
|
project_uuid = project["uuid"]
|
|
app_name = project["spec"]["git"]["full_name"]
|
|
|
|
# RepositoryVersion: get the context for the latest branch scan
|
|
filter_str = endor_filter.repository_version(project_uuid, ref=branch)
|
|
repository_version = self.get_repository_version(filter_str)
|
|
repository_version_uuid = repository_version["uuid"]
|
|
repository_version_ref = repository_version["spec"]["version"]["ref"]
|
|
repository_version_sha = repository_version["spec"]["version"]["sha"]
|
|
repository_version_scan_object_status = repository_version["scan_object"]["status"]
|
|
if repository_version_scan_object_status != "STATUS_SCANNED":
|
|
logger.warning(
|
|
f"RepositoryVersion (uuid: {repository_version_uuid}, ref: {repository_version_ref}, sha: {repository_version_sha}) scan status is '{repository_version_scan_object_status}' (expected 'STATUS_SCANNED')"
|
|
)
|
|
|
|
# ScanResult: search for a completed scan
|
|
filter_str = endor_filter.scan_result(
|
|
EndorContextType.MAIN, project_uuid, repository_version_ref, repository_version_sha
|
|
)
|
|
scan_result = self.get_scan_result(filter_str, retry=False)
|
|
project_uuid = scan_result["meta"]["parent_uuid"]
|
|
|
|
# PackageVersions: get package versions for SBOM
|
|
if branch == "master":
|
|
context_type = EndorContextType.MAIN
|
|
context_id = "default"
|
|
else:
|
|
context_type = EndorContextType.REF
|
|
context_id = branch
|
|
filter_str = endor_filter.package_version(context_type, context_id, project_uuid)
|
|
package_version = self.get_package_versions(filter_str)[0]
|
|
package_version_name = package_version["meta"]["name"]
|
|
package_version_uuid = package_version["uuid"]
|
|
|
|
# Export SBOM
|
|
sbom = self.export_sbom(package_version_uuid=package_version_uuid, app_name=app_name)
|
|
logger.info(
|
|
f"SBOM: Retrieved CycloneDX SBOM for PackageVersion, name: {package_version_name}, uuid {package_version_uuid}"
|
|
)
|
|
return sbom
|
|
|
|
except Exception as e:
|
|
print(f"Exception: {e}")
|
|
return
|
|
|
|
def get_sbom_for_project(self, git_url: str) -> dict:
|
|
"""Export latest SBOM for EndorCtl project default branch"""
|
|
|
|
try:
|
|
# Project: get uuid
|
|
project = self.get_project(git_url)
|
|
project_uuid = project["uuid"]
|
|
app_name = project["spec"]["git"]["full_name"]
|
|
|
|
# Export SBOM
|
|
sbom = self.export_sbom(project_uuid=project_uuid, app_name=app_name)
|
|
logger.info(f"Retrieved: CycloneDX SBOM for Project {app_name}")
|
|
return sbom
|
|
|
|
except Exception as e:
|
|
print(f"Exception: {e}")
|
|
return
|
|
|
|
# endregion workflow functions
|