mirror of https://github.com/mongodb/mongo
374 lines
14 KiB
Python
374 lines
14 KiB
Python
"""Subclass of unittest.TestCase with helpers for spawning a separate process.

This is used to perform the actual test case.
"""
|
|
|
|
import os
|
|
import os.path
|
|
import sys
|
|
import threading
|
|
import timeit
|
|
import unittest
|
|
import uuid
|
|
from typing import Any, Callable, Dict, Optional
|
|
|
|
import psutil
|
|
|
|
from buildscripts.resmokelib import config, logging
|
|
from buildscripts.resmokelib.hang_analyzer.hang_analyzer import HangAnalyzer
|
|
from buildscripts.resmokelib.utils import registry
|
|
from buildscripts.resmokelib.utils.self_test_fakes import test_analysis
|
|
|
|
# Maps a test kind name to the callable used to build a TestCase of that kind; it is read by
# make_test_case() and handed to registry.make_registry_metaclass() by the TestCase class.
# NOTE(review): presumably the registry metaclass fills this in as TestCase subclasses are
# defined -- confirm against the registry module.
_TEST_CASES: Dict[str, Callable] = {}  # type: ignore
|
|
|
|
|
|
def make_test_case(test_kind, *args, **kwargs) -> "TestCase":
    """Build the TestCase registered under 'test_kind'.

    All remaining positional and keyword arguments are forwarded unchanged to the registered
    callable. A ValueError is raised when nothing is registered under 'test_kind'.
    """
    if test_kind in _TEST_CASES:
        test_case_factory = _TEST_CASES[test_kind]
        return test_case_factory(*args, **kwargs)

    raise ValueError("Unknown test kind '%s'" % test_kind)
|
|
|
|
|
|
class TestCase(unittest.TestCase, metaclass=registry.make_registry_metaclass(_TEST_CASES)):
    """A single test to execute.

    Subclasses are expected to provide run_test(); the implementation here raises
    NotImplementedError.
    """

    # NOTE(review): presumably tells the registry metaclass not to register this base class
    # under any name -- confirm against the registry module.
    REGISTERED_NAME = registry.LEAVE_UNREGISTERED

    def __init__(
        self, logger: logging.Logger, test_kind: str, test_name: str, dynamic: bool = False
    ):
        """Validate the arguments and set up the per-test state.

        Raises TypeError when 'logger' is not a Logger or when 'test_kind' or 'test_name' is
        not a string.
        """
        unittest.TestCase.__init__(self, methodName="run_test")

        if not isinstance(logger, logging.Logger):
            raise TypeError("logger must be a Logger instance")
        if not isinstance(test_kind, str):
            raise TypeError("test_kind must be a string")
        if not isinstance(test_name, str):
            raise TypeError(f"test_name must be a string instead it is {type(test_name)}")

        # Unique identifier of this test instance; see id().
        self._id = uuid.uuid4()

        self.test_kind = test_kind
        self.test_name = test_name
        self.dynamic = dynamic

        # When the TestCase is created by the TestSuiteExecutor (through a call to
        # make_test_case()) logger is an instance of TestQueueLogger. When the TestCase is
        # created by a hook implementation it is an instance of BaseLogger.
        self.logger = logger
        # Holds the logger above while a test logger set through override_logger() is active.
        self._original_logger: Optional[logging.Logger] = None

        # Assigned by configure().
        self.fixture: Optional["fixture.Fixture"] = None
        self.is_configured = False

        self.return_code = None
        self.propagate_error = None

        # 'timed_out' is set as soon as a timeout is reported; 'timed_out_processed' is set
        # once on_timeout() has finished handling it.
        self.timed_out = threading.Event()
        self.timed_out_processed = threading.Event()

    def long_name(self):
        """Return the test's path expressed relative to the current working directory."""
        return os.path.relpath(self.test_name)

    def basename(self):
        """Return the final path component of the test name."""
        return os.path.basename(self.test_name)

    def short_name(self):
        """Return the basename of the test with its file extension removed."""
        stem, _extension = os.path.splitext(self.basename())
        return stem

    def id(self):
        """Return the UUID generated for this test instance."""
        return self._id

    def get_test_kind(self):
        """Return the kind of the test, e.g. something like JSTest."""
        return self.test_kind

    def short_description(self):
        """Return the test kind and the test name separated by a space."""
        return "%s %s" % (self.test_kind, self.test_name)

    def override_logger(self, new_logger: logging.Logger):
        """Swap in 'new_logger', remembering the current logger for reset_logger().

        This method is used by the report to set the test logger.
        """
        assert not self._original_logger, "Logger already overridden"
        self._original_logger, self.logger = self.logger, new_logger

    def reset_logger(self):
        """Restore the logger that was active before override_logger() was called."""
        assert self._original_logger, "Logger was not overridden"
        self.logger, self._original_logger = self._original_logger, None

    def configure(self, fixture: "fixture.Fixture", *args, **kwargs):
        """Remember 'fixture' for use during execution.

        Raises RuntimeError when called a second time on the same instance.
        """
        if self.is_configured:
            raise RuntimeError("configure can only be called once")

        self.fixture = fixture
        self.is_configured = True

    def run_test(self):
        """Run the test; must be overridden by subclasses."""
        raise NotImplementedError("run_test must be implemented by TestCase subclasses")

    def on_timeout(self):
        """Handle the test exceeding its time limit by flagging both timeout events."""
        for event in (self.timed_out, self.timed_out_processed):
            event.set()

    def as_command(self):
        """Return the command invocation used to run the test; None for this base class."""
        return None

    class METRIC_NAMES:
        """Attribute keys used when describing a test, see get_test_otel_attributes()."""

        BASE_NAME = "test_base_name"
        LONG_NAME = "test_long_name"
        ID = "test_id"
        KIND = "test_kind"
        DYNAMIC = "test_dynamic"
        BACKGROUND = "test_background"

    def get_test_otel_attributes(self) -> Dict[str, Any]:
        """Return a dict describing this test, keyed by the METRIC_NAMES constants.

        The dict holds the base name, long name, id (as a string), kind and dynamic flag.
        """
        names = TestCase.METRIC_NAMES
        attributes: Dict[str, Any] = {}
        attributes[names.BASE_NAME] = self.basename()
        attributes[names.LONG_NAME] = self.long_name()
        attributes[names.ID] = str(self.id())
        attributes[names.KIND] = self.get_test_kind()
        attributes[names.DYNAMIC] = self.dynamic
        return attributes
