mongo/jstests/libs/override_methods/continuous_stepdown.js

374 lines
15 KiB
JavaScript

/**
* Loading this file exposes ContinuousStepdown, whose "configure" function returns subclasses of
* ReplSetTest and ShardingTest that can spawn a thread which continuously steps down the primary
* node.
*
* ContinuousStepdown#configure takes a configuration object with the following options:
*
* configStepdown: boolean (default true)
* True if a stepdown thread should be started for the CSRS.
*
* electionTimeoutMS: number (default 5 seconds)
* The election timeout for the replica set.
*
* shardStepdown: boolean (default true)
* True if a stepdown thread should be started for each shard replica set.
*
* stepdownDurationSecs: number (default 10 seconds)
* Number of seconds after stepping down as primary for which the node is not re-electable.
*
* stepdownIntervalMS: number (default 8 seconds)
* Number of milliseconds to wait after issuing a stepdown command and discovering the new
* primary, before issuing the next stepdown.
*
* catchUpTimeoutMS: number (default 0 seconds)
* The amount of time allowed for newly-elected primaries to catch up.
*/
import {Thread} from "jstests/libs/parallelTester.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {reconfig, reconnect} from "jstests/replsets/rslib.js";
export class ContinuousStepdown {
    /**
     * Builds ReplSetTest and ShardingTest subclasses whose instances can start and stop a
     * background thread that periodically steps down replica set primaries. For ShardingTest,
     * stepdowns target the config server replica set and/or every shard replica set, as
     * selected by the stepdown options.
     *
     * @param {Object} stepdownOptions Overrides for configStepdown, electionTimeoutMS,
     *     shardStepdown, stepdownDurationSecs, stepdownIntervalMS and catchUpTimeoutMS. The
     *     overrides are layered over built-in defaults with Object.merge.
     * @param {Object} [extra] Optional settings.
     * @param {Object} [extra.verbositySetting={}] logComponentVerbosity value. It is converted
     *     with tojson before being handed to the generated classes.
     * @return {Object} {ReplSetTestWithContinuousPrimaryStepdown,
     *     ShardingTestWithContinuousPrimaryStepdown}
     */
    static configure(stepdownOptions, {verbositySetting: verbositySetting = {}} = {}) {
        const mergedOptions = Object.merge(
            {
                configStepdown: true,
                electionTimeoutMS: 5 * 1000,
                shardStepdown: true,
                stepdownDurationSecs: 10,
                stepdownIntervalMS: 8 * 1000,
                catchUpTimeoutMS: 0,
            },
            stepdownOptions,
        );
        const verbosityJson = tojson(verbositySetting);

        // The ReplSetTest subclass is built first, then the ShardingTest subclass.
        const ReplSetTestWithContinuousPrimaryStepdown = makeReplSetTestWithContinuousPrimaryStepdown(
            mergedOptions,
            verbosityJson,
        );
        const ShardingTestWithContinuousPrimaryStepdown = makeShardingTestWithContinuousPrimaryStepdown(
            mergedOptions,
            verbosityJson,
        );
        return {ReplSetTestWithContinuousPrimaryStepdown, ShardingTestWithContinuousPrimaryStepdown};
    }
}
/**
 * Helper class to manage the Thread instance that will continuously step down the primary
 * node.
 */
