mongo/buildscripts/resmokelib/testing/testcases/fsm_workload_test.py

212 lines
7.6 KiB
Python

"""The unittest.TestCase for FSM workloads."""
import copy
import hashlib
import threading
import uuid
from typing import Optional
from buildscripts.resmokelib import logging, utils
from buildscripts.resmokelib.testing.testcases import interface, jsrunnerfile, jstest
from buildscripts.resmokelib.utils import registry
class _SingleFSMWorkloadTestCase(jsrunnerfile.JSRunnerFileTestCase):
"""An FSM workload to execute."""
REGISTERED_NAME = registry.LEAVE_UNREGISTERED
def __init__(self, logger, test_name, test_id, shell_executable=None, shell_options=None):
"""Initialize the _SingleFSMWorkloadTestCase with the FSM workload file."""
jsrunnerfile.JSRunnerFileTestCase.__init__(
self,
logger,
"FSM workload",
test_name,
test_runner_file="jstests/concurrency/fsm_libs/resmoke_runner.js",
shell_executable=shell_executable,
shell_options=shell_options,
)
self._id = test_id
def configure(self, fixture, *args, **kwargs):
"""Configure the FSMWorkloadTestCase runner."""
interface.ProcessTestCase.configure(self, fixture, *args, **kwargs)
class _FSMWorkloadTestCaseBuilder(interface.TestCaseFactory):
"""Build the real TestCase in the FSMWorkloadTestCase wrapper."""
_COUNTER_LOCK = threading.Lock()
_COUNTER = 0
def __init__(
self,
logger,
fsm_workload_group,
test_name,
test_id,
shell_executable=None,
shell_options=None,
same_db=False,
same_collection=False,
db_name_prefix=None,
):
"""Initialize the _FSMWorkloadTestCaseBuilder."""
interface.TestCaseFactory.__init__(self, _SingleFSMWorkloadTestCase, shell_options)
self.logger = logger
self.fsm_workload_group = fsm_workload_group
self.test_name = test_name
self.test_id = test_id
self.shell_executable = shell_executable
self.shell_options = utils.default_if_none(shell_options, {}).copy()
self.same_collection = same_collection
self.same_db = same_db or self.same_collection
self.db_name_prefix = db_name_prefix
self.dbpath_prefix = None
self.fixture = None
def configure(self, fixture, *args, **kwargs):
self.fixture = fixture
self.dbpath_prefix = self.fixture.get_dbpath_prefix()
global_vars = self.shell_options.get("global_vars", {}).copy()
test_data = global_vars.get("TestData", {}).copy()
self._populate_test_data(test_data)
global_vars["TestData"] = test_data
self.shell_options["global_vars"] = global_vars
process_kwargs = copy.deepcopy(self.shell_options.get("process_kwargs", {}))
interface.append_process_tracking_options(process_kwargs, self.test_id)
self.shell_options["process_kwargs"] = process_kwargs
def _populate_test_data(self, test_data):
test_data["fsmWorkloads"] = self.fsm_workload_group
test_data["resmokeDbPathPrefix"] = self.dbpath_prefix
with _FSMWorkloadTestCaseBuilder._COUNTER_LOCK:
count = _FSMWorkloadTestCaseBuilder._COUNTER
_FSMWorkloadTestCaseBuilder._COUNTER += 1
# We use a global incrementing counter as a prefix for the database name to avoid any
# collection lifecycle related issues in sharded clusters. This more closely matches how
# uniqueDBName() and uniqueCollName() would have returned distinct values when called once
# for each FSM workload in the entire schedule by runner.js.
test_prefix = self.db_name_prefix if self.db_name_prefix else "test"
test_data["dbNamePrefix"] = "{}{:d}_".format(test_prefix, count)
test_data["sameDB"] = self.same_db
test_data["sameCollection"] = self.same_collection
test_data["peerPids"] = self.fixture.pids()
def make_process(self):
# This function should only be called by MultiClientsTestCase's _make_process().
test_case = self.create_test_case(self.logger, self.shell_options)
return test_case._make_process()
def create_test_case(self, logger, shell_options) -> _SingleFSMWorkloadTestCase:
test_case = _SingleFSMWorkloadTestCase(
logger, self.test_name, self.test_id, self.shell_executable, shell_options
)
test_case.configure(self.fixture)
return test_case
class FSMWorkloadTestCase(jstest.MultiClientsTestCase):
"""A wrapper for several copies of a _SingleFSMWorkloadTestCase to execute."""
REGISTERED_NAME = "fsm_workload_test"
TEST_KIND = "FSM workload"
def __init__(
self,
logger: logging.Logger,
selected_tests: list[str],
shell_executable: Optional[str] = None,
shell_options: Optional[dict] = None,
same_db: bool = False,
same_collection: bool = False,
db_name_prefix: Optional[str] = None,
):
"""Initialize the FSMWorkloadTestCase with the FSM workload file."""
assert len(selected_tests) == 1
fsm_workload_group = self.get_workload_group(selected_tests[0])
test_name = self.get_workload_uid(selected_tests[0])
test_id = uuid.uuid4()
factory = _FSMWorkloadTestCaseBuilder(
logger,
fsm_workload_group,
test_name,
test_id,
shell_executable,
shell_options,
same_db,
same_collection,
db_name_prefix,
)
jstest.MultiClientsTestCase.__init__(
self, logger, self.TEST_KIND, test_name, test_id, factory
)
@staticmethod
def get_workload_group(selected_test: str) -> list[str]:
"""Generate an FSM workload group from tests selected by the selector."""
# Selectors for non-parallel FSM suites return the name of a workload, we
# put it into a list to create a workload group of size 1.
return [selected_test]
@staticmethod
def get_workload_uid(selected_test: str) -> str:
"""Get an unique identifier for a workload group."""
# For non-parallel versions of the FSM framework, the workload group name is just the
# name of the workload.
return selected_test
class ParallelFSMWorkloadTestCase(FSMWorkloadTestCase):
"""An FSM workload to execute."""
REGISTERED_NAME = "parallel_fsm_workload_test"
@staticmethod
def get_workload_group(selected_tests: list[str]) -> list[str]:
"""Generate an FSM workload group from tests selected by the selector.
When this function was updated the naming was misleading.
Now the naming is right and mypy warns that that this is a different function than its parent which is correct.
This should be refactored when time permits.
Just return the list of selected tests as the workload.
Args:
selected_tests (list[str]): list of tests to use
Returns
list[str]: selected tests
"""
return selected_tests
@staticmethod
def get_workload_uid(selected_tests: list[str]) -> str:
"""Get an unique identifier for a workload group.
When this function was updated the naming was misleading.
Now the naming is right and mypy warns that that this is a different function than its parent which is correct.
This should be refactored when time permits.
Args:
selected_tests (list[str]): list of tests to use
Returns
str: hash of all tests
"""
uid = hashlib.md5()
for workload_name in sorted(selected_tests):
uid.update(workload_name.encode("utf-8"))
return uid.hexdigest()