mongo/buildscripts/resmokelib/testing/hooks/initialsync.py

295 lines
11 KiB
Python

"""Test hook for verifying correctness of initial sync."""
import os.path
import random
import time
import bson
import bson.errors
import pymongo.errors
from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
from buildscripts.resmokelib.testing.fixtures import replicaset
from buildscripts.resmokelib.testing.hooks import cleanup, interface, jsfile
class BackgroundInitialSync(interface.Hook):
    """BackgroundInitialSync class.

    After every test, this hook checks if a background node has finished initial sync and if so,
    validates it, tears it down, and restarts it.

    This test accepts a parameter 'n' that specifies a number of tests after which it will wait for
    replication to finish before validating and restarting the initial sync node.

    This requires the ReplicaSetFixture to be started with 'start_initial_sync_node=True'. If used
    at the same time as CleanEveryN, the 'n' value passed to this hook should be equal to the 'n'
    value for CleanEveryN.
    """

    DEFAULT_N = cleanup.CleanEveryN.DEFAULT_N

    IS_BACKGROUND = True

    def __init__(self, hook_logger, fixture, n=DEFAULT_N, shell_options=None):
        """Initialize BackgroundInitialSync.

        Args:
            hook_logger: logger passed through to interface.Hook.
            fixture: the fixture under test; must be a ReplicaSetFixture.
            n: number of tests after which the hook test case blocks until the initial sync
                node reaches SECONDARY state.
            shell_options: options forwarded to BackgroundInitialSyncTestCase.

        Raises:
            ValueError: if `fixture` is not a ReplicaSetFixture.
        """
        if not isinstance(fixture, replicaset.ReplicaSetFixture):
            raise ValueError(
                "`fixture` must be an instance of ReplicaSetFixture, not {}".format(
                    fixture.__class__.__name__
                )
            )

        description = "Background Initial Sync"
        interface.Hook.__init__(self, hook_logger, fixture, description)

        self.n = n
        # Tests seen since the hook test case last reset the counter to 0.
        self.tests_run = 0
        # Mid-sync restarts performed since the last successful data validation.
        self.random_restarts = 0
        self._shell_options = shell_options

    def after_test(self, test, test_report):
        """Run the background initial sync check after `test` has executed.

        Args:
            test: the test that just ran; its logger is used for the hook test case.
            test_report: report that the dynamic hook test case is recorded in.
        """
        # Count the finished test *before* running the hook test case. The test case treats
        # `tests_run == 0` as "the blocking wait for SECONDARY just happened" and fails the test
        # if the node is not SECONDARY in that situation, so the counter must already be
        # non-zero whenever no wait took place (in particular on the very first invocation).
        self.tests_run += 1

        hook_test_case = BackgroundInitialSyncTestCase.create_after_test(
            test.logger, test, self, self._shell_options
        )
        hook_test_case.configure(self.fixture)
        hook_test_case.run_dynamic_test(test_report)