const StepdownThread = function () {
    let _counter = null;
    let _thread = null;

    /**
     * This function is intended to be called in a separate thread. It repeatedly steps down
     * the current primary until stopCounter reaches zero.
     *
     * The function runs inside a Thread, i.e. in a separate JavaScript scope. Bindings from
     * this module, such as the ReplSetTest import at the top of the file, are therefore not
     * visible here, so ReplSetTest is imported dynamically. NOTE(review): this relies on
     * Thread awaiting the returned promise before returnData() is read; confirm against
     * parallelTester.js.
     *
     * @param {CountDownLatch} stopCounter Object, which can be used to stop the thread.
     *
     * @param {string} seedNode The connection string of a node from which to discover
     *     the primary of the replica set.
     *
     * @param {Object} options Configuration object with the following fields:
     *     stepdownDurationSecs {integer}: The number of seconds after stepping down the
     *         primary for which the node is not re-electable.
     *     stepdownIntervalMS {integer}: The number of milliseconds to wait after
     *         issuing a step down command and discovering the new primary.
     *
     * @return Promise resolving to an object with the following fields:
     *     ok {integer}: 0 if it failed, 1 if it succeeded.
     *     error {string}: Only present if ok == 0. Contains the cause for the error.
     *     stack {string}: Only present if ok == 0. Contains the stack at the time of
     *         the error.
     */
    async function _continuousPrimaryStepdownFn(stopCounter, seedNode, options) {
        jsTest.log.info("*** Continuous stepdown thread running with seed node " + seedNode);

        try {
            // Imported here rather than relying on the module-level import, which is out of
            // scope in the thread. Failures surface through the {ok: 0} result below.
            const {ReplSetTest} = await import("jstests/libs/replsettest.js");

            // The config primary may unexpectedly step down during startup if under heavy
            // load and too slowly processing heartbeats.
            const replSet = new ReplSetTest(seedNode);

            let primary = replSet.getPrimary();

            while (stopCounter.getCount() > 0) {
                jsTest.log.info("*** Stepping down " + primary);

                // The command may fail if the node is no longer primary or is in the process of
                // stepping down.
                assert.commandWorkedOrFailedWithCode(
                    primary.adminCommand({replSetStepDown: options.stepdownDurationSecs, force: true}),
                    [ErrorCodes.NotWritablePrimary, ErrorCodes.ConflictingOperationInProgress],
                );

                // Wait for primary to get elected and allow the test to make some progress
                // before attempting another stepdown.
                if (stopCounter.getCount() > 0) {
                    primary = replSet.getPrimary();
                }

                if (stopCounter.getCount() > 0) {
                    sleep(options.stepdownIntervalMS);
                }
            }

            jsTest.log.info("*** Continuous stepdown thread completed successfully");
            return {ok: 1};
        } catch (e) {
            jsTest.log.info("*** Continuous stepdown thread caught exception", {error: e});
            return {ok: 0, error: e.toString(), stack: e.stack};
        }
    }

    /**
     * Returns true if the stepdown thread has been created and started.
     */
    this.hasStarted = function () {
        return !!_thread;
    };

    /**
     * Spawns a Thread using the given seedNode to discover the replica set.
     *
     * Internal state is only recorded once the thread has started successfully. A failed
     * start therefore does not leave the helper claiming to be active.
     */
    this.start = function (seedNode, options) {
        if (_thread) {
            throw new Error("Continuous stepdown thread is already active");
        }

        const counter = new CountDownLatch(1);
        const thread = new Thread(_continuousPrimaryStepdownFn, counter, seedNode, options);
        thread.start();

        _counter = counter;
        _thread = thread;
    };

    /**
     * Sets the stepdown thread's counter to 0, and waits for it to finish. Throws if the
     * stepdown thread did not exit successfully.
     *
     * Internal state is cleared before joining. A failing join or a failed thread result
     * still leaves the helper reusable, instead of wedged with a null counter.
     */
    this.stop = function () {
        if (!_thread) {
            throw new Error("Continuous stepdown thread is not active");
        }

        const thread = _thread;
        _counter.countDown();
        _counter = null;
        _thread = null;

        thread.join();
        assert.commandWorked(thread.returnData());
    };
};
/**
 * Builds a ReplSetTest subclass that raises log verbosity on startup and can run a background
 * thread which keeps stepping down the replica set's primary.
 *
 * @param {Object} stepdownOptions Merged stepdown configuration. It supplies
 *     electionTimeoutMS, catchUpTimeoutMS and the options handed to the stepdown thread.
 * @param {string} verbositySetting Serialized logComponentVerbosity value to apply to every node.
 * @return {Function} The ReplSetTestWithContinuousPrimaryStepdown class.
 */