|
|
|
|
|
|
class ProcessTestCase(TestCase):
    """Base class for TestCases that executes an external process."""

    def run_test(self):
        """Run the test.

        The process is created with _make_process(), stored on self.proc and then executed.
        Test failures (self.failureException) propagate untouched; every other error is
        logged together with its traceback and re-raised.
        """
        try:
            self.proc = self._make_process()
            self._execute(self.proc)
        except self.failureException:
            raise
        except BaseException:
            # Deliberately as broad as a bare 'except': the error is only logged here and is
            # always re-raised.
            self.logger.exception(
                "Encountered an error running %s %s", self.test_kind, self.basename()
            )
            raise

    def as_command(self):
        """Return the command invocation used to run the test.

        Raises self.failureException when the process cannot be built or cannot describe its
        command. Interpreter-level exits (KeyboardInterrupt, SystemExit) are not converted
        into a test failure and propagate as-is.
        """
        try:
            proc = self._make_process()
            return proc.as_command()
        except Exception as err:
            self.logger.exception(
                "Encountered an error getting command for %s %s", self.test_kind, self.basename()
            )
            raise self.failureException(
                "%s failed when building process command" % (self.short_description(),)
            ) from err

    def _execute(self, process: "process.Process"):
        """Run the specified process.

        Starts 'process', waits for it to exit and stores the exit code in self.return_code.
        Raises self.failureException when the test timed out or exited with a non-zero code.
        """

        start_time = timeit.default_timer()
        self.logger.info("Starting %s...\n%s", self.short_description(), process.as_command())

        process.start()
        self.logger.info("%s started with pid %s.", self.short_description(), process.pid)
        self.return_code = process.wait()
        finished_time = timeit.default_timer()
        duration = finished_time - start_time

        if self.timed_out.is_set():
            # Let on_timeout() finish its work before reporting the failure.
            self.timed_out_processed.wait()
            raise self.failureException(
                "%s timed out and was killed, pid %s. Duration of process %fs"
                % (
                    self.short_description(),
                    process.pid,
                    duration,
                )
            )

        if self.return_code != 0:
            raise self.failureException(
                "%s failed with exit code %s, pid %s. Duration of process %fs"
                % (
                    self.short_description(),
                    self.return_code,
                    process.pid,
                    duration,
                )
            )

        self.logger.info(
            "%s finished. Duration of process %fs",
            self.short_description(),
            duration,
        )

    def _make_process(self) -> "process.Process":
        """Return a new Process instance that could be used to run the test or log the command."""
        raise NotImplementedError("_make_process must be implemented by TestCase subclasses")

    def _get_all_processes(self):
        """
        A best effort collection of all processes involved in the current test:
        - Processes from the fixture.
        - The test process itself.
        - Any child of the test process.
        - Any process in the same process group as the test process and children (Unix only).
        - Any process that contains the environment variable marker the test process was created with (RESMOKE_TEST_ID=...).

        It is possible this will miss orphaned processes created in a new process group on Mac,
        since reading environment variables from arbitrary processes is generally blocked.

        Returns a set of process ids.
        """

        def get_processes_by_pgid(target_pgid):
            """Return the psutil processes whose process group id equals 'target_pgid'."""
            processes = []
            for proc in psutil.process_iter():
                try:
                    if os.getpgid(proc.pid) == target_pgid:
                        processes.append(proc)
                except (
                    ProcessLookupError,
                    PermissionError,
                    psutil.NoSuchProcess,
                    psutil.AccessDenied,
                    psutil.ZombieProcess,
                ):
                    # os.getpgid() signals a vanished or inaccessible process with OSError
                    # subclasses rather than psutil exceptions. Skip only that process so one
                    # racing exit does not discard the rest of the group.
                    pass
            return processes

        def get_processes_with_env(env, value):
            """Return the psutil processes whose environment has 'env' set to 'value'."""
            processes = []
            for proc in psutil.process_iter():
                try:
                    # Read the environment once; each read queries the operating system.
                    environ = proc.environ()
                    if env in environ and environ[env] == value:
                        processes.append(proc)
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    pass
            return processes

        pids = {self.proc.pid}

        if self.fixture:
            pids.update(self.fixture.pids())

        try:
            processes_children = psutil.Process(self.proc.pid).children(recursive=True)
        except psutil.NoSuchProcess:
            # The test process is already gone, so it has no children left to enumerate;
            # keep collecting through the remaining strategies.
            processes_children = []
        pids.update(p.pid for p in processes_children)

        processes_with_marker = get_processes_with_env("RESMOKE_TEST_ID", str(self._id))
        pids.update(p.pid for p in processes_with_marker)

        if sys.platform != "win32":  # getpgid is Unix only
            for child in [*processes_children, self.proc]:
                try:
                    # Only kill the entire process group if the child process was created in a new group.
                    # If it is the same as this process, the group contains resmoke's python process and
                    # those from other parallel jobs.
                    pgid_child = os.getpgid(child.pid)
                    if pgid_child != os.getpgid(0):
                        for p in get_processes_by_pgid(pgid_child):
                            pids.add(p.pid)
                except ProcessLookupError:
                    continue
        return pids

    def on_timeout(self):
        """Invoked when test execution has exceeded its time limit.

        Collects the processes involved in the test and hands them either to the
        'test_analysis' fake (when that internal parameter is configured) or to the
        HangAnalyzer, which is asked to dump cores and kill them.
        """
        self.timed_out.set()

        try:
            pids = self._get_all_processes()

            if "test_analysis" in config.INTERNAL_PARAMS:
                test_analysis(self.logger, pids)
            else:
                options = {
                    "dump_core": True,
                    "process_ids": ",".join(str(p) for p in pids),
                    "kill_processes": True,
                    "debugger_output": "",
                    "process_match": "exact",
                    "max_disk_usage_percent": 90,
                    "go_process_names": "",
                    "process_names": "",
                }
                hang_analyzer = HangAnalyzer(options, task_id=None, logger=self.logger)
                hang_analyzer.execute()
        finally:
            # _execute() blocks on this event once 'timed_out' is set, so it must be signalled
            # even when collecting or analyzing the processes fails.
            self.timed_out_processed.set()
