mongo/buildscripts/resmokelib/testing/hooks/fuzz_runtime_parameters.py

435 lines
17 KiB
Python

"""Test hook that periodically makes the primary of a replica set step down."""
import copy
import random
import re
import sys
import threading
import time
from pymongo.errors import OperationFailure
from buildscripts.resmokelib import errors
from buildscripts.resmokelib.generate_fuzz_config.mongo_fuzzer_configs import (
generate_normal_mongo_parameters,
)
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
from buildscripts.resmokelib.testing.fixtures import replicaset, shardedcluster, standalone
from buildscripts.resmokelib.testing.hooks import interface
from buildscripts.resmokelib.testing.hooks import lifecycle as lifecycle_interface
from buildscripts.resmokelib.testing.retry import with_naive_retry
def validate_runtime_parameter_spec(spec):
    """Check that every entry of a runtime parameter fuzz spec is usable.

    Each value must be a dict whose "period" compares greater than or equal to 1.

    Args:
        spec: dict mapping a parameter name to its fuzz configuration.

    Raises:
        ValueError: if an entry is not a dict, or its "period" is missing, below 1, or of a
            type that cannot be compared with a number.
    """
    for key, value in spec.items():
        try:
            is_valid = isinstance(value, dict) and value.get("period", 0) >= 1
        except TypeError:
            # A period such as None or a string cannot be compared with 1. Treat it as an
            # invalid entry so the error below names the offending key.
            is_valid = False
        if not is_valid:
            raise ValueError(
                f"Invalid runtime parameter fuzz config entry for key '{key}' : {value}"
            )
class RuntimeParametersState:
    """Runtime bookkeeping for a group of parameters that are being fuzzed.

    Remembers when a value was last generated for each parameter and owns the logic that
    decides which parameters are due for a new value.
    """

    def __init__(self, spec, seed):
        """Copy 'spec' and stamp every parameter as last set at construction time.

        Because the stamp is the current time, the first value for a parameter is only
        generated once its full period has elapsed.
        """
        created_at = time.time()
        self._params = {}
        for name, options in spec.items():
            # Deep-copy so later changes to the caller's spec do not leak into this state.
            self._params[name] = dict(copy.deepcopy(options), lastSet=created_at)
        self._rng = random.Random(seed)

    def generate_parameters(self):
        """Return {name: new value} for every parameter whose period has elapsed.

        A parameter that gets a new value has its last-set time moved to the current time.
        """
        moment = time.time()
        due = {}
        for name, entry in self._params.items():
            elapsed = moment - entry["lastSet"]
            if elapsed >= entry["period"]:
                due[name] = generate_normal_mongo_parameters(self._rng, entry)
                entry["lastSet"] = moment
        return due

    def get_spec(self):
        """Return {name: 1} for every tracked parameter, suitable for use with getParameter."""
        return dict.fromkeys(self._params, 1)

    def get_keys(self):
        """Return the names of all tracked parameters as a list."""
        return [*self._params]
