mongo/jstests/sharding/resharding_abort_command.js

404 lines
15 KiB
JavaScript

/**
* Test to make sure that the abort command interrupts a resharding operation that has not yet
* persisted a decision.
*
* @tags: [uses_atclustertime]
*/
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
// Namespace of the collection that every test case in this file reshards and aborts.
const originalCollectionNs = "reshardingDb.coll";
// Failpoint configured on the config server right before the abort thread is started; the test
// waits on it after starting the abort thread.
const enterAbortFailpointName = "reshardingPauseCoordinatorBeforeStartingErrorFlow";
// Identifies which nodes a test case's extra failpoint is configured on.
const nodeTypeEnum = {
// The first config server node listed in the discovered topology.
COORDINATOR: 1,
// The primary of every donor shard.
DONOR: 2,
// The primary of every recipient shard.
RECIPIENT: 3,
// No extra failpoint is configured; the waiting/turning-off steps for it are skipped.
NO_EXTRA_FAILPOINTS_SENTINEL: 4,
};
// Identifies the point of the resharding operation at which the abort command is issued.
const abortLocationEnum = {
// Abort from the callback that runs while resharding is in the background.
BEFORE_STEADY_STATE: 1,
// Abort from the fixture's postCheckConsistencyFn hook.
BEFORE_DECISION_PERSISTED: 2,
// Abort from the fixture's postDecisionPersistedFn hook; the abort itself is expected to fail.
AFTER_DECISION_PERSISTED: 3,
};
/**
 * Resolves a node type to the connection strings of the nodes an extra failpoint should target.
 *
 * @param {number} nodeType One of the nodeTypeEnum values.
 * @param {Object} reshardingTest Fixture exposing donorShardNames and recipientShardNames.
 * @param {Object} topology Discovered topology; reads configsvr.nodes[0] and
 *     shards[<shardName>].primary.
 * @returns {string[]} Connection strings in shard-name order; empty for
 *     NO_EXTRA_FAILPOINTS_SENTINEL.
 * @throws {Error} If nodeType is not one of the nodeTypeEnum values.
 */
let getConnStringsFromNodeType = (nodeType, reshardingTest, topology) => {
    const primaryOf = (shardName) => topology.shards[shardName].primary;

    switch (nodeType) {
        case nodeTypeEnum.COORDINATOR:
            return [topology.configsvr.nodes[0]];
        case nodeTypeEnum.DONOR:
            return Array.from(reshardingTest.donorShardNames, primaryOf);
        case nodeTypeEnum.RECIPIENT:
            return Array.from(reshardingTest.recipientShardNames, primaryOf);
        case nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL:
            // Deliberately no targets: this test case configures no extra failpoint.
            return [];
        default:
            // Throw a real Error (not a string) so the failure carries a stack trace.
            throw new Error(`unsupported node type in resharding abort test: ${nodeType}`);
    }
};
/**
 * Opens one shell connection per connection string.
 *
 * @param {Iterable<string>} connStrings Hosts to connect to.
 * @returns {Mongo[]} Connections in the same order as connStrings.
 */
let getMongosFromConnStrings = (connStrings) => Array.from(connStrings, (connString) => new Mongo(connString));
/**
 * Configures the named failpoint on every node selected by failpointNodeType.
 *
 * @param {string} failpointName Name of the failpoint to configure.
 * @param {number} failpointNodeType One of the nodeTypeEnum values.
 * @param {Object} reshardingTest Fixture used to look up donor/recipient shard names.
 * @param {Object} topology Discovered cluster topology.
 * @param {string|Object} [failpointMode="alwaysOn"] Mode forwarded to configureFailPoint.
 * @returns {Array} One failpoint handle per targeted node, as returned by configureFailPoint.
 */