function makeReplSetTestWithContinuousPrimaryStepdown(stepdownOptions, verbositySetting) {
    // Turns a setParameter value into a fresh object. A "name=value" string (split at the
    // first '=') becomes {name: value}. Anything else is shallow-copied, and absent values
    // become {}.
    const toSetParameterObject = (rawSetParameter) => {
        if (typeof rawSetParameter === "string") {
            const separatorPos = rawSetParameter.indexOf("=");
            if (separatorPos !== -1) {
                const paramName = rawSetParameter.slice(0, separatorPos);
                const paramValue = rawSetParameter.slice(separatorPos + 1);
                return {[paramName]: paramValue};
            }
        }
        return Object.assign({}, rawSetParameter || {});
    };

    return class ReplSetTestWithContinuousPrimaryStepdown extends ReplSetTest {
        constructor(options) {
            super(options);

            // Handle for the continuous stepdown thread.
            this._stepdownThread = new StepdownThread();

            // Snapshot of the nodeOptions given at construction time. startSet re-applies
            // their setParameter.
            const constructorNodeOptions = options && options.nodeOptions;
            this._origNodeOpts = Object.assign({}, constructorNodeOptions || {});
        }

        /**
         * Starts the set with logComponentVerbosity forced to the configured value.
         * setParameter entries from construction time and from this call are kept. Later
         * sources win, and logComponentVerbosity always wins. The caller's options object
         * (if any) is updated in place.
         */
        startSet(options, restart) {
            const effectiveOptions = options || {};
            const fromConstructor = toSetParameterObject(this._origNodeOpts.setParameter);
            const fromCall = toSetParameterObject(effectiveOptions.setParameter);
            effectiveOptions.setParameter = Object.assign(fromConstructor, fromCall, {
                logComponentVerbosity: verbositySetting,
            });
            return super.startSet(effectiveOptions, restart);
        }

        /**
         * Stops the stepdown thread (waiting for a primary) before shutting down the set.
         */
        stopSet(signal, forRestart, options) {
            this.stopContinuousFailover({waitForPrimary: true});
            super.stopSet(signal, forRestart, options);
        }

        /**
         * Same as the base implementation, but retried on network errors.
         */
        awaitLastOpCommitted() {
            const awaitCommitted = () => super.awaitLastOpCommitted();
            return retryOnNetworkError(awaitCommitted);
        }

        /**
         * Starts the stepdown thread, reconfiguring the set first when needed. A reconfig
         * happens if electionTimeoutMillis or catchUpTimeoutMillis differ from
         * stepdownOptions.electionTimeoutMS / stepdownOptions.catchUpTimeoutMS. A short election
         * timeout lets a new primary win before the next stepdown. Lower catch-up timeouts
         * make rollbacks more likely and larger.
         */
        startContinuousFailover() {
            if (this._stepdownThread.hasStarted()) {
                throw new Error("Continuous failover thread is already active");
            }

            const config = this.getReplSetConfigFromNode();
            const {electionTimeoutMS, catchUpTimeoutMS} = stepdownOptions;
            const {settings} = config;
            const needsReconfig =
                settings.electionTimeoutMillis !== electionTimeoutMS ||
                settings.catchUpTimeoutMillis !== catchUpTimeoutMS;

            if (needsReconfig) {
                settings.electionTimeoutMillis = electionTimeoutMS;
                settings.catchUpTimeoutMillis = catchUpTimeoutMS;
                config.version += 1;
                reconfig(this, config);

                const appliedSettings = this.getReplSetConfigFromNode().settings;
                assert.eq(
                    appliedSettings.electionTimeoutMillis,
                    electionTimeoutMS,
                    "Failed to set the electionTimeoutMillis to " + electionTimeoutMS + " milliseconds.",
                );
                assert.eq(
                    appliedSettings.catchUpTimeoutMillis,
                    catchUpTimeoutMS,
                    "Failed to set the catchUpTimeoutMillis to " + catchUpTimeoutMS + " milliseconds.",
                );
            }

            this._stepdownThread.start(this.nodes[0].host, stepdownOptions);
        }

        /**
         * Stops the stepdown thread, if one is running, and blocks until it has exited.
         * When waitForPrimary is true, it then waits for a primary and reconnects every node.
         */
        stopContinuousFailover({waitForPrimary: waitForPrimary = false} = {}) {
            if (this._stepdownThread.hasStarted()) {
                this._stepdownThread.stop();

                if (waitForPrimary) {
                    this.getPrimary();
                    for (const node of this.nodes) {
                        reconnect(node);
                    }
                }
            }
        }
    };
}
/**
 * Builds a ShardingTest subclass that raises log verbosity on the replica sets selected by
 * stepdownOptions and can start/stop their continuous primary stepdown threads.
 *
 * @param {Object} stepdownOptions Merged stepdown configuration. configStepdown selects the
 *     config server replica set and shardStepdown selects every shard replica set.
 * @param {string} verbositySetting Serialized logComponentVerbosity value.
 * @return {Function} The ShardingTestWithContinuousPrimaryStepdown class.
 */