class FuzzRuntimeParameters(interface.Hook):
    """Regularly connect to nodes and send them a setParameter command.

    The work is done by a background _SetParameterThread that is started in before_suite(),
    paused after every test, resumed before the next one and stopped in after_suite().
    """

    DESCRIPTION = "Changes the value of runtime-settable parameters at regular intervals"

    IS_BACKGROUND = True

    def __init__(
        self,
        hook_logger,
        fixture,
        seed=random.randrange(sys.maxsize),
        auth_options=None,
    ):
        """Initialize the FuzzRuntimeParameters.

        Args:
            hook_logger: the logger instance for this hook.
            fixture: the target fixture (standalone, replica set, sharded cluster, or
                multi-cluster fixture).
            seed: seed for the random generators that pick parameter values. The default is
                evaluated once, when this class is defined, so hooks created without an
                explicit seed in the same process share it.
            auth_options: dictionary of auth options.
        """
        interface.Hook.__init__(self, hook_logger, fixture, FuzzRuntimeParameters.DESCRIPTION)

        # The parameter states and the thread are created in before_suite().
        self._mongod_param_state = None
        self._mongos_param_state = None
        self._cluster_param_state = None
        self._seed = seed

        self._fixture = fixture
        self._standalone_fixtures = []
        self._rs_fixtures = []
        self._mongos_fixtures = []

        self._setParameter_thread = None
        self._auth_options = auth_options

    def before_suite(self, test_report):
        """Before suite. Build the fuzzable parameter specs and start the fuzzing thread."""
        for cluster in self._fixture.get_testable_clusters():
            self._add_fixture(cluster)

        from buildscripts.resmokelib.generate_fuzz_config.config_fuzzer_limits import (
            config_fuzzer_params,
        )

        # Get only the mongod and mongos parameters that have "runtime" in the "fuzz_at" param
        # value.
        runtime_mongod_params = {
            param: val
            for param, val in config_fuzzer_params["mongod"].items()
            if "runtime" in val.get("fuzz_at", [])
        }
        runtime_mongos_params = {
            param: val
            for param, val in config_fuzzer_params["mongos"].items()
            if "runtime" in val.get("fuzz_at", [])
        }
        # Get cluster parameters. Copy the mapping so removing entries below does not modify
        # the shared limits table.
        cluster_params = dict(config_fuzzer_params["cluster"])

        from buildscripts.resmokelib import config

        # Flow control-related parameters should only be fuzzed when enableFlowControl is set
        # to True at startup.
        flow_control_match = re.search(
            r"enableFlowControl:\s+(\w+)", config.MONGOD_SET_PARAMETERS
        )
        if not flow_control_match or flow_control_match.group(1) == "false":
            runtime_mongod_params = {
                k: v for k, v in runtime_mongod_params.items() if "flowControl" not in k
            }

        auditing_enabled = (
            config.MONGOD_EXTRA_CONFIG.get("auditRuntimeConfiguration", "off") == "on"
        )
        if not auditing_enabled:
            # auditConfig requires auditing to be enabled, so we should not fuzz it if auditing
            # is disabled. pop() with a default is used instead of 'del' so that a limits table
            # without an auditConfig entry does not raise KeyError.
            cluster_params.pop("auditConfig", None)

        validate_runtime_parameter_spec(runtime_mongod_params)
        validate_runtime_parameter_spec(runtime_mongos_params)
        validate_runtime_parameter_spec(cluster_params)

        # Construct the runtime state before the suite begins.
        # The initial lastSet time of each parameter is the start time of the suite.
        self._mongod_param_state = RuntimeParametersState(runtime_mongod_params, self._seed)
        self._mongos_param_state = RuntimeParametersState(runtime_mongos_params, self._seed)
        self._cluster_param_state = RuntimeParametersState(cluster_params, self._seed)

        self._setParameter_thread = _SetParameterThread(
            self.logger,
            self._mongos_fixtures,
            self._rs_fixtures,
            self._standalone_fixtures,
            self._fixture,
            self._mongod_param_state,
            self._mongos_param_state,
            self._cluster_param_state,
            lifecycle_interface.FlagBasedThreadLifecycle(),
            self._auth_options,
        )
        self.logger.info("Starting the runtime parameter fuzzing thread.")
        self._setParameter_thread.start()

    def after_suite(self, test_report, teardown_flag=None):
        """After suite. Stop the fuzzing thread if before_suite() created one."""
        if self._setParameter_thread is None:
            # before_suite() did not get as far as creating the thread; nothing to stop.
            return
        self.logger.info("Stopping the runtime parameter fuzzing thread.")
        self._setParameter_thread.stop()
        self.logger.info("Runtime parameter fuzzing thread stopped.")

    def before_test(self, test, test_report):
        """Before test. Log current config of all runtime-fuzzable params, then resume fuzzing."""
        self._log_all_parameters()

        self.logger.info("Resuming the runtime parameter fuzzing thread.")
        # pause() verifies that the thread is alive and the fixtures are running before the
        # thread is allowed to act again.
        self._setParameter_thread.pause()
        self._setParameter_thread.resume()

    def after_test(self, test, test_report):
        """After test. Pause fuzzing, then log current config of all runtime-fuzzable params."""
        self.logger.info("Pausing the runtime parameter fuzzing thread.")
        self._setParameter_thread.pause()
        self.logger.info("Paused the runtime parameter fuzzing thread.")

        self._log_all_parameters()

    def _log_all_parameters(self):
        """Log the runtime-fuzzable parameters of every tracked node and the cluster parameters."""
        for repl_set in self._rs_fixtures:
            for node in repl_set.nodes:
                self._invoke_get_parameter_and_log(node)

        for standalone_node in self._standalone_fixtures:
            self._invoke_get_parameter_and_log(standalone_node)

        for mongos in self._mongos_fixtures:
            self._invoke_get_parameter_and_log(mongos)

        # Cluster parameters are only read when there is a mongos, and only through the first.
        if self._mongos_fixtures:
            self._invoke_get_cluster_parameter_and_log(self._mongos_fixtures[0])

    def _add_fixture(self, fixture):
        """Sort 'fixture' into the standalone, replica set or mongos list.

        A sharded cluster is expanded recursively: each shard is added, the config server is
        added when the cluster has no config shard, and every mongos is recorded.

        Raises:
            ValueError: if the fixture is none of the supported fixture types.
        """
        if isinstance(fixture, standalone.MongoDFixture):
            self._standalone_fixtures.append(fixture)
        elif isinstance(fixture, replicaset.ReplicaSetFixture):
            self._rs_fixtures.append(fixture)
        elif isinstance(fixture, shardedcluster.ShardedClusterFixture):
            for shard_fixture in fixture.shards:
                self._add_fixture(shard_fixture)
            if fixture.config_shard is None:
                self._add_fixture(fixture.configsvr)
            for mongos_fixture in fixture.mongos:
                self._mongos_fixtures.append(mongos_fixture)
        else:
            raise ValueError("No fixture to run setParameter on.")

    def _invoke_get_parameter_and_log(self, node):
        """Log the current state of a node's runtime-fuzzable parameters.

        Only usable once before_suite has initialized the runtime state of the parameters.
        """
        client = fixture_interface.build_client(node, self._auth_options)
        # mongos and mongod have separate sets of fuzzable parameters.
        params_to_get = (
            self._mongos_param_state.get_spec()
            if client.is_mongos
            else self._mongod_param_state.get_spec()
        )
        get_result = client.admin.command("getParameter", 1, **params_to_get)
        self.logger.info(
            "Current state of runtime-fuzzable parameters on node on port %d. Parameters: %s",
            node.port,
            get_result,
        )

    def _invoke_get_cluster_parameter_and_log(self, node):
        """Log the current state of a cluster's fuzzable cluster parameters.

        Only usable once before_suite has initialized the runtime state of the parameters.
        """
        client = fixture_interface.build_client(node, self._auth_options)
        params_to_get = self._cluster_param_state.get_keys()
        get_result = client.admin.command("getClusterParameter", params_to_get)
        self.logger.info(
            "Current state of fuzzable cluster parameters on cluster. Parameters: %s",
            get_result,
        )