class BackgroundInitialSyncTestCase(jsfile.DynamicJSTestCase):
    """Hook test case that validates the background initial sync node and restarts its sync."""

    JS_FILENAME = os.path.join("jstests", "hooks", "run_initial_sync_node_validation.js")
    # OperationFailure codes that are retried while waiting for the node to reach SECONDARY.
    INTERRUPTED_DUE_TO_REPL_STATE_CHANGE = 11602
    INTERRUPTED_DUE_TO_STORAGE_CHANGE = 355

    def __init__(self, logger, test_name, description, base_test_name, hook, shell_options=None):
        """Initialize BackgroundInitialSyncTestCase.

        All arguments are forwarded to DynamicJSTestCase together with JS_FILENAME.
        """
        jsfile.DynamicJSTestCase.__init__(
            self,
            logger,
            test_name,
            description,
            base_test_name,
            hook,
            self.JS_FILENAME,
            shell_options,
        )

    def run_test(self):
        """Execute test hook.

        Once the hook has counted 'n' tests, block until the initial sync node reports SECONDARY
        state (2). If the node is SECONDARY, run the validation script and restart initial sync.
        Otherwise skip validation for this test, restarting the in-progress sync at most once
        (with 20% probability per call) between validations.

        Raises:
            errors.TestFailure: if the node is not SECONDARY while the hook's test counter is 0,
                i.e. directly after the blocking wait reset it.
        """
        sync_node = self.fixture.get_initial_sync_node()
        sync_node_conn = sync_node.mongo_client()

        # If it's been 'n' tests so far, wait for the initial sync node to finish syncing.
        if self._hook.tests_run >= self._hook.n:
            self.logger.info(
                "%d tests have been run against the fixture, waiting for initial sync"
                " node to go into SECONDARY state",
                self._hook.tests_run,
            )
            # Reset before waiting: a zero counter marks "the wait was performed" below.
            self._hook.tests_run = 0
            self.__wait_for_secondary(sync_node_conn)

        # Check if the initial sync node is in SECONDARY state. If it's been 'n' tests, then it
        # should have waited to be in SECONDARY state and the test should be marked as a failure.
        # Otherwise, we just skip the hook and will check again after the next test.
        try:
            state = sync_node_conn.admin.command("replSetGetStatus").get("myState")

            if state != 2:
                if self._hook.tests_run == 0:
                    msg = "Initial sync node did not catch up after waiting 24 hours"
                    # No exception is being handled here, so log with error() rather than
                    # exception(), which would append an empty "NoneType: None" traceback.
                    self.logger.error("{0} failed: {1}".format(self._hook.description, msg))
                    raise errors.TestFailure(msg)

                self.logger.info(
                    "Initial sync node is in state %d, not state SECONDARY (2)."
                    " Skipping BackgroundInitialSync hook for %s",
                    state,
                    self._base_test_name,
                )

                # If we have not restarted initial sync since the last time we ran the data
                # validation, restart initial sync with a 20% probability.
                if self._hook.random_restarts < 1 and random.random() < 0.2:
                    self.logger.info(
                        "randomly restarting initial sync in the middle of initial sync"
                    )
                    self.__restart_init_sync(sync_node)
                    self._hook.random_restarts += 1
                return
        except pymongo.errors.OperationFailure:
            # replSetGetStatus can fail if the node is in STARTUP state. The node will soon go into
            # STARTUP2 state and replSetGetStatus will succeed after the next test.
            self.logger.info(
                "replSetGetStatus call failed in BackgroundInitialSync hook, skipping hook for %s",
                self._base_test_name,
            )
            return

        # The node is SECONDARY: allow one random mid-sync restart again in the next round.
        self._hook.random_restarts = 0

        # Run data validation and dbhash checking.
        self._js_test_case.run_test()

        self.__restart_init_sync(sync_node)

    def __wait_for_secondary(self, sync_node_conn):
        """Block until the node behind `sync_node_conn` reaches SECONDARY state.

        Reconnects and the two INTERRUPTED_DUE_TO_* failures are retried; any other
        OperationFailure propagates to the caller.
        """
        cmd = bson.SON(
            [
                ("replSetTest", 1),
                ("waitForMemberState", 2),
                (
                    "timeoutMillis",
                    fixture_interface.ReplFixture.AWAIT_REPL_TIMEOUT_FOREVER_MINS * 60 * 1000,
                ),
            ]
        )
        retryable_codes = (
            self.INTERRUPTED_DUE_TO_REPL_STATE_CHANGE,
            self.INTERRUPTED_DUE_TO_STORAGE_CHANGE,
        )
        while True:
            try:
                sync_node_conn.admin.command(cmd)
                return
            except pymongo.errors.AutoReconnect:
                self.logger.info("AutoReconnect exception thrown, retrying...")
                time.sleep(0.1)
            except pymongo.errors.OperationFailure as err:
                if err.code not in retryable_codes:
                    raise
                msg = (
                    "Interrupted while waiting for node to reach secondary state, retrying: {}"
                ).format(err)
                self.logger.error(msg)

    def __restart_init_sync(self, sync_node):
        """Restart initial sync by shutting down the node, clearing its data, and restarting it.

        The initial sync method for the next round is chosen at random, except that the
        in-memory storage engine always uses the logical method.
        """
        # Query the storage engine while the node is still up; teardown() follows below.
        storage_engine = (
            sync_node.mongo_client().admin.command("serverStatus")["storageEngine"].get("name")
        )
        # File copy based initial sync is not supported for in-memory storage engine.
        if storage_engine == "inMemory":
            init_sync_method = "logical"
        else:
            init_sync_method = random.choice(["logical", "fileCopyBased"])

        # Tear down and restart the initial sync node to start initial sync again.
        sync_node.teardown()

        self.logger.info(
            "Starting the initial sync node back up again with initial sync method %s",
            init_sync_method,
        )
        sync_node.mongod_options["set_parameters"]["initialSyncMethod"] = init_sync_method
        sync_node.setup()
        self.logger.info(fixture_interface.create_fixture_table(self.fixture))
        sync_node.await_ready()