let generateFailpoints = (failpointName, failpointNodeType, reshardingTest, topology, failpointMode = "alwaysOn") => {
    const targetConnStrings = getConnStringsFromNodeType(failpointNodeType, reshardingTest, topology);
    // All connections are opened first; only then is the failpoint configured on each of them.
    const targetConns = getMongosFromConnStrings(targetConnStrings);
    return targetConns.map((conn) => configureFailPoint(conn, failpointName, {} /* data */, failpointMode));
};
/**
 * Builds (but does not start) a Thread that issues abortReshardCollection through a mongos.
 *
 * @param {string} mongosConnString Host of the mongos to send the abort to.
 * @param {string} ns Namespace of the collection being resharded.
 * @param {number|number[]} expectedErrorCodes ErrorCodes.OK when the abort must succeed,
 *     otherwise the code(s) the abort must fail with.
 * @returns {Thread} The unstarted thread; the caller is responsible for start() and join().
 */
let generateAbortThread = (mongosConnString, ns, expectedErrorCodes) => {
    // The thread body is kept self-contained: it only touches its own arguments and shell
    // globals (presumably required because Thread runs it outside this scope - confirm against
    // parallelTester.js).
    const abortAndCheck = (connString, nss, expectedCodes) => {
        const conn = new Mongo(connString);
        const abortMustSucceed = expectedCodes == ErrorCodes.OK;
        const abortResult = conn.adminCommand({abortReshardCollection: nss});
        if (abortMustSucceed) {
            assert.commandWorked(abortResult);
            return;
        }
        assert.commandFailedWithCode(abortResult, expectedCodes);
    };
    return new Thread(abortAndCheck, mongosConnString, ns, expectedErrorCodes);
};
/**
 * Drives one abort attempt: optionally waits for the extra failpoints, starts the abort thread,
 * waits until the coordinator has entered its error flow, and finally releases every failpoint.
 *
 * @param {?string} failpointName Name of the extra failpoint (used for logging only).
 * @param {number} failpointNodeType nodeTypeEnum value; NO_EXTRA_FAILPOINTS_SENTINEL skips the
 *     wait/off steps for `failpoints`.
 * @param {Object} reshardingTest Fixture, forwarded to the callbacks.
 * @param {Object} topology Discovered topology, forwarded to the callbacks.
 * @param {Mongo} mongos Connection forwarded to the callbacks.
 * @param {Mongo} configsvr Connection on which the coordinator failpoint is configured.
 * @param {Thread} abortThread Unstarted abort thread; it is started here but not joined.
 * @param {Array} failpoints Handles of the extra failpoints.
 * @param {?Function} executeBeforeWaitingOnFailpointsFn Called as (mongos, ns).
 * @param {?Function} executeAfterWaitingOnFailpointsFn Called as
 *     (reshardingTest, topology, mongos, ns).
 * @param {?Function} executeAfterAbortingFn Called as (reshardingTest, topology, mongos, ns)
 *     while the coordinator is still paused at the start of its error flow.
 */
let triggerAbortAndCoordinateFailpoints = (
    failpointName,
    failpointNodeType,
    reshardingTest,
    topology,
    mongos,
    configsvr,
    abortThread,
    failpoints,
    executeBeforeWaitingOnFailpointsFn,
    executeAfterWaitingOnFailpointsFn,
    executeAfterAbortingFn,
) => {
    const hasExtraFailpoints = failpointNodeType != nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL;

    if (executeBeforeWaitingOnFailpointsFn) {
        jsTestLog(`Executing the before-waiting-on-failpoint function`);
        executeBeforeWaitingOnFailpointsFn(mongos, originalCollectionNs);
    }

    if (hasExtraFailpoints) {
        jsTestLog(`Wait for the failpoint ${failpointName} to be reached on all applicable nodes`);
        failpoints.forEach((failpoint) => failpoint.wait());
    }

    if (executeAfterWaitingOnFailpointsFn) {
        jsTestLog(`Executing the after-waiting-on-failpoint function`);
        executeAfterWaitingOnFailpointsFn(reshardingTest, topology, mongos, originalCollectionNs);
    }

    jsTestLog(`Wait for the coordinator to recognize that it's been aborted`);
    // The coordinator failpoint must be armed before the abort is issued so the pause is not missed.
    const coordinatorPause = configureFailPoint(configsvr, enterAbortFailpointName);
    abortThread.start();
    coordinatorPause.wait();

    if (executeAfterAbortingFn) {
        jsTestLog(`Executing the after-aborting function`);
        executeAfterAbortingFn(reshardingTest, topology, mongos, originalCollectionNs);
    }

    coordinatorPause.off();

    if (!hasExtraFailpoints) {
        return;
    }
    jsTestLog(
        `Turn off the failpoint ${failpointName} to allow both the abort and the resharding operation to complete`,
    );
    failpoints.forEach((failpoint) => failpoint.off());
};
/**
 * Starts the abort thread once the coordinator's decision can no longer be undone.
 *
 * @param {Mongo} mongos Connection used to read config.reshardingOperations.
 * @param {Thread} abortThread Unstarted abort thread; started here, not joined.
 */
