mongo/jstests/sharding/merge_with_chunk_migrations.js

165 lines
6.7 KiB
JavaScript

// Tests that the $merge aggregation stage is resilient to chunk migrations in both the source and
// output collection during execution.
import {withEachMergeMode} from "jstests/aggregation/extras/merge_helpers.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
// Cluster fixture shared by the whole test, created with the options shards: 2 and
// rs: {nodes: 1}.
const st = new ShardingTest({
    shards: 2,
    rs: {nodes: 1},
});
// Database named after this test, accessed through the fixture's 's' connection.
const mongosDB = st.s.getDB(jsTestName());
// Collection handles used as the $merge input ("source") and output ("target").
const {source: sourceColl, target: targetColl} = mongosDB;
/**
 * Configures the "hangBeforeDocumentSourceCursorLoadBatch" failpoint on shard0 and then on
 * shard1, asserting that each command succeeds.
 *
 * The failpoint data restricts it to the namespace "merge_with_chunk_migrations.source"; this
 * avoids hanging the sharding metadata refresh aggregation when shard0 is a config shard.
 *
 * @param {*} mode - Failpoint mode, forwarded unchanged (this test passes "alwaysOn" and "off").
 */
function setAggHang(mode) {
    for (const shardField of ["shard0", "shard1"]) {
        // A fresh command document is built for every shard, as in two separate calls.
        const failPointCmd = {
            configureFailPoint: "hangBeforeDocumentSourceCursorLoadBatch",
            mode: mode,
            data: {nss: "merge_with_chunk_migrations.source"},
        };
        assert.commandWorked(st[shardField].adminCommand(failPointCmd));
    }
}
/**
 * Runs a $merge from the source collection into the target collection while chunks of
 * 'shardedColl' are migrated mid-execution, and verifies that the target collection ends up
 * holding exactly two documents.
 *
 * Two rounds are executed, each with the aggregation paused on the failpoint while chunks move:
 *   1. the chunk containing {shardKey: 1} is moved to shard0;
 *   2. the chunks containing {shardKey: -1} and {shardKey: 1} are both moved to shard1.
 * Afterwards the {shardKey: -1} chunk is moved back to shard0 to reset the chunk distribution.
 *
 * @param {*} whenMatchedMode - Forwarded unchanged as the $merge 'whenMatched' option and
 *     stringified into the operation comment.
 * @param {string} whenNotMatchedMode - Forwarded as the $merge 'whenNotMatched' option.
 * @param {Object} shardedColl - Collection whose chunks are migrated; getName() and
 *     getFullName() are called on it.
 */
function runMergeWithMode(whenMatchedMode, whenNotMatchedMode, shardedColl) {
    const mergeSpec = {
        into: targetColl.getName(),
        whenMatched: whenMatchedMode,
        whenNotMatched: whenNotMatchedMode,
    };
    const baseComment = `${whenMatchedMode}_${whenNotMatchedMode}_${shardedColl.getName()}`;

    // For modes 'whenNotMatchedMode:fail/discard', the $merge will not insert the expected
    // documents, so the final count assertion would fail. To avoid that, the target collection
    // is pre-populated with documents matching those in the source.
    const mustSeedTarget = whenNotMatchedMode === "fail" || whenNotMatchedMode === "discard";

    // Empties the target collection and, when required by the mode, seeds the matching documents.
    const resetTarget = () => {
        assert.commandWorked(targetColl.remove({}));
        if (mustSeedTarget) {
            assert.commandWorked(targetColl.insert({_id: 0, shardKey: -1}));
            assert.commandWorked(targetColl.insert({_id: 1, shardKey: 1}));
        }
    };

    // Moves the chunk of 'shardedColl' containing {shardKey: shardKeyValue} to 'shard'.
    const moveChunkTo = (shardKeyValue, shard) => {
        assert.commandWorked(
            st.s.adminCommand({
                moveChunk: shardedColl.getFullName(),
                find: {shardKey: shardKeyValue},
                to: shard.shardName,
            }),
        );
    };

    // Builds the parallel-shell script. The $_internalInhibitOptimization stage is added to the
    // pipeline to prevent the pipeline from being optimized away after it's been split.
    // Otherwise, we won't hit the failpoint. The leading and trailing empty entries keep the
    // script text starting and ending with a newline.
    const buildMergeScript = (comment) =>
        [
            "",
            "const sourceDB = db.getSiblingDB(jsTestName());",
            `const sourceColl = sourceDB["${sourceColl.getName()}"];`,
            "sourceColl.aggregate([",
            "{$_internalInhibitOptimization: {}},",
            `{$merge: ${tojsononeline(mergeSpec)}}`,
            "],",
            `{comment: "${comment}"});`,
            "",
        ].join("\n");

    // Runs one round: starts the $merge in a parallel shell, waits until it is visible in
    // currentOp under 'comment', applies 'migrations' (pairs of [shardKeyValue, destination
    // shard]) while the failpoint is on, then releases the failpoint and checks the result.
    const runRound = (comment, migrations) => {
        resetTarget();
        // Set the failpoint to hang in the first call to DocumentSourceCursor's getNext().
        setAggHang("alwaysOn");
        // Start the $merge aggregation in a parallel shell.
        const joinMergeShell = startParallelShell(buildMergeScript(comment), st.s.port);
        // Wait for the parallel shell to hit the failpoint.
        assert.soon(
            () => mongosDB.currentOp({op: "command", "command.comment": comment}).inprog.length === 1,
            () => tojson(mongosDB.currentOp().inprog),
        );
        for (const [shardKeyValue, shard] of migrations) {
            moveChunkTo(shardKeyValue, shard);
        }
        // Unset the failpoint to unblock the $merge and join with the parallel shell.
        setAggHang("off");
        joinMergeShell();
        // Verify that the $merge succeeded.
        assert.eq(2, targetColl.find().itcount());
    };

    // Round 1: migrate the chunk on shard1 to shard0.
    runRound(baseComment, [[1, st.shard0]]);

    // Round 2: now both chunks are on shard0; migrate them back to shard1 in the middle of
    // execution.
    runRound(`${baseComment}_2`, [
        [-1, st.shard1],
        [1, st.shard1],
    ]);

    // Reset the chunk distribution.
    moveChunkTo(-1, st.shard0);
}
// Writes the two test documents into the source collection, one on each side of the
// {shardKey: 0} split point.
const insertSourceDocs = () => {
    for (const doc of [
        {_id: 0, shardKey: -1},
        {_id: 1, shardKey: 1},
    ]) {
        assert.commandWorked(sourceColl.insert(doc));
    }
};

// Runs runMergeWithMode() once per mode combination supplied by withEachMergeMode(), migrating
// the chunks of 'shardedColl'.
const runAllMergeModes = (shardedColl) => {
    withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
        runMergeWithMode(whenMatchedMode, whenNotMatchedMode, shardedColl);
    });
};

// Part 1: chunk migrations on the source collection.
// Shard the source collection with shard key {shardKey: 1} and split into 2 chunks.
st.shardColl(sourceColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
// Write a document to each chunk of the source collection.
insertSourceDocs();
runAllMergeModes(sourceColl);

// Part 2: the same scenario with chunk migrations on the output collection instead.
sourceColl.drop();
assert.commandWorked(targetColl.remove({}));
// Shard the output collection with shard key {shardKey: 1} and split into 2 chunks.
st.shardColl(targetColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
// Write two documents in the source collection that should target the two chunks in the target
// collection.
insertSourceDocs();
runAllMergeModes(targetColl);

st.stop();