function makeShardingTestWithContinuousPrimaryStepdown(stepdownOptions, verbositySetting) {
    // Converts a setParameter value to object form so a key can be added to it. Object values
    // are returned as-is, absent values become {}, and "name=value" strings (split at the
    // first '=') become {name: value}. Strings must be parsed first: assigning a property to
    // a string primitive throws a TypeError in strict (module) code.
    function toSetParameterObject(setParam) {
        if (typeof setParam === "string") {
            const eqIdx = setParam.indexOf("=");
            if (eqIdx === -1) {
                throw new Error("Unable to parse setParameter string '" + setParam + "'; expected 'name=value'");
            }
            return {[setParam.substring(0, eqIdx)]: setParam.substring(eqIdx + 1)};
        }
        return setParam || {};
    }

    // Ensures nodeOptions.setParameter is an object and sets logComponentVerbosity on it.
    function addLogVerbosity(nodeOptions) {
        nodeOptions.setParameter = toSetParameterObject(nodeOptions.setParameter);
        nodeOptions.setParameter.logComponentVerbosity = verbositySetting;
    }

    // Returns the ReplSetTest instances of `st` selected by stepdownOptions: config server first,
    // then each shard.
    function replSetsUnderStepdown(st) {
        const replSets = [];
        if (stepdownOptions.configStepdown) {
            replSets.push(st.configRS);
        }
        if (stepdownOptions.shardStepdown) {
            st._rs.forEach((rst) => replSets.push(rst.test));
        }
        return replSets;
    }

    return class ShardingTestWithContinuousPrimaryStepdown extends ShardingTest {
        constructor(params) {
            params.other = params.other || {};

            if (stepdownOptions.configStepdown) {
                params.other.configOptions = params.other.configOptions || {};
                addLogVerbosity(params.other.configOptions);
            }

            if (stepdownOptions.shardStepdown) {
                params.other.rsOptions = params.other.rsOptions || {};
                addLogVerbosity(params.other.rsOptions);
            }

            // Construct the original object.
            super(params);

            // Validate the stepdown options.
            if (stepdownOptions.configStepdown && !this.configRS) {
                throw new Error("Continuous config server primary step down only available with CSRS");
            }

            if (stepdownOptions.shardStepdown && this._rs.some((rst) => !rst)) {
                throw new Error("Continuous shard primary step down only available with replica set shards");
            }
        }

        /**
         * Calls startContinuousFailover on the config server and/or each shard replica set as
         * specified by the stepdownOptions object.
         */
        startContinuousFailover() {
            replSetsUnderStepdown(this).forEach((rst) => rst.startContinuousFailover());
        }

        /**
         * Calls stopContinuousFailover on the config server and each shard replica set as
         * specified by the stepdownOptions object.
         *
         * If waitForPrimary is true, blocks until each replica set has elected a primary.
         *
         * Every replica set is asked to stop even if an earlier one throws. One failing
         * thread therefore cannot leave the others stepping down primaries. The first
         * error is rethrown afterwards.
         */
        stopContinuousFailover({waitForPrimary: waitForPrimary = false} = {}) {
            let failed = false;
            let firstError;

            for (const rst of replSetsUnderStepdown(this)) {
                try {
                    rst.stopContinuousFailover({waitForPrimary: waitForPrimary});
                } catch (e) {
                    if (!failed) {
                        failed = true;
                        firstError = e;
                    }
                }
            }

            if (failed) {
                throw firstError;
            }
        }

        /**
         * This method is disabled because it runs aggregation, which doesn't handle config
         * server stepdown correctly.
         */
        printShardingStatus() {}
    };
}