mongo/buildscripts/resmokelib/testing/hooks/rotate_execution_control_pa...

321 lines
12 KiB
Python

"""
Hook that periodically rotates the 'executionControlConcurrencyAdjustmentAlgorithm' and deprioritization
parameters to new random valid values on all mongod processes.
"""
import random
import sys
import threading
import time
from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
from buildscripts.resmokelib.testing.fixtures import replicaset, shardedcluster, standalone
from buildscripts.resmokelib.testing.hooks import interface
from buildscripts.resmokelib.testing.hooks import lifecycle as lifecycle_interface
class RotateExecutionControlParams(interface.Hook):
    """
    Periodically sets 'executionControlConcurrencyAdjustmentAlgorithm' and deprioritization parameters
    to random valid values.

    The rotation itself is performed by a background thread that is created and started in
    before_suite() and stopped in after_suite(). The current parameter values of every targeted
    mongod are logged before and after each test.
    """

    DESCRIPTION = "Periodically rotates 'executionControlConcurrencyAdjustmentAlgorithm' and deprioritization parameters to random valid values"

    IS_BACKGROUND = True

    # The list of valid values to choose from.
    _ALGORITHM_OPTIONS = [
        "fixedConcurrentTransactions",
        "throughputProbing",
    ]

    # Parameters reported by _invoke_get_parameter_and_log(). The order matches the placeholders
    # of its log message: algorithm, heuristic, background, deprio.
    _LOGGED_PARAMETERS = (
        "executionControlConcurrencyAdjustmentAlgorithm",
        "executionControlHeuristicDeprioritization",
        "executionControlBackgroundTasksDeprioritization",
        "executionControlDeprioritizationGate",
    )

    def __init__(
        self,
        hook_logger,
        fixture,
        seed=random.randrange(sys.maxsize),
        auth_options=None,
    ):
        """Initialize the RotateExecutionControlParams hook.

        Args:
            hook_logger: the logger instance for this hook.
            fixture: the target fixture (standalone, replica set, sharded cluster, or multi-cluster fixture).
            seed: the random seed to use. Note that the default value is evaluated once, when the
                class is defined, so all instances created without an explicit seed share it.
            auth_options: dictionary of auth options.
        """
        interface.Hook.__init__(
            self, hook_logger, fixture, RotateExecutionControlParams.DESCRIPTION
        )
        self._fixture = fixture
        self._auth_options = auth_options
        # Kept only so that the seed can be logged; a rotation sequence cannot be reproduced
        # without knowing it.
        self._seed = seed
        self._rng = random.Random(seed)
        self._standalone_fixtures = []
        self._rs_fixtures = []
        self._set_param_thread = None

    def before_suite(self, test_report):
        """Before suite. Collect the mongod fixtures and start the rotation thread."""
        self.logger.info("Finding all mongod fixtures to target...")
        for cluster in self._fixture.get_testable_clusters():
            self._add_fixture(cluster)

        self.logger.info(
            f"Found {len(self._standalone_fixtures)} standalone and {len(self._rs_fixtures)} replica set fixtures."
        )
        self.logger.info(
            "Rotating execution control parameters using random seed %s.", self._seed
        )

        self._set_param_thread = _RotateExecutionControlParamsThread(
            self.logger,
            self._rs_fixtures,
            self._standalone_fixtures,
            self._rng,
            self._ALGORITHM_OPTIONS,
            lifecycle_interface.FlagBasedThreadLifecycle(),
            self._auth_options,
        )
        self.logger.info("Starting the execution control parameters rotation thread.")
        self._set_param_thread.start()

    def after_suite(self, test_report, teardown_flag=None):
        """After suite. Stop the rotation thread if it was created."""
        self.logger.info("Stopping the execution control parameters rotation thread.")
        if self._set_param_thread:
            self._set_param_thread.stop()
        self.logger.info("Execution control parameters rotation thread stopped.")

    def before_test(self, test, test_report):
        """Before test. Log current config."""
        self.logger.info("Logging current parameter state before test...")
        self._log_parameters_on_all_nodes()

        self.logger.info("Resuming the execution control parameters rotation thread.")
        # pause() is invoked before resume(); presumably this is for the health checks that
        # pause() performs before the thread is allowed to act again.
        self._set_param_thread.pause()
        self._set_param_thread.resume()

    def after_test(self, test, test_report):
        """After test. Log current config."""
        self.logger.info("Pausing the execution control parameters rotation thread.")
        self._set_param_thread.pause()
        self.logger.info("Paused the execution control parameters rotation thread.")

        self.logger.info("Logging current parameter state after test...")
        self._log_parameters_on_all_nodes()

    def _log_parameters_on_all_nodes(self):
        """Log the current parameter values of every replica set node and every standalone."""
        for repl_set in self._rs_fixtures:
            for node in repl_set.nodes:
                self._invoke_get_parameter_and_log(node)

        for standalone_fixture in self._standalone_fixtures:
            self._invoke_get_parameter_and_log(standalone_fixture)

    def _add_fixture(self, fixture):
        """
        Recursively find and add all mongod fixtures (standalone or replicaset) to our internal lists.
        """
        if isinstance(fixture, standalone.MongoDFixture):
            self._standalone_fixtures.append(fixture)
        elif isinstance(fixture, replicaset.ReplicaSetFixture):
            self._rs_fixtures.append(fixture)
        elif isinstance(fixture, shardedcluster.ShardedClusterFixture):
            # Recurse into shards
            for shard_fixture in fixture.shards:
                self._add_fixture(shard_fixture)

            # Recurse into config server
            if fixture.config_shard is None:
                self._add_fixture(fixture.configsvr)

            # We intentionally DO NOT add fixture.mongos, as the parameter is not valid on mongos.
        else:
            # This could be a direct MongoSFixture or other non-mongod fixture.
            self.logger.debug(f"Skipping fixture {fixture} as it is not a mongod.")

    def _invoke_get_parameter_and_log(self, node):
        """
        Helper to print the current state of the execution control parameters.

        Logging is best effort: a failing getParameter command is reported as a warning and does
        not propagate to the caller.
        """
        client = fixture_interface.build_client(node, self._auth_options)
        try:
            current_values = []
            # One getParameter command per parameter, issued in _LOGGED_PARAMETERS order.
            for param_name in self._LOGGED_PARAMETERS:
                result = client.admin.command("getParameter", 1, **{param_name: 1})
                current_values.append(result.get(param_name, "NOT_FOUND"))

            self.logger.info(
                "Current state on node %d: algorithm=%s, heuristic=%s, background=%s, deprio=%s",
                node.port,
                *current_values,
            )
        except Exception as err:
            self.logger.warning(
                "Failed to getParameter from node %d: %s",
                node.port,
                err,
            )
        finally:
            # A client is built for every call; close it so connections do not pile up over the
            # course of a suite.
            client.close()