class IntermediateInitialSync(interface.Hook):
    """Hook that re-runs initial sync on a node every 'n' tests and validates the result.

    The 'n' parameter is the number of tests after which the hook starts up a node to initial
    sync, waits for replication to finish, and then validates the data.

    The ReplicaSetFixture must have been started with 'start_initial_sync_node=True'.
    """

    DEFAULT_N = cleanup.CleanEveryN.DEFAULT_N

    IS_BACKGROUND = False

    def __init__(self, hook_logger, fixture, n=DEFAULT_N):
        """Check the fixture type, register the hook description and zero the test counter."""
        if not isinstance(fixture, replicaset.ReplicaSetFixture):
            fixture_type = fixture.__class__.__name__
            raise ValueError(
                "`fixture` must be an instance of ReplicaSetFixture, not {}".format(fixture_type)
            )

        super().__init__(hook_logger, fixture, "Intermediate Initial Sync")

        self.n = n
        self.tests_run = 0

    def _should_run_after_test(self):
        """Count one more finished test and return True once 'n' tests have accumulated."""
        self.tests_run += 1

        is_due = self.tests_run >= self.n
        if is_due:
            # Start counting the next batch of 'n' tests from scratch.
            self.tests_run = 0
        return is_due

    def after_test(self, test, test_report):
        """Run the initial sync test case after `test` when the hook is due."""
        if self._should_run_after_test():
            sync_case = IntermediateInitialSyncTestCase.create_after_test(test.logger, test, self)
            sync_case.configure(self.fixture)
            sync_case.run_dynamic_test(test_report)
class IntermediateInitialSyncTestCase(jsfile.DynamicJSTestCase):
    """Hook test case that restarts the initial sync node, waits for it, and validates it."""

    JS_FILENAME = os.path.join("jstests", "hooks", "run_initial_sync_node_validation.js")
    # OperationFailure codes that are retried while waiting for the node to reach SECONDARY.
    # run_test() reads these from `self`, so they must be defined on this class.
    INTERRUPTED_DUE_TO_REPL_STATE_CHANGE = 11602
    INTERRUPTED_DUE_TO_STORAGE_CHANGE = 355

    def __init__(self, logger, test_name, description, base_test_name, hook):
        """Initialize IntermediateInitialSyncTestCase.

        All arguments are forwarded to DynamicJSTestCase together with JS_FILENAME.
        """
        jsfile.DynamicJSTestCase.__init__(
            self, logger, test_name, description, base_test_name, hook, self.JS_FILENAME
        )

    def run_test(self):
        """Execute test hook.

        Tear down and restart the initial sync node, block until it reports SECONDARY state (2),
        then run the validation script. Reconnects and the two INTERRUPTED_DUE_TO_* failures are
        retried; any other OperationFailure propagates.
        """
        sync_node = self.fixture.get_initial_sync_node()
        sync_node.teardown()

        self.logger.info("Starting the initial sync node back up again...")
        sync_node.setup()
        sync_node.await_ready()

        # Connect only after the restart so the client is not created against the process that
        # teardown() stops.
        sync_node_conn = sync_node.mongo_client()

        # Do initial sync round.
        self.logger.info("Waiting for initial sync node to go into SECONDARY state")
        cmd = bson.SON(
            [
                ("replSetTest", 1),
                ("waitForMemberState", 2),
                (
                    "timeoutMillis",
                    fixture_interface.ReplFixture.AWAIT_REPL_TIMEOUT_FOREVER_MINS * 60 * 1000,
                ),
            ]
        )
        retryable_codes = (
            self.INTERRUPTED_DUE_TO_REPL_STATE_CHANGE,
            self.INTERRUPTED_DUE_TO_STORAGE_CHANGE,
        )
        while True:
            try:
                sync_node_conn.admin.command(cmd)
                break
            except pymongo.errors.AutoReconnect:
                self.logger.info("AutoReconnect exception thrown, retrying...")
                time.sleep(0.1)
            except pymongo.errors.OperationFailure as err:
                if err.code not in retryable_codes:
                    raise
                msg = (
                    "Interrupted while waiting for node to reach secondary state, retrying: {}"
                ).format(err)
                self.logger.error(msg)

        # Run data validation and dbhash checking.
        self._js_test_case.run_test()