/**
 * Test that a stepdown on the sync source during collection cloning and oplog fetching does not
 * interrupt initial sync. Each stage of the clone (listDatabases, listCollections, collection
 * count, listIndexes, find/getMore) and the oplog fetcher's getMore is hung via a fail point on
 * the primary while the primary is forced to step down; initial sync must still complete.
 */
import {waitForCurOpByFailPointNoNS} from "jstests/libs/curop_helpers.js";
import {configureFailPoint, kDefaultWaitForFailPointTimeout} from "jstests/libs/fail_point_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
const testName = "initialSyncDuringStepDown";
const dbName = testName;
const collName = "testcoll";
// Use a 3-node replica set (one data-bearing primary, one priority-0 secondary, and an arbiter)
// so the primary keeps a voting majority, and therefore does not step down, while the secondary
// is restarted.
const rst = new ReplSetTest({
    nodes: [{}, {rsConfig: {priority: 0}}, {arbiter: true}],
    settings: {chainingAllowed: false},
});
rst.startSet();
rst.initiate(null, null, {initiateWithDefaultElectionTimeout: true});
let primary = rst.getPrimary();
let primaryDB = primary.getDB(dbName);
let primaryAdmin = primary.getDB("admin");
let primaryColl = primaryDB[collName];
let secondary = rst.getSecondary();
let secondaryDB = secondary.getDB(dbName);
let secondaryColl = secondaryDB[collName];
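
// Namespace strings used to scope fail points on the sync source to specific operations.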
let dbNss = primaryDB.getName();
let collNss = primaryColl.getFullName();
// The default WC is majority and this test can't satisfy majority writes.
assert.commandWorked(
    primary.adminCommand({setDefaultRWConcern: 1, defaultWriteConcern: {w: 1}, writeConcern: {w: "majority"}}),
);

function setupTest({
    failPoint,
    nss = "",
    nssSuffix = "",
    secondaryStartupParams = {},
}) {
    assert.commandWorked(primary.adminCommand({clearLog: "global"}));

    jsTestLog("Writing data to collection.");
    assert.commandWorked(primaryColl.insert([{_id: 1}, {_id: 2}]));

    jsTestLog("Stopping secondary.");
    rst.stop(secondary);

    jsTestLog("Enabling failpoint '" + failPoint + "' on primary (sync source).");
    configureFailPoint(primary, failPoint, {nss: nss + nssSuffix, shouldCheckForInterrupt: true});
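    // 'shouldCheckForInterrupt: true' makes the operation hung at the fail point respond to
    // interrupts, so the stepdown's operation killer can break it out of the hang. The helper
    // call above is roughly equivalent to the raw command below (a sketch of what
    // fail_point_util.js sends; retry handling omitted):
    //
    //     assert.commandWorked(primary.adminCommand({
    //         configureFailPoint: failPoint,
    //         mode: "alwaysOn",
    //         data: {nss: nss + nssSuffix, shouldCheckForInterrupt: true},
    //     }));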
jsTestLog("Starting secondary.");
secondaryStartupParams["numInitialSyncAttempts"] = 1;
// Skip clearing initial sync progress after a successful initial sync attempt so that we
// can check initialSyncStatus fields after initial sync is complete.
secondaryStartupParams["failpoint.skipClearInitialSyncState"] = tojson({mode: "alwaysOn"});
secondary = rst.start(secondary, {startClean: true, setParameter: secondaryStartupParams});
secondaryDB = secondary.getDB(dbName);
secondaryColl = secondaryDB[collName];
// Wait until secondary reaches RS_STARTUP2 state.
rst.waitForState(secondary, ReplSetTest.State.STARTUP_2);
}

function finishTest({failPoint, nss = "", docsCopiedByOplogFetcher = 0}) {
jsTestLog("Waiting for primary to reach failPoint '" + failPoint + "'.");
    waitForCurOpByFailPointNoNS(primaryAdmin, failPoint);
jsTestLog("Making primary step down");
    const joinStepDownThread = startParallelShell(() => {
        assert.commandWorked(db.adminCommand({"replSetStepDown": 30 * 60, "force": true}));
    }, primary.port);

    // Wait until the step down has started to kill user operations.
    checkLog.contains(primary, "Starting to kill user operations");

    jsTestLog("Allowing initial sync to continue.");
    configureFailPoint(primaryAdmin, failPoint, {}, "off");

    jsTestLog("Waiting for initial sync to complete.");
    rst.awaitSecondaryNodes(null, [secondary]);

    // Wait until the old primary has transitioned to SECONDARY state.
    joinStepDownThread();
    rst.awaitSecondaryNodes(null, [primary]);

    jsTestLog("Validating initial sync data.");
    let res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
    assert.eq(0, res.initialSyncStatus.failedInitialSyncAttempts);
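    // setupTest inserted {_id: 1} and {_id: 2}; any documents written after collection cloning
    // finished reach the secondary via the oplog fetcher and are counted separately.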
    assert.eq(2 + docsCopiedByOplogFetcher, secondaryColl.find().itcount());

    // checkReplicatedDataHashes requires a primary to validate the cloned data, so unfreeze the
    // old primary and let it be re-elected.
    assert.commandWorked(primaryAdmin.adminCommand({replSetFreeze: 0}));
    rst.getPrimary();
    rst.checkReplicatedDataHashes();

    jsTestLog("Dropping collection '" + collName + "'.");
    assert(primaryColl.drop());
}

function runStepDownTest(params) {
    setupTest(params);
    finishTest(params);
}

jsTestLog("Testing stepdown while 'databases' cloner lists databases.");
runStepDownTest({failPoint: "hangBeforeListDatabases"});
jsTestLog("Testing stepdown while 'database' cloner lists collections.");
runStepDownTest({failPoint: "hangBeforeListCollections", nss: dbNss, nssSuffix: ".$cmd.listCollections"});
jsTestLog("Testing stepdown while 'collection' cloner performs collection count.");
runStepDownTest({failPoint: "hangBeforeCollectionCount", nss: collNss});
jsTestLog("Testing stepdown while 'collection' cloner list indexes for a collection.");
runStepDownTest({failPoint: "hangBeforeListIndexes", nss: collNss});
jsTestLog("Testing stepdown while 'collection' cloner clones collection data.");
runStepDownTest({failPoint: "waitInFindBeforeMakingBatch", nss: collNss});
jsTestLog("Testing stepdown between collection data batches.");
runStepDownTest({
    failPoint: "waitWithPinnedCursorDuringGetMoreBatch",
    nss: collNss,
    secondaryStartupParams: {collectionClonerBatchSize: 1},
});

// Restart the secondary with "oplogFetcherInitialSyncMaxFetcherRestarts" set to zero so that an
// oplog fetcher error is not masked by automatic restarts, and with an increased oplog fetcher
// network timeout to avoid spurious failures. Also enable the "waitAfterPinningCursorBeforeGetMoreBatch"
// fail point, which drops and reacquires the read lock, to prevent a deadlock between the getMore
// and insert threads on ephemeral storage engines.
jsTestLog("Testing stepdown during oplog fetching.");
const oplogNss = "local.oplog.rs";
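// The oplog fetcher tails the sync source's oplog, so fail points aimed at its getMores are
// scoped to the oplog namespace.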
setupTest({
    failPoint: "waitAfterPinningCursorBeforeGetMoreBatch",
    nss: oplogNss,
    secondaryStartupParams: {
        initialSyncOplogFetcherBatchSize: 1,
        oplogFetcherInitialSyncMaxFetcherRestarts: 0,
        oplogNetworkTimeoutBufferSeconds: ReplSetTest.kDefaultTimeoutMS / 1000,
        "failpoint.initialSyncHangAfterDataCloning": tojson({mode: "alwaysOn"}),
    },
});
jsTestLog("Waiting for collection cloning to complete.");
assert.commandWorked(
secondary.adminCommand({
waitForFailPoint: "initialSyncHangAfterDataCloning",
timesEntered: 1,
maxTimeMS: kDefaultWaitForFailPointTimeout,
}),
);

// Insert more documents so that they reach the secondary via the oplog fetcher rather than the
// collection cloner.
jsTestLog("Inserting more data on primary.");
assert.commandWorked(primaryColl.insert([{_id: 3}, {_id: 4}]));

// The inserts have succeeded, so enable the "waitWithPinnedCursorDuringGetMoreBatch" fail point,
// which does not drop locks while the getMore command waits inside the fail point block.
configureFailPoint(primary, "waitWithPinnedCursorDuringGetMoreBatch", {nss: oplogNss, shouldCheckForInterrupt: true});

// Now disable the "waitAfterPinningCursorBeforeGetMoreBatch" fail point so the getMore can
// proceed and hang on the "waitWithPinnedCursorDuringGetMoreBatch" fail point instead.
configureFailPoint(primary, "waitAfterPinningCursorBeforeGetMoreBatch", {}, "off");

// Disable the fail point on the secondary to allow initial sync to continue.
assert.commandWorked(secondary.adminCommand({configureFailPoint: "initialSyncHangAfterDataCloning", mode: "off"}));

finishTest({
    failPoint: "waitWithPinnedCursorDuringGetMoreBatch",
    nss: oplogNss,
    docsCopiedByOplogFetcher: 2,
});

rst.stopSet();