mongo/jstests/sharding/merge_with_drop_shard.js

155 lines
5.9 KiB
JavaScript

// Tests that the $merge aggregation stage is resilient to drop shard in both the source and
// output collection during execution.
import {withEachMergeMode} from "jstests/aggregation/extras/merge_helpers.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {removeShard} from "jstests/sharding/libs/remove_shard_util.js";
// TODO SERVER-50144 Remove this and allow orphan checking.
// This test calls removeShard which can leave docs in config.rangeDeletions in state "pending",
// therefore preventing orphans from being cleaned up.
TestData.skipCheckOrphans = true;
const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
const mongosDB = st.s.getDB(jsTestName());
assert.commandWorked(
st.s.getDB("admin").runCommand({enableSharding: mongosDB.getName(), primaryShard: st.shard0.name}),
);
const sourceColl = mongosDB["source"];
const targetColl = mongosDB["target"];
function setAggHang(mode) {
// Match on the output namespace to avoid hanging the sharding metadata refresh aggregation when
// shard0 is a config shard.
assert.commandWorked(
st.shard0.adminCommand({
configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch",
mode: mode,
data: {nss: targetColl.getFullName()},
}),
);
assert.commandWorked(
st.shard1.adminCommand({
configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch",
mode: mode,
data: {nss: targetColl.getFullName()},
}),
);
}
function removeShardAndRefreshRouter(shard) {
// We need the balancer to drain all the chunks out of the shard that is being removed.
assert.commandWorked(st.startBalancer());
removeShard(st, shard.shardName);
// Drop the test database on the removed shard so it does not interfere with addShard later.
assert.commandWorked(shard.getDB(mongosDB.getName()).dropDatabase());
st.configRS.awaitLastOpCommitted();
assert.commandWorked(st.s.adminCommand({flushRouterConfig: 1}));
assert.commandWorked(st.stopBalancer());
}
function addShard(shard) {
assert.commandWorked(st.s.adminCommand({addShard: shard}));
assert.commandWorked(st.s.adminCommand({moveChunk: sourceColl.getFullName(), find: {shardKey: 0}, to: shard}));
}
function runMergeWithMode(whenMatchedMode, whenNotMatchedMode, shardedColl, dropShard, expectFailCode) {
// Set the failpoint to hang in the first call to DocumentSourceCursor's getNext().
setAggHang("alwaysOn");
let comment = whenMatchedMode + "_" + whenNotMatchedMode + "_" + shardedColl.getName() + "_1";
let outFn = `
const sourceDB = db.getSiblingDB(jsTestName());
const sourceColl = sourceDB["${sourceColl.getName()}"];
let cmdRes = sourceDB.runCommand({
aggregate: "${sourceColl.getName()}",
pipeline: [{$merge: {
into: "${targetColl.getName()}",
whenMatched: ${tojson(whenMatchedMode)},
whenNotMatched: "${whenNotMatchedMode}"
}}],
cursor: {},
comment: "${comment}"
});
if (${expectFailCode} !== undefined) {
assert.commandFailedWithCode(cmdRes, ${expectFailCode});
} else {
assert.commandWorked(cmdRes);
}
`;
// Start the $merge aggregation in a parallel shell.
let mergeShell = startParallelShell(outFn, st.s.port);
// Wait for the parallel shell to hit the failpoint.
// currentOp can fail with ShardNotFound since we remove
// the shard on some test runs.
assert.soon(
() => {
const response = assert.commandWorkedOrFailedWithCode(
mongosDB.currentOp({
$or: [
{op: "command", "command.comment": comment},
{op: "getmore", "cursor.originatingCommand.comment": comment},
],
}),
[ErrorCodes.ShardNotFound],
);
return response.ok ? response.inprog.length >= 1 : false;
},
() => {
const msg = "Timeout waiting for parallel shell to hit the failpoint";
const response = mongosDB.currentOp();
return response.ok ? msg + ":\n" + tojson(response.inprog) : msg;
},
);
if (dropShard) {
removeShardAndRefreshRouter(st.shard1);
} else {
addShard(st.rs1.getURL());
}
// Unset the failpoint to unblock the $merge and join with the parallel shell.
setAggHang("off");
mergeShell();
assert.eq(2, targetColl.find().itcount());
}
// Shard the source collection with shard key {shardKey: 1} and split into 2 chunks.
st.shardColl(sourceColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
// Shard the output collection with shard key {shardKey: 1} and split into 2 chunks.
st.shardColl(targetColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
// Write two documents in the source collection that should target the two chunks in the target
// collection.
assert.commandWorked(sourceColl.insert({shardKey: -1, _id: 0}));
assert.commandWorked(sourceColl.insert({shardKey: 1, _id: 1}));
withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
assert.commandWorked(targetColl.remove({}));
// Match the data from source into target so that we don't fail the assertion for
// 'whenNotMatchedMode:fail/discard'.
if (whenNotMatchedMode == "fail" || whenNotMatchedMode == "discard") {
assert.commandWorked(targetColl.insert({shardKey: -1, _id: 0}));
assert.commandWorked(targetColl.insert({shardKey: 1, _id: 1}));
}
runMergeWithMode(whenMatchedMode, whenNotMatchedMode, targetColl, true, undefined);
runMergeWithMode(
whenMatchedMode,
whenNotMatchedMode,
targetColl,
false,
whenMatchedMode == "fail" ? ErrorCodes.DuplicateKey : undefined,
);
});
st.stop();