let triggerPostDecisionPersistedAbort = (mongos, abortThread) => {
    // After the decision has been persisted the coordinator document is either in one of these
    // states or has already been deleted. Any of those conditions is good enough to exercise the
    // abort command's inability to abort past a persisted decision.
    const postDecisionStates = ["decision-persisted", "done"];
    const decisionIsPersisted = () => {
        const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ns: originalCollectionNs});
        return coordinatorDoc == null || postDecisionStates.includes(coordinatorDoc.state);
    };

    assert.soon(decisionIsPersisted);
    abortThread.start();
};
/**
 * Runs one complete abort scenario: sets up a two-donor/two-recipient in-place resharding
 * fixture, reshards in the background, issues the abort at `abortLocation`, verifies the
 * coordinator's resharding metrics, joins the abort thread and tears the fixture down.
 *
 * @param {?string} failpointName Extra failpoint to pause on, or null when none is used.
 * @param {number} failpointNodeType nodeTypeEnum value selecting where the failpoint is set.
 * @param {number} abortLocation abortLocationEnum value selecting when the abort is issued.
 * @param {Object} [callbacks] Optional hooks, each defaulting to null:
 *     executeBeforeReshardingStartsFn and executeAtStartOfReshardingFn are called as
 *     (reshardingTest, topology, mongos, ns); the remaining three are forwarded to
 *     triggerAbortAndCoordinateFailpoints.
 */