class _SetParameterThread(threading.Thread):
    """Background thread that periodically sets fuzzed parameter values on the fixtures."""

    def __init__(
        self,
        logger,
        mongos_fixtures,
        rs_fixtures,
        standalone_fixtures,
        fixture,
        mongod_param_state,
        mongos_param_state,
        cluster_param_state,
        lifecycle,
        auth_options=None,
    ):
        """Initialize _SetParameterThread.

        Args:
            logger: logger used for progress and error messages.
            mongos_fixtures: mongos fixtures to set mongos and cluster parameters through.
            rs_fixtures: replica set fixtures whose nodes receive mongod parameters.
            standalone_fixtures: standalone fixtures that receive mongod parameters.
            fixture: the top-level fixture the hook was created for.
            mongod_param_state: state object generating mongod parameter values.
            mongos_param_state: state object generating mongos parameter values.
            cluster_param_state: state object generating cluster parameter values.
            lifecycle: object that tells the thread when it may act, idle or stop.
            auth_options: dictionary of auth options used when building clients.
        """
        threading.Thread.__init__(self, name="SetParameterThread")
        self.daemon = True
        self.logger = logger

        self._mongos_fixtures = mongos_fixtures
        self._rs_fixtures = rs_fixtures
        self._standalone_fixtures = standalone_fixtures
        self._fixture = fixture
        self._mongod_param_state = mongod_param_state
        self._mongos_param_state = mongos_param_state
        self._cluster_param_state = cluster_param_state
        self.__lifecycle = lifecycle
        self._auth_options = auth_options

        # Minimum number of seconds between two rounds of setParameter commands.
        self._setparameter_interval_secs = 1
        self._last_exec = time.time()

        # Event set when the thread has been stopped using the 'stop()' method.
        self._is_stopped_evt = threading.Event()
        # Event set when the thread is not setting parameters.
        self._is_idle_evt = threading.Event()
        self._is_idle_evt.set()

    def run(self):
        """Execute the thread."""
        try:
            while True:
                self._is_idle_evt.set()

                permitted = self.__lifecycle.wait_for_action_permitted()
                if not permitted:
                    break

                self._is_idle_evt.clear()

                now = time.time()
                if now - self._last_exec > self._setparameter_interval_secs:
                    self._do_set_parameter()
                    self._last_exec = time.time()

                found_idle_request = self.__lifecycle.poll_for_idle_request()
                if found_idle_request:
                    self.__lifecycle.send_idle_acknowledgement()
                    continue

                # The 'wait_secs' is used to wait 'self._setparameter_interval_secs' from the
                # moment the last setParameter command was sent.
                now = time.time()
                wait_secs = max(0, self._setparameter_interval_secs - (now - self._last_exec))
                self.__lifecycle.wait_for_action_interval(wait_secs)
        except Exception:
            # Proactively log the exception when it happens so it will be
            # flushed immediately.
            self.logger.exception("SetParameter thread threw exception")
            # The event should be signaled whenever the thread is not setting parameters,
            # otherwise pause() would wait forever on a thread that has already died.
            self._is_idle_evt.set()

    def stop(self):
        """Stop the thread and wait for it to finish."""
        self.__lifecycle.stop()
        self._is_stopped_evt.set()
        # Unpause to allow the thread to finish.
        self.resume()
        self.join()

    def pause(self):
        """Pause the thread.

        Raises:
            errors.ServerFailure: if the thread is no longer alive, or a replica set or mongos
                fixture is not running.
        """
        self.__lifecycle.mark_test_finished()

        # Wait until we are no longer setting parameters.
        self._is_idle_evt.wait()
        # Check if the thread is alive in case it has thrown an exception while running.
        self._check_thread()

        # Check that fixtures are still running
        for rs_fixture in self._rs_fixtures:
            if not rs_fixture.is_running():
                raise errors.ServerFailure(
                    "ReplicaSetFixture with pids {} expected to be running in"
                    " SetParameter, but wasn't.".format(rs_fixture.pids())
                )
        for mongos_fixture in self._mongos_fixtures:
            if not mongos_fixture.is_running():
                raise errors.ServerFailure(
                    "MongoSFixture with pids {} expected to be running in"
                    " SetParameter, but wasn't.".format(mongos_fixture.pids())
                )

    def resume(self):
        """Resume the thread."""
        self.__lifecycle.mark_test_started()

    def _wait(self, timeout):
        """Block until stop() has been called or 'timeout' seconds have passed."""
        self._is_stopped_evt.wait(timeout)

    def _check_thread(self):
        """Raise errors.ServerFailure if the thread is not alive."""
        if not self.is_alive():
            msg = "The setParameter thread is not running."
            self.logger.error(msg)
            raise errors.ServerFailure(msg)

    def _do_set_parameter(self):
        """Generate the parameter values that are due and send them to the fixtures.

        A group of fixtures is skipped entirely when no parameter of its kind is due, so no
        client is built and no empty parameter set is logged for it.
        """
        # Always ask all three states, so each one advances its own last-set times.
        mongod_params_to_set = self._mongod_param_state.generate_parameters()
        mongos_params_to_set = self._mongos_param_state.generate_parameters()
        cluster_params_to_set = self._cluster_param_state.generate_parameters()

        if mongod_params_to_set:
            for repl_set in self._rs_fixtures:
                self.logger.info(
                    "Setting parameters on all nodes of replica set %s. Parameters: %s",
                    repl_set.replset_name,
                    mongod_params_to_set,
                )
                for node in repl_set.nodes:
                    self._set_node_parameters(node, mongod_params_to_set)

            for standalone_node in self._standalone_fixtures:
                self.logger.info(
                    "Setting parameters on standalone on port %d. Parameters: %s",
                    standalone_node.port,
                    mongod_params_to_set,
                )
                self._set_node_parameters(standalone_node, mongod_params_to_set)

        if mongos_params_to_set:
            for mongos in self._mongos_fixtures:
                self.logger.info(
                    "Setting parameters on mongos port %d. Parameters: %s",
                    mongos.port,
                    mongos_params_to_set,
                )
                self._set_node_parameters(mongos, mongos_params_to_set)

        # Cluster parameters are only set when there is a mongos, and only through the first.
        if cluster_params_to_set and self._mongos_fixtures:
            self.logger.info(
                "Setting parameters cluster. Parameters: %s",
                cluster_params_to_set,
            )
            client = fixture_interface.build_client(self._mongos_fixtures[0], self._auth_options)
            for key, value in cluster_params_to_set.items():
                self._set_cluster_parameter(client, key, value)

    def _set_node_parameters(self, node, params):
        """Send one setParameter command carrying all of 'params' to 'node'."""
        client = fixture_interface.build_client(node, self._auth_options)
        client.admin.command("setParameter", 1, **params)

    def _set_cluster_parameter(self, client, key, value):
        """Send setClusterParameter for a single parameter, retrying on known transient errors."""
        try:
            with_naive_retry(
                lambda: client.admin.command("setClusterParameter", {key: value}),
                # Retry on
                #
                # ConflictingOperationInProgress
                #   as setClusterParameter is a DDL operation, it might clash with other
                #   operations
                #
                # AddOrRemoveShardInProgress
                #   setClusterParameter explicitly clashes with that one too
                extra_retryable_error_codes=[117, 414],
            )
        except OperationFailure as exc:
            # BadValue (code 2) might happen when we do a downgrade and try to set a cluster
            # variable that is not supported in the given FCV
            if exc.code != 2:
                raise