#!/usr/bin/env python3 # # Component/Team Smoke Tests # # To be run prior to submitting evergreen patches. # Runs the following locally and makes sure they pass: # * clang format # * clang tidy # * build install-dist-test # * unit tests # * smoke tests # # By default, notifies the locally configured Evergreen user # via slack once the smoke test are finished. # import hashlib import os import shutil import subprocess import sys import time from collections import defaultdict, deque from dataclasses import dataclass from pathlib import Path from socket import gethostname from typing import Any, Deque, Dict, List, Optional, Set, Union from rich.status import Status SMOKE_TEST_DIR = Path(__file__).resolve().parent ROOT = SMOKE_TEST_DIR.parent.parent MONGO_PYTHON = ROOT.joinpath("python3-venv") MONGO_PYTHON_INTERPRETER = MONGO_PYTHON.joinpath("bin", "python") BAZEL = Path(shutil.which("bazel")) def make_unique_name(): ctx = hashlib.new("sha256") ctx.update(ROOT.resolve().as_posix().encode()) return ctx.hexdigest()[-8:] REPO_UNIQUE_NAME = make_unique_name() def ensure_python3_venv(): if sys.executable != MONGO_PYTHON_INTERPRETER.as_posix(): os.execv( MONGO_PYTHON_INTERPRETER, [MONGO_PYTHON_INTERPRETER, *sys.argv], ) # needed for relative imports for eg: buildscripts sys.path.append(ROOT.as_posix()) ensure_python3_venv() # can import these after verifying we're running with the correct venv from buildscripts.resmokelib.utils.evergreen_conn import get_evergreen_api @dataclass class Node: name: str args: List[str] popen_kwargs: Dict[str, str] log_file: Path deps: Set["Node"] _start_time: Optional[float] = None _finish_time: Optional[float] = None _proc: Optional[subprocess.Popen] = None def __str__(self): return f'Node("{self.name}")' def __repr__(self): return f'Node("{self.name}")' def __hash__(self): return hash(self.name) def __eq__(self, other: "Node"): return self.name == other.name def __lt__(self, other: "Node"): return self.name < other.name def start(self): self._start_time = time.monotonic() logstream = self.log_file.open("w") self._proc = subprocess.Popen( self.args, stdout=logstream, stderr=logstream, ) def returncode(self): if self._proc is None: return None if self._finish_time is not None: return self._proc.returncode if self._proc.poll() is None: return None self._finish_time = time.monotonic() return self._proc.returncode def deps_are_satisfied(self, finished: Set["Node"]): return self.deps.issubset(finished) def normalize_deps(x: Union[None, Node, Set[Node]]): if x is None: return set() elif isinstance(x, (tuple, list, set)): return set(x) else: return {x} def send_slack_notification(nodes: List[Node], total_elapsed: float, component_name: str): overall_success = True lines = [ "```", f"id={REPO_UNIQUE_NAME} host={gethostname()} root={ROOT}", ] failure_lines = list() for node in nodes: rc = node.returncode() succeeded = rc == 0 finished = rc is not None command = " ".join(node.args) overall_success &= succeeded if succeeded: elapsed = node._finish_time - node._start_time lines.append(f"{time.strftime('%H:%M:%S', time.gmtime(elapsed))} {node.name} ✔") elif not finished: lines.append(f" {node.name}") else: elapsed = node._finish_time - node._start_time lines.append(f"{time.strftime('%H:%M:%S', time.gmtime(elapsed))} {node.name} ✖") failure_lines.append(f"Command '{node.name}', rc={node._proc.returncode}:") failure_lines.append(f"```\n{command}\n```") failure_lines.append(f"Log: {node.log_file}") lines.append("```") lines.extend(failure_lines) if overall_success: lines.insert( 0, f"SUCCESS - {component_name} smoke tests passed in {time.strftime('%H:%M:%S', time.gmtime(total_elapsed))}", ) else: lines.insert( 0, f"FAILURE - {component_name} smoke tests failed in {time.strftime('%H:%M:%S', time.gmtime(total_elapsed))}", ) evg = get_evergreen_api() evg.send_slack_message( target=f"@{evg._auth.username}", msg="\n".join(lines), ) class CommandRunner: def __init__( self, *, log_path: Path, notify_slack: bool, parallelism: int, component_name: str, ): self._log_path = log_path self._parallelism = parallelism self._downstream: Dict[Node, Set[Node]] = defaultdict(set) self._nodes: Set[Node] = set() self._finished: Set[Node] = set() self._ready: Deque[Node] = deque() self._running: Set[Node] = set() self._status = Status(status=f"{component_name} smoke tests") self._start_time = time.monotonic() self._finish_time: Optional[float] = None self._notify_slack = notify_slack self._component_name = component_name def _notify(self, event: str, node: Node): if event == "spawn": print(f"{' '.join(node.args)}") self._update_display() def _update_display(self): nrun = len(self._running) nfin = len(self._finished) ntot = len(self._nodes) elapsed = time.monotonic() - self._start_time self._status.update( status=f"running {nrun}, completed {nfin}/{ntot} {time.strftime('%H:%M:%S', time.gmtime(elapsed))}" ) def command( self, *, name: str, args: List[Any], log_file: str, deps: Union[None, Node, Set[Node]] = None, **kwargs, ) -> Node: log_file = self._log_path.joinpath(log_file) kwargs.setdefault("cwd", ROOT) node = Node( name=name, args=list(map(str, args)), popen_kwargs=kwargs, log_file=log_file, deps=normalize_deps(deps), ) self._nodes.add(node) if len(node.deps) == 0: self._notify("ready", node) self._ready.append(node) for dep in node.deps: self._downstream[dep].add(node) return node def run(self): print(f"Logging results to {self._log_path}") self._status.start() try: iter_finished: Set[Node] = set() while self._finished != self._nodes: while len(self._running) < self._parallelism and len(self._ready) > 0: node = self._ready.popleft() node.start() self._running.add(node) self._notify("spawn", node) for node in self._running: rc = node.returncode() if rc is not None: self._notify("reap", node) iter_finished.add(node) self._finished.add(node) if rc != 0: raise subprocess.CalledProcessError( returncode=rc, cmd=" ".join(node.args), output=f"Log: {node.log_file}", ) for down in self._downstream[node]: if down.deps_are_satisfied(self._finished): self._notify("ready", down) self._ready.append(down) for node in iter_finished: self._running.remove(node) if len(iter_finished) == 0: time.sleep(0.1) iter_finished.clear() self._update_display() elapsed = time.monotonic() - self._start_time if self._notify_slack: send_slack_notification( nodes=sorted(self._nodes), total_elapsed=elapsed, component_name=self._component_name, ) print(f"Completed {len(self._finished)}/{len(self._nodes)} in {elapsed:.3f}s.") except subprocess.CalledProcessError as cpe: print(f"""\ Failure: command {cpe.cmd} rc {cpe.returncode} log {cpe.output}""") send_slack_notification( nodes=sorted(self._nodes), total_elapsed=time.monotonic() - self._start_time, component_name=self._component_name, ) raise finally: self._status.stop() component_name_to_formal_name = { "catalog-and-routing": "Catalog and Routing", "replication": "Replication", "server-integration": "Storage Engines Server Integration", "server-programmability": "Server Programmability", "storage-execution": "Storage Execution", "server-bsoncolumn": "Server BSONColumn", "server-collection-write-path": "Server Collection Write Path", "server-external-sorter": "Server External Sorter", "server-index-builds": "Server Index Builds", "server-key-string": "Server Key String", "server-storage-engine-integration": "Server Storage Engine Integration", "server-timeseries-bucket-catalog": "Server Timeseries Bucket Catalog", "server-ttl": "Server TTL", } component_name_to_integration_test_suite = { "catalog-and-routing": "catalog_and_routing", "replication": "replication", "storage-execution": "storage_execution", "server-integration": "server_integration", "server-collection-write-path": "server_collection_write_path", "server-index-builds": "server_index_builds", "server-programmability": "server_programmability", "server-storage-engine-integration": "server_storage_engine_integration", "server-ttl": "server_ttl", } component_name_to_unit_test_tag = { "server-integration": "server-integration-smoke", "replication": "mongo_unittest", "storage-execution": "server-bsoncolumn,server-collection-write-path,server-external-sorter,server-index-builds,server-key-string,server-storage-engine-integration,server-timeseries-bucket-catalog,server-tracking-allocators,server-ttl", "server-bsoncolumn": "server-bsoncolumn", "server-collection-write-path": "server-collection-write-path", "server-external-sorter": "server-external-sorter", "server-index-builds": "server-index-builds", "server-key-string": "server-key-string", "server-storage-engine-integration": "server-storage-engine-integration", "server-timeseries-bucket-catalog": "server-timeseries-bucket-catalog", "server-tracking-allocators": "server-tracking-allocators", "server-ttl": "server-ttl", } def run_smoke_tests( *, component_name, log_path: Path, upstream_branch: str, bazel_args: List[str], run_clang_tidy: bool, send_slack_notification: bool, ): log_path = log_path.joinpath(REPO_UNIQUE_NAME) log_path.mkdir(parents=True, exist_ok=True) integration_suite_name = ( component_name_to_integration_test_suite[component_name] if component_name in component_name_to_integration_test_suite else "" ) unit_test_tag = ( component_name_to_unit_test_tag[component_name] if component_name in component_name_to_unit_test_tag else "" ) unit_test_build_target = ( "//src/mongo/db/repl/..." if component_name == "replication" else "//src/mongo/..." ) component_name_for_messages = component_name_to_formal_name[component_name] runner = CommandRunner( component_name=component_name_for_messages, log_path=log_path, notify_slack=send_slack_notification, parallelism=os.cpu_count(), ) formatters = [ runner.command( name="misc. lint", args=[ BAZEL, "run", "//:lint", ], log_file="misc_lint.log", ), runner.command( # catch-all for other bazel-driven formatters name="misc. format", args=[ BAZEL, "run", "//:format", ], log_file="misc_format.log", ), ] install = runner.command( name="build install executables", args=[ BAZEL, "build", *bazel_args, "//:install-dist-test", ], log_file="build_install_dist_test.log", deps=formatters, ) integration_tests = None if integration_suite_name != "": integration_tests = runner.command( name=f"run {component_name} smoke tests", args=[ MONGO_PYTHON_INTERPRETER, ROOT.joinpath("buildscripts", "run_smoke_tests.py"), "--suites", integration_suite_name, ], log_file="smoke_tests.log", deps=install, ) unittests = None if unit_test_tag != "": unittests = runner.command( name=f"run {component_name} unittests", args=[ BAZEL, "test", *bazel_args, f"--test_tag_filters={unit_test_tag}", "--test_output=summary", "--dev_stacktrace=False", unit_test_build_target, ], # NOTE: bazel already stores the real logs somewhere else log_file="unittests.log", # not a true dep, but bazel access has to be serialized deps=install, ) if run_clang_tidy: runner.command( name="clang tidy", args=[ BAZEL, "build", # NOTE: don't use user-provided bazel args for clang-tidy "--config=clang-tidy", "--verbose_failures", "--keep_going", "//src/mongo/...", ], log_file="clang_tidy.log", # serializes bazel access. Also prevents clang-tidy from changing # the build config from under the smoke test suite before it's # finished running. deps=(integration_tests, unittests), ) runner.run() def main(): from argparse import ArgumentParser p = ArgumentParser() p.add_argument( "component", type=str, help="Component that you wish to run the smoke test suite for. The available components are: catalog-and-routing, server-integration, replication, server-bsoncolumn, server-collection-write-path, server-external-sorter, server-index-builds, server-storage-engine-integration, server-timeseries-bucket-catalog, server-tracking-allocator, server-ttl", ) p.add_argument( "--log-path", type=Path, help="Directory to place logs from smoke test stages", default=Path("~/.logs/smoke_tests").expanduser(), ) p.add_argument( "--upstream-branch", type=str, default="origin/master", help="Git branch to format diff against", ) p.add_argument( "--run-clang-tidy", type=bool, default=False, help="Run clang tidy. This might take a while to run, and will also change the build config to config=clang-tidy.", ) p.add_argument( "--send-slack-notification", type=int, default=1, help='Send a slack notification based on the local evergreen configuration to "yourself"', ) args, bazel_args = p.parse_known_args() run_smoke_tests( component_name=args.component, log_path=args.log_path, upstream_branch=args.upstream_branch, bazel_args=bazel_args, run_clang_tidy=args.run_clang_tidy, send_slack_notification=args.send_slack_notification, ) if __name__ == "__main__": main()