|
|
|
|
|
|
class TestCaseFactory:
    """Base class for factories that create TestCase instances for separate threads.

    Subclasses must implement create_test_case(), configure() and make_process().
    """

    def __init__(self, factory_class, shell_options):
        """Store the TestCase subclass to create and the shell options to derive from.

        Raises TypeError when 'factory_class' is not a subclass of TestCase.
        """
        if not issubclass(factory_class, TestCase):
            raise TypeError(
                "factory_class should be a subclass of Interface.TestCase", factory_class
            )

        self.shell_options = shell_options
        self._factory_class = factory_class

    def create_test_case(self, logger: logging.Logger, shell_options) -> TestCase:
        """Create a TestCase using 'shell_options'; must be overridden by subclasses."""
        raise NotImplementedError(
            "create_test_case must be implemented by TestCaseFactory subclasses"
        )

    def create_test_case_for_thread(
        self,
        logger: logging.Logger,
        num_clients: int = 1,
        thread_id: int = 0,
        tenant_id: Optional[str] = None,
    ) -> TestCase:
        """Create a TestCase, with its own shell options, to be run in a separate thread."""

        per_thread_options = self._get_shell_options_for_thread(
            num_clients, thread_id, tenant_id
        )
        return self.create_test_case(logger, per_thread_options)

    def configure(self, fixture: "fixture.Fixture", *args, **kwargs):
        """Configure the test case factory; must be overridden by subclasses."""
        raise NotImplementedError("configure must be implemented by TestCaseFactory subclasses")

    def make_process(self):
        """Make a process for a TestCase; must be overridden by subclasses."""
        raise NotImplementedError("make_process must be implemented by TestCaseFactory subclasses")

    def _get_shell_options_for_thread(
        self, num_clients: int, thread_id: int, tenant_id: Optional[str]
    ) -> dict:
        """Return a copy of shell_options whose TestData is set up for the given thread."""

        # Every thread works on its own copies, so the factory's shell_options, its
        # "global_vars" and its "TestData" are never modified.
        options_for_thread = self.shell_options.copy()
        globals_for_thread = options_for_thread["global_vars"].copy()
        thread_test_data = globals_for_thread["TestData"].copy()

        if tenant_id:
            thread_test_data["tenantId"] = tenant_id

        # "isMainTest" marks the main test when multiple clients are going to run concurrently,
        # in case there is logic within the test that must execute only once. "numTestClients"
        # tells the test how many clients are going to run it, so it can avoid executing certain
        # logic when there may be other operations running concurrently.
        thread_test_data["isMainTest"] = thread_id == 0
        thread_test_data["numTestClients"] = num_clients

        globals_for_thread["TestData"] = thread_test_data
        options_for_thread["global_vars"] = globals_for_thread
        return options_for_thread
|
|
|
|
|
|
def append_process_tracking_options(kwargs, test_id):
    """Append process kwargs that will enable tracking subprocesses created by this test.

    When a test timeout is configured, 'kwargs' is updated in place so that its "env_vars"
    carry RESMOKE_TEST_ID=<test_id> and "start_new_session" is True. A missing or None
    "env_vars" entry is treated as empty, and the environment mapping stored back into
    'kwargs' is a fresh dict, so a mapping owned by the caller is never modified.
    """

    # This is leveraged by test timeouts. Since we would like to not apply them in processes where
    # there are nested resmoke invocations, only apply them when a test timeout is set.
    if config.TEST_TIMEOUT is None:
        return

    # Copy rather than mutate: the per-test marker must not leak into an env dict that is
    # shared with other tests.
    env_vars = dict(kwargs.get("env_vars") or {})
    env_vars["RESMOKE_TEST_ID"] = str(test_id)

    kwargs["env_vars"] = env_vars
    kwargs["start_new_session"] = True
|