const runAbortWithFailpoint = (
    failpointName,
    failpointNodeType,
    abortLocation,
    {
        executeBeforeReshardingStartsFn = null,
        executeAtStartOfReshardingFn = null,
        executeBeforeWaitingOnFailpointsFn = null,
        executeAfterWaitingOnFailpointsFn = null,
        executeAfterAbortingFn = null,
    } = {},
) => {
    const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2, reshardInPlace: true});
    reshardingTest.setup();

    const donors = reshardingTest.donorShardNames;
    const recipients = reshardingTest.recipientShardNames;
    const mongos = reshardingTest
        .createShardedCollection({
            ns: "reshardingDb.coll",
            shardKeyPattern: {oldKey: 1},
            chunks: [
                {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donors[0]},
                {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donors[1]},
            ],
        })
        .getMongo();
    const topology = DiscoverTopology.findConnectedNodes(mongos);
    const configsvr = new Mongo(topology.configsvr.nodes[0]);

    // An abort issued after the decision is persisted is expected to fail, while the resharding
    // operation itself finishes without error. In every other case the abort succeeds and the
    // resharding operation reports that it was aborted.
    const abortsAfterDecision = abortLocation == abortLocationEnum.AFTER_DECISION_PERSISTED;
    const expectedAbortErrorCodes = abortsAfterDecision
        ? [ErrorCodes.ReshardCollectionCommitted, ErrorCodes.NoSuchReshardCollection]
        : ErrorCodes.OK;
    const expectedReshardingErrorCode = abortsAfterDecision ? ErrorCodes.OK : ErrorCodes.ReshardCollectionAborted;

    const abortThread = generateAbortThread(topology.mongos.nodes[0], originalCollectionNs, expectedAbortErrorCodes);

    if (executeBeforeReshardingStartsFn) {
        jsTestLog(`Executing the before-resharding-starts fn`);
        executeBeforeReshardingStartsFn(reshardingTest, topology, mongos, originalCollectionNs);
    }

    const failpoints =
        failpointNodeType != nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL
            ? generateFailpoints(failpointName, failpointNodeType, reshardingTest, topology)
            : [];

    // Shared by the two pre-decision abort locations, which differ only in the hook they run from.
    const abortAndReleaseFailpoints = () =>
        triggerAbortAndCoordinateFailpoints(
            failpointName,
            failpointNodeType,
            reshardingTest,
            topology,
            mongos,
            configsvr,
            abortThread,
            failpoints,
            executeBeforeWaitingOnFailpointsFn,
            executeAfterWaitingOnFailpointsFn,
            executeAfterAbortingFn,
        );

    const newShardKeyOptions = {
        newShardKeyPattern: {newKey: 1},
        newChunks: [
            {min: {newKey: MinKey}, max: {newKey: 0}, shard: recipients[0]},
            {min: {newKey: 0}, max: {newKey: MaxKey}, shard: recipients[1]},
        ],
    };

    const duringReshardingFn = () => {
        if (executeAtStartOfReshardingFn) {
            jsTestLog(`Executing the start-of-resharding fn`);
            executeAtStartOfReshardingFn(reshardingTest, topology, mongos, originalCollectionNs);
        }
        if (abortLocation == abortLocationEnum.BEFORE_STEADY_STATE) {
            abortAndReleaseFailpoints();
        }
    };

    reshardingTest.withReshardingInBackground(newShardKeyOptions, duringReshardingFn, {
        expectedErrorCode: expectedReshardingErrorCode,
        postCheckConsistencyFn: () => {
            if (abortLocation == abortLocationEnum.BEFORE_DECISION_PERSISTED) {
                abortAndReleaseFailpoints();
            }
        },
        postDecisionPersistedFn: () => {
            if (abortLocation == abortLocationEnum.AFTER_DECISION_PERSISTED) {
                triggerPostDecisionPersistedAbort(mongos, abortThread);
            }
        },
    });

    // Exactly one operation must have started, and it must be counted as either succeeded or
    // canceled depending on which side (resharding vs. abort) was expected to win.
    const {countStarted, countSucceeded, countCanceled} = configsvr
        .getDB("admin")
        .serverStatus({}).shardingStatistics.resharding;
    assert.eq(countStarted, 1);
    if (expectedReshardingErrorCode == ErrorCodes.OK) {
        assert.eq(countSucceeded, 1);
        assert.eq(countCanceled, 0);
    } else if (expectedAbortErrorCodes == ErrorCodes.OK) {
        assert.eq(countCanceled, 1);
        assert.eq(countSucceeded, 0);
    }

    abortThread.join();
    reshardingTest.teardown();
};
// Abort before steady state while every recipient is paused before, and then during, cloning.
for (const recipientFailpointName of [
    "reshardingPauseRecipientBeforeCloning",
    "reshardingPauseRecipientDuringCloning",
]) {
    runAbortWithFailpoint(recipientFailpointName, nodeTypeEnum.RECIPIENT, abortLocationEnum.BEFORE_STEADY_STATE);
}

// Abort before steady state while every recipient is paused during oplog application; documents
// are inserted once the recipients have reached the failpoint.
runAbortWithFailpoint(
    "reshardingPauseRecipientDuringOplogApplication",
    nodeTypeEnum.RECIPIENT,
    abortLocationEnum.BEFORE_STEADY_STATE,
    {
        executeAfterWaitingOnFailpointsFn: (reshardingTest, topology, mongos, ns) => {
            const docsToInsert = [
                {_id: 0, oldKey: -10, newKey: -10},
                {_id: 1, oldKey: 10, newKey: -10},
                {_id: 2, oldKey: -10, newKey: 10},
                {_id: 3, oldKey: 10, newKey: 10},
            ];
            const insertResult = mongos.getCollection(ns).insert(docsToInsert);
            assert.commandWorked(insertResult);
        },
    },
);
/**
 * Blocks until the coordinator document lists at least one recipient shard and every listed
 * recipient reports the "applying" state.
 *
 * @param {Mongo} mongos Connection used to read config.reshardingOperations.
 * @param {string} ns Namespace of the collection being resharded.
 */
