"""Test hook that periodically makes the primary of a replica set step down."""
import collections
import os.path
import random
import threading
import time

import pymongo.errors

from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
from buildscripts.resmokelib.testing.fixtures import replicaset, shardedcluster
from buildscripts.resmokelib.testing.hooks import interface
from buildscripts.resmokelib.testing.hooks import lifecycle as lifecycle_interface


class ContinuousStepdown(interface.Hook):
    """Regularly connect to replica sets and send a replSetStepDown command."""

    DESCRIPTION = (
        "Continuous stepdown (steps down the primary of replica sets at regular intervals)"
    )

    IS_BACKGROUND = True

    # The hook stops the fixture partially during its execution.
    STOPS_FIXTURE = True

    # The various kill methods this hook can execute.
    STEPDOWN = 0
    TERMINATE = 1
    KILL = 2

    def __init__(
        self,
        hook_logger,
        fixture,
        config_stepdown=True,
        shard_stepdown=True,
        stepdown_interval_ms=8000,
        terminate=False,
        kill=False,
        randomize_kill=False,
        is_fsm_workload=False,
        background_reconfig=False,
        auth_options=None,
        should_downgrade=False,
    ):
        """Initialize the ContinuousStepdown.

        Args:
            hook_logger: the logger instance for this hook.
            fixture: the target fixture (replica sets or a sharded cluster).
            config_stepdown: whether to step down the CSRS.
            shard_stepdown: whether to step down the shard replica sets in a sharded cluster.
            stepdown_interval_ms: the number of milliseconds between stepdowns.
            terminate: shut down the node cleanly as a means of stepping it down.
            kill: with a 50% probability, kill the node instead of shutting it down cleanly.
            randomize_kill: randomly kill, terminate, or step down.
            is_fsm_workload: whether the hook is running while an FSM workload is executing.
            auth_options: dictionary of auth options.
            background_reconfig: whether to conduct reconfigs in the background.
            should_downgrade: whether downgrades should be performed as part of the stepdown.

        Note that the "terminate" and "kill" arguments are named after the "SIGTERM" and
        "SIGKILL" signals that are used to stop the process. On Windows, there are no signals,
        so we use a different means to achieve the same result as sending SIGTERM or SIGKILL.
        """
        interface.Hook.__init__(self, hook_logger, fixture, ContinuousStepdown.DESCRIPTION)

        self._fixture = fixture
        if hasattr(fixture, "config_shard") and fixture.config_shard is not None and shard_stepdown:
            # If the config server is a shard, shard_stepdown implies config_stepdown.
            config_stepdown = shard_stepdown

        self._config_stepdown = config_stepdown
        self._shard_stepdown = shard_stepdown
        self._stepdown_interval_secs = float(stepdown_interval_ms) / 1000

        self._rs_fixtures = []
        self._mongos_fixtures = []
        self._stepdown_thread = None

        # kill implies terminate.
        self._terminate = terminate or kill
        self._kill = kill
        self._randomize_kill = randomize_kill

        self._background_reconfig = background_reconfig
        self._auth_options = auth_options
        self._should_downgrade = should_downgrade

        # The action file names need to match the same construction as found in
        # jstests/concurrency/fsm_libs/resmoke_runner.js.
        dbpath_prefix = fixture.get_dbpath_prefix()

        # When running an FSM workload, we use the file-based lifecycle protocol
        # in which a file is used as a form of communication between the hook and
        # the FSM workload to decide when the hook is allowed to run.
        if is_fsm_workload:
            # Each hook uses a unique set of action files - the uniqueness is brought
            # about by using the hook's name as a suffix.
            self.__action_files = lifecycle_interface.ActionFiles._make(
                [
                    os.path.join(dbpath_prefix, field + "_" + self.__class__.__name__)
                    for field in lifecycle_interface.ActionFiles._fields
                ]
            )
        else:
            self.__action_files = None

    def before_suite(self, test_report):
        """Before suite."""
        if not self._rs_fixtures:
            for cluster in self._fixture.get_testable_clusters():
                self._add_fixture(cluster)

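        # Use the file-based lifecycle protocol when coordinating with an FSM workload
        # (see __init__); otherwise the in-memory, flag-based protocol driven by this
        # hook's before_test()/after_test() calls is sufficient.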
        if self.__action_files is not None:
            lifecycle = lifecycle_interface.FileBasedThreadLifecycle(self.__action_files)
        else:
            lifecycle = lifecycle_interface.FlagBasedThreadLifecycle()

        self._stepdown_thread = _StepdownThread(
            self.logger,
            self._mongos_fixtures,
            self._rs_fixtures,
            self._stepdown_interval_secs,
            self._terminate,
            self._kill,
            self._randomize_kill,
            lifecycle,
            self._background_reconfig,
            self._fixture,
            self._auth_options,
            self._should_downgrade,
        )
        self.logger.info("Starting the stepdown thread.")
        self._stepdown_thread.start()

    def after_suite(self, test_report, teardown_flag=None):
        """After suite."""
        self.logger.info("Stopping the stepdown thread.")
        self._stepdown_thread.stop()
        self.logger.info("Stepdown thread stopped.")

    def before_test(self, test, test_report):
        """Before test."""
        self.logger.info("Resuming the stepdown thread.")
        self._stepdown_thread.pause()
        self._stepdown_thread.resume()

    def after_test(self, test, test_report):
        """After test."""
        self.logger.info("Pausing the stepdown thread.")
        self._stepdown_thread.pause()
        self.logger.info("Paused the stepdown thread.")

    def _add_fixture(self, fixture):
        if isinstance(fixture, replicaset.ReplicaSetFixture):
            if not fixture.all_nodes_electable:
                raise ValueError(
                    "The replica sets that are the target of the ContinuousStepdown hook must have"
                    " the 'all_nodes_electable' option set."
                )
            self._rs_fixtures.append(fixture)
        elif isinstance(fixture, shardedcluster.ShardedClusterFixture):
            if self._shard_stepdown:
                for shard_fixture in fixture.shards:
                    if fixture.config_shard is None or self._config_stepdown:
                        self._add_fixture(shard_fixture)
            if self._config_stepdown and fixture.config_shard is None:
                self._add_fixture(fixture.configsvr)
            for mongos_fixture in fixture.mongos:
                self._mongos_fixtures.append(mongos_fixture)


class _StepdownThread(threading.Thread):
    def __init__(
        self,
        logger,
        mongos_fixtures,
        rs_fixtures,
        stepdown_interval_secs,
        terminate,
        kill,
        randomize_kill,
        stepdown_lifecycle,
        background_reconfig,
        fixture,
        auth_options=None,
        should_downgrade=False,
    ):
        """Initialize _StepdownThread."""
        threading.Thread.__init__(self, name="StepdownThread")
        self.daemon = True
        self.logger = logger
        self._mongos_fixtures = mongos_fixtures
        self._rs_fixtures = rs_fixtures
        self._stepdown_interval_secs = stepdown_interval_secs
        # We set self._stepdown_duration_secs to a very long time to ensure that the former
        # primary will not step back up on its own; instead, the stepdown thread will cause it
        # to step up via replSetStepUp.
        self._stepdown_duration_secs = 24 * 60 * 60  # 24 hours
        self._terminate = terminate
        self._kill = kill
        self._randomize_kill = randomize_kill
        self.__lifecycle = stepdown_lifecycle
        self._background_reconfig = background_reconfig
        self._fixture = fixture
        self._auth_options = auth_options
        self._should_downgrade = should_downgrade

        self._last_exec = time.time()
        # Event set when the thread has been stopped using the 'stop()' method.
        self._is_stopped_evt = threading.Event()
        # Event set when the thread is not performing stepdowns.
        self._is_idle_evt = threading.Event()
        self._is_idle_evt.set()

        self._step_up_stats = collections.Counter()

    def run(self):
        """Execute the thread."""
        if not self._rs_fixtures:
            self.logger.warning("No replica set on which to run stepdowns.")
            return

        try:
            while True:
                self._is_idle_evt.set()

                permitted = self.__lifecycle.wait_for_action_permitted()
                if not permitted:
                    break

                self._is_idle_evt.clear()

                now = time.time()
                if now - self._last_exec > self._stepdown_interval_secs:
                    self.logger.info("Starting stepdown of all primaries")
                    self._step_down_all()
                    # Wait until each replica set has a primary, so the test can make progress.
                    self._await_primaries()
                    self._last_exec = time.time()
                    self.logger.info(
                        "Completed stepdown of all primaries in %0d ms",
                        (self._last_exec - now) * 1000,
                    )

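                # If the workload has requested that the hook go idle, acknowledge the
                # request and loop back to waiting for permission to act again.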
                found_idle_request = self.__lifecycle.poll_for_idle_request()
                if found_idle_request:
                    self.__lifecycle.send_idle_acknowledgement()
                    continue

                # The 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment
                # the last stepdown command was sent.
                now = time.time()
                wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec))
                self.__lifecycle.wait_for_action_interval(wait_secs)
        except Exception:
            # Proactively log the exception when it happens so it will be
            # flushed immediately.
            self.logger.exception("Stepdown Thread threw exception")
            # The event should be signaled whenever the thread is not performing stepdowns.
            self._is_idle_evt.set()

    def stop(self):
        """Stop the thread."""
        self.__lifecycle.stop()
        self._is_stopped_evt.set()
        # Unpause to allow the thread to finish.
        self.resume()
        self.join()

    def pause(self):
        """Pause the thread."""
        self.__lifecycle.mark_test_finished()

        # Wait until we are no longer executing stepdowns.
        self._is_idle_evt.wait()
        # Check if the thread is alive in case it has thrown an exception while running.
        self._check_thread()
        # Wait until all the replica sets have primaries.
        self._await_primaries()

        # Check that the fixtures are still running.
        for rs_fixture in self._rs_fixtures:
            if not rs_fixture.is_running():
                raise errors.ServerFailure(
                    "ReplicaSetFixture with pids {} expected to be running in"
                    " ContinuousStepdown, but wasn't.".format(rs_fixture.pids())
                )
        for mongos_fixture in self._mongos_fixtures:
            if not mongos_fixture.is_running():
                raise errors.ServerFailure(
                    "MongoSFixture with pids {} expected to be running in"
                    " ContinuousStepdown, but wasn't.".format(mongos_fixture.pids())
                )

    def resume(self):
        """Resume the thread."""
        self.__lifecycle.mark_test_started()

        self.logger.info(
            "Current statistics about which nodes have been successfully stepped up: %s",
            self._step_up_stats,
        )

    def _wait(self, timeout):
        # Wait until stop or timeout.
        self._is_stopped_evt.wait(timeout)

    def _check_thread(self):
        if not self.is_alive():
            msg = "The stepdown thread is not running."
            self.logger.error(msg)
            raise errors.ServerFailure(msg)

    def _await_primaries(self):
        for fixture in self._rs_fixtures:
            fixture.get_primary()

    def _create_client(self, node):
        return fixture_interface.build_client(node, self._auth_options)

    def _step_down_all(self):
        for rs_fixture in self._rs_fixtures:
            self._step_down(rs_fixture)

    def _step_down(self, rs_fixture):
        with rs_fixture.removeshard_teardown_mutex:
            if rs_fixture.removeshard_teardown_marker:
                return

            try:
                old_primary = rs_fixture.get_primary(timeout_secs=self._stepdown_interval_secs)
            except errors.ServerFailure:
                # We ignore the ServerFailure exception because it means a primary wasn't available.
                # We'll try again after self._stepdown_interval_secs seconds.
                return

            secondaries = rs_fixture.get_secondaries()

            self.logger.info(
                "Stepping down primary on port %d of replica set '%s'",
                old_primary.port,
                rs_fixture.replset_name,
            )

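            # Choose how to dethrone the primary. Precedence: 'randomize_kill' picks any
            # of the three methods at random; otherwise 'kill' picks randomly between
            # terminate and kill; otherwise 'terminate' always shuts down cleanly. The
            # default stepdown method steps up a secondary instead and lets the old
            # primary step down via heartbeats.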
            kill_method = ContinuousStepdown.STEPDOWN
            if self._randomize_kill:
                kill_method = random.choice(
                    [
                        ContinuousStepdown.STEPDOWN,
                        ContinuousStepdown.TERMINATE,
                        ContinuousStepdown.KILL,
                    ]
                )
            elif self._kill:
                kill_method = random.choice([ContinuousStepdown.TERMINATE, ContinuousStepdown.KILL])
            elif self._terminate:
                kill_method = ContinuousStepdown.TERMINATE

            if kill_method in (ContinuousStepdown.KILL, ContinuousStepdown.TERMINATE):
                if not rs_fixture.stop_primary(
                    old_primary, self._background_reconfig, kill_method == ContinuousStepdown.KILL
                ):
                    return

            if self._should_downgrade:
                new_primary = rs_fixture.change_version_and_restart_node(
                    old_primary, self._auth_options
                )
            else:
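                # Try secondaries in random order until one steps up. Each failed
                # candidate is removed from 'secondaries', so an empty list afterwards
                # means no secondary could be stepped up and step_up_secondary()
                # returned None; the 'if secondaries:' branches below rely on this.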
                def step_up_secondary():
                    while secondaries:
                        chosen = random.choice(secondaries)
                        self.logger.info(
                            "Chose secondary on port %d of replica set '%s' for step up attempt.",
                            chosen.port,
                            rs_fixture.replset_name,
                        )
                        if not rs_fixture.stepup_node(chosen, self._auth_options):
                            self.logger.info(
                                "Attempt to step up secondary on port %d of replica set '%s' failed.",
                                chosen.port,
                                rs_fixture.replset_name,
                            )
                            secondaries.remove(chosen)
                        else:
                            return chosen

                new_primary = step_up_secondary()

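            # If the old primary was terminated or killed above, restart it now so that
            # it can rejoin the set (it is stepped back up below if no secondary won).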
            if kill_method in (ContinuousStepdown.KILL, ContinuousStepdown.TERMINATE):
                rs_fixture.restart_node(old_primary)

            if secondaries:
                # We successfully stepped up a secondary; wait for the former primary to step down
                # via heartbeats. We need to wait for the former primary to step down to complete
                # this stepdown round and to avoid races between the ContinuousStepdown hook and
                # other test hooks that may depend on the health of the replica set.
                self.logger.info(
                    "Successfully stepped up the secondary on port %d of replica set '%s'.",
                    new_primary.port,
                    rs_fixture.replset_name,
                )
                retry_time_secs = rs_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
                retry_start_time = time.time()
                while True:
                    try:
                        client = self._create_client(old_primary)
                        is_secondary = client.admin.command("isMaster")["secondary"]
                        if is_secondary:
                            break
                    except pymongo.errors.AutoReconnect:
                        pass
                    if time.time() - retry_start_time > retry_time_secs:
                        raise errors.ServerFailure(
                            "The old primary on port {} of replica set {} did not step down in"
                            " {} seconds.".format(
                                client.port, rs_fixture.replset_name, retry_time_secs
                            )
                        )
                    self.logger.info(
                        "Waiting for primary on port %d of replica set '%s' to step down.",
                        old_primary.port,
                        rs_fixture.replset_name,
                    )
                    time.sleep(0.2)  # Wait a little bit before trying again.
                self.logger.info(
                    "Primary on port %d of replica set '%s' stepped down.",
                    old_primary.port,
                    rs_fixture.replset_name,
                )

            if not secondaries:
                # If we failed to step up one of the secondaries, then we run replSetStepUp to try
                # to elect the former primary again. This way we don't need to wait
                # self._stepdown_duration_secs seconds to restore write availability to the cluster.
                # Since the former primary may have been killed, we need to wait until it has been
                # restarted by retrying replSetStepUp.

                retry_time_secs = rs_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
                retry_start_time = time.time()
                while True:
                    try:
                        client = self._create_client(old_primary)
                        client.admin.command("replSetStepUp")
                        is_primary = client.admin.command("isMaster")["ismaster"]
                        # There is a chance that the old primary is still in the catchup stage when
                        # we issue replSetStepUp; it will then step down due to the term change from
                        # the previous failed election. We must ensure the old primary becomes a
                        # writable primary here, or there will be no primary for a day.
                        if is_primary:
                            break
                        else:
                            self._wait(0.2)
                    except pymongo.errors.AutoReconnect:
                        self.logger.info("AutoReconnect exception thrown, retrying...")
                        time.sleep(0.1)
                    except pymongo.errors.OperationFailure:
                        self._wait(0.2)
                    if time.time() - retry_start_time > retry_time_secs:
                        raise errors.ServerFailure(
                            "The old primary on port {} of replica set {} did not step up in"
                            " {} seconds.".format(
                                client.port, rs_fixture.replset_name, retry_time_secs
                            )
                        )

            # Bump the counter for the chosen secondary to indicate that the replSetStepUp
            # command executed successfully.
            key = "{}/{}".format(
                rs_fixture.replset_name,
                new_primary.get_internal_connection_string() if secondaries else "none",
            )
            self._step_up_stats[key] += 1