class _RotateExecutionControlParamsThread(threading.Thread):
    """Background thread that periodically applies random execution control parameters.

    Whenever the lifecycle object permits action and the configured interval has elapsed since
    the previous rotation, the thread sets a freshly chosen combination of parameters on every
    node of every replica set fixture and on every standalone fixture.
    """

    def __init__(
        self,
        logger,
        rs_fixtures,
        standalone_fixtures,
        rng,
        algorithm_options,
        lifecycle,
        auth_options=None,
        setparameter_interval_secs=30,
    ):
        """Initialize _RotateExecutionControlParamsThread.

        Args:
            logger: the logger used by this thread.
            rs_fixtures: replica set fixtures whose nodes receive the parameters.
            standalone_fixtures: standalone fixtures that receive the parameters.
            rng: random number generator used to choose the parameter values.
            algorithm_options: candidate values for 'executionControlConcurrencyAdjustmentAlgorithm'.
            lifecycle: object that tells the thread when it may act, must idle, or must stop.
            auth_options: dictionary of auth options used when building clients.
            setparameter_interval_secs: seconds to wait between two rotations.
        """
        threading.Thread.__init__(self, name="RotateExecutionControlParamsThread")
        self.daemon = True
        self.logger = logger

        self._rs_fixtures = rs_fixtures
        self._standalone_fixtures = standalone_fixtures
        self._rng = rng
        self._algorithm_options = algorithm_options
        self.__lifecycle = lifecycle
        self._auth_options = auth_options
        self._setparameter_interval_secs = setparameter_interval_secs

        # Starting the clock here means the first rotation happens one full interval after the
        # thread object is created.
        self._last_exec = time.time()
        # Event set when the thread has been stopped using the 'stop()' method.
        self._is_stopped_evt = threading.Event()
        # Event set when the thread is not executing setParameter commands.
        self._is_idle_evt = threading.Event()
        self._is_idle_evt.set()

    def run(self):
        """Execute the thread.

        Loops until the lifecycle reports that the thread was stopped. Any exception is logged
        and ends the thread; pause() detects the dead thread afterwards.
        """
        try:
            while True:
                self._is_idle_evt.set()

                permitted = self.__lifecycle.wait_for_action_permitted()
                if not permitted:
                    break  # Thread was stopped

                self._is_idle_evt.clear()

                now = time.time()
                if now - self._last_exec > self._setparameter_interval_secs:
                    self._do_set_parameter()
                    self._last_exec = time.time()

                found_idle_request = self.__lifecycle.poll_for_idle_request()
                if found_idle_request:
                    self.__lifecycle.send_idle_acknowledgement()
                    continue

                # The 'wait_secs' is used to wait 'self._setparameter_interval_secs' from the moment
                # the last setParameter command was sent.
                now = time.time()
                wait_secs = max(0, self._setparameter_interval_secs - (now - self._last_exec))
                self.__lifecycle.wait_for_action_interval(wait_secs)
        except Exception:
            # Proactively log the exception
            self.logger.exception("RotateExecutionControlParamsThread threw exception")
            # Signal idleness so that a caller blocked in pause() is released.
            self._is_idle_evt.set()

    def stop(self):
        """Stop the thread."""
        self.__lifecycle.stop()
        self._is_stopped_evt.set()
        # Unpause to allow the thread to finish.
        self.resume()
        self.join()

    def pause(self):
        """Pause the thread.

        Raises:
            errors.ServerFailure: if the thread is no longer alive or if a replica set fixture is
                not running.
        """
        self.__lifecycle.mark_test_finished()

        # Wait until we are no longer executing setParameter.
        self._is_idle_evt.wait()
        # Check if the thread is alive
        self._check_thread()

        # Check that fixtures are still running. Only replica set fixtures are checked here.
        for rs_fixture in self._rs_fixtures:
            if not rs_fixture.is_running():
                raise errors.ServerFailure(
                    f"ReplicaSetFixture with pids {rs_fixture.pids()} expected to be running in"
                    " SetParameter, but wasn't."
                )

    def resume(self):
        """Resume the thread."""
        self.__lifecycle.mark_test_started()

    def _wait(self, timeout):
        """Block until the thread is stopped or 'timeout' seconds have passed."""
        # Wait until stop or timeout.
        self._is_stopped_evt.wait(timeout)

    def _check_thread(self):
        """Raise errors.ServerFailure if this thread is not alive."""
        if not self.is_alive():
            msg = "The RotateExecutionControlParamsThread thread is not running."
            self.logger.error(msg)
            raise errors.ServerFailure(msg)

    def _invoke_set_parameter(self, client, param_name, param_value):
        """Helper to invoke setParameter on a given client for a single parameter."""
        client.admin.command("setParameter", 1, **{param_name: param_value})

    def _set_parameters_on_node(self, node, params_to_set):
        """Apply every entry of 'params_to_set' to 'node' using a short-lived client."""
        client = fixture_interface.build_client(node, self._auth_options)
        try:
            for param_name, param_value in params_to_set.items():
                self._invoke_set_parameter(client, param_name, param_value)
        finally:
            # A new client is built for each node on every rotation; close it so that
            # connections do not accumulate while the suite runs.
            client.close()

    def _do_set_parameter(self):
        """
        Picks a new random algorithm and random boolean values for the deprioritization parameters,
        then applies them to all standalone and replica set nodes.

        Errors raised by setParameter are not handled here; they propagate to run().
        """
        params_to_set = {
            "executionControlConcurrencyAdjustmentAlgorithm": self._rng.choice(
                self._algorithm_options
            ),
            "executionControlHeuristicDeprioritization": self._rng.choice([True, False]),
            "executionControlBackgroundTasksDeprioritization": self._rng.choice([True, False]),
            "executionControlDeprioritizationGate": self._rng.choice([True, False]),
        }

        for repl_set in self._rs_fixtures:
            self.logger.info(
                "Setting parameters on all nodes of replica set %s. Parameters: %s",
                repl_set.replset_name,
                params_to_set,
            )
            for node in repl_set.nodes:
                self._set_parameters_on_node(node, params_to_set)

        for standalone_fixture in self._standalone_fixtures:
            self.logger.info(
                "Setting parameters on standalone on port %d. Parameters: %s",
                standalone_fixture.port,
                params_to_set,
            )
            self._set_parameters_on_node(standalone_fixture, params_to_set)