function waitForAllRecipientsToReachApplying(mongos, ns) {
    const allRecipientsAreApplying = () => {
        const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({ns: ns});
        if (coordinatorDoc === null) {
            return false;
        }

        const recipientEntries = coordinatorDoc.recipientShards;
        if (!Array.isArray(recipientEntries) || recipientEntries.length === 0) {
            return false;
        }

        return recipientEntries.every((entry) => entry.mutableState.state === "applying");
    };

    assert.soon(allRecipientsAreApplying);
}
// No extra failpoint is needed here: the resharding_test_fixture already hangs the operation
// before it switches to the blocking writes state.
runAbortWithFailpoint(
    null,
    nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL,
    abortLocationEnum.BEFORE_STEADY_STATE,
    {
        executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) =>
            waitForAllRecipientsToReachApplying(mongos, ns),
    },
);

// Verify that the resharding operation can still be aborted when the commit monitor never
// signals to the coordinator that the operation is ready to commit.
runAbortWithFailpoint(
    "hangBeforeQueryingRecipients",
    nodeTypeEnum.COORDINATOR,
    abortLocationEnum.BEFORE_STEADY_STATE,
    {
        executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) =>
            waitForAllRecipientsToReachApplying(mongos, ns),
    },
);

// Abort only after the coordinator has persisted its decision.
runAbortWithFailpoint(
    null,
    nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL,
    abortLocationEnum.AFTER_DECISION_PERSISTED,
);
// The resharding test fixture uses its own set of coordinator failpoints for resharding
// checkpoints. It may not be possible to insert documents once the second checkpoint is reached.
// Because of this, we cannot rely on the failpoint mechanism set up in this test file. Instead, we
// must manually activate and deactivate the failpoints across a checkpoint threshold.
//
// executeAtStartOfReshardingFn runs while the coordinator is in steady state (checkpoint 1), and
// executeAfterWaitingOnFailpointsFn will run while the coordinator is blocking writes (checkpoint
// 2).
//
// recipientFailpoints is shared between the hooks below: it is filled in before resharding starts
// and released after the abort has been issued.
let recipientFailpoints = [];
runAbortWithFailpoint(null, nodeTypeEnum.NO_EXTRA_FAILPOINTS_SENTINEL, abortLocationEnum.BEFORE_DECISION_PERSISTED, {
    executeBeforeReshardingStartsFn: (reshardingTest, topology) => {
        recipientFailpoints = generateFailpoints(
            "reshardingPauseRecipientDuringOplogApplication",
            nodeTypeEnum.RECIPIENT,
            reshardingTest,
            topology,
        );
    },
    executeAtStartOfReshardingFn: (reshardingTest, topology, mongos, ns) => {
        // Insert only once every recipient is paused during oplog application.
        for (const failpoint of recipientFailpoints) {
            failpoint.wait();
        }
        assert.commandWorked(
            mongos.getCollection(ns).insert([
                {_id: 4, oldKey: -10, newKey: -10},
                {_id: 5, oldKey: 10, newKey: -10},
                {_id: 6, oldKey: -10, newKey: 10},
                {_id: 7, oldKey: 10, newKey: 10},
            ]),
        );
    },
    executeAfterWaitingOnFailpointsFn: (reshardingTest, topology, mongos, ns) => {
        // Open each donor connection once instead of on every poll.
        const donorConns = Array.from(
            reshardingTest.donorShardNames,
            (donor) => new Mongo(topology.shards[donor].primary),
        );
        // Every donor must be blocking writes. Returning from inside a for...of loop would stop
        // after inspecting only the first donor.
        assert.soon(() =>
            donorConns.every((donorConn) => {
                const donorDoc = donorConn.getCollection("config.localReshardingOperations.donor").findOne({
                    ns: ns,
                });
                return donorDoc != null && donorDoc.mutableState.state === "blocking-writes";
            }),
        );
    },
    executeAfterAbortingFn: () => {
        // Release the recipients so both the abort and the resharding operation can finish.
        for (const failpoint of recipientFailpoints) {
            failpoint.off();
        }
    },
});