// mongo/jstests/aggregation/shard_targeting.js

/**
* Test that aggregations are sent directly to a single shard in the case where the data required by
* the pipeline's initial query all resides on that shard, and that we correctly back out and
* re-target in the event that a stale config exception is received.
*
* In particular:
*
* - If the data required by the aggregation all resides on a single shard (including multi-chunk
* range $matches), send the entire pipeline to that shard and do not perform a $mergeCursors.
* - In the case of a stage which requires a primary shard merge, do not split the pipeline or
* generate a $mergeCursors if the data required by the aggregation all resides on the primary
* shard.
*
* Because wrapping these aggregations in a $facet stage will affect how the pipeline is targeted,
* and will therefore invalidate the results of the test cases below, we tag this test to prevent it
* from running under the 'aggregation_facet_unwind' passthrough.
*
* @tags: [
* do_not_wrap_aggregations_in_facets,
* requires_sharding,
* requires_spawning_own_processes,
* requires_profiling,
* ]
*/
(function() {
"use strict";

load("jstests/libs/profiler.js");  // For profilerHas*OrThrow helper functions.
const st = new ShardingTest({shards: 2, mongos: 2, config: 1});
// mongosForAgg will be used to perform all aggregations.
// mongosForMove does all chunk migrations, leaving mongosForAgg with stale config metadata.
const mongosForAgg = st.s0;
const mongosForMove = st.s1;
const mongosDB = mongosForAgg.getDB(jsTestName());
const mongosColl = mongosDB.test;
const shard0DB = st.shard0.getDB(jsTestName());
const primaryShardDB = shard0DB;  // st.shard0 is made the primary shard for this DB below.
const shard1DB = st.shard1.getDB(jsTestName());
// Turn off best-effort recipient metadata refresh post-migration commit on both shards because
// it creates non-determinism for the profiler.
assert.commandWorked(st.shard0.getDB('admin').runCommand(
{configureFailPoint: 'doNotRefreshRecipientAfterCommit', mode: 'alwaysOn'}));
assert.commandWorked(st.shard1.getDB('admin').runCommand(
{configureFailPoint: 'doNotRefreshRecipientAfterCommit', mode: 'alwaysOn'}));
// Turn off automatic shard refresh in mongos when a stale config error is thrown.
assert.commandWorked(mongosForAgg.getDB('admin').runCommand(
{configureFailPoint: 'doNotRefreshShardsOnRetargettingError', mode: 'alwaysOn'}));
assert.commandWorked(mongosDB.dropDatabase());
// Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
// Shard the test collection on _id.
assert.commandWorked(
mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
// Split the collection into 4 chunks: [MinKey, -100), [-100, 0), [0, 100), [100, MaxKey).
assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: -100}}));
assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
assert.commandWorked(mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 100}}));
// Move the [0, 100) and [100, MaxKey) chunks to st.shard1.shardName.
assert.commandWorked(mongosDB.adminCommand(
{moveChunk: mongosColl.getFullName(), find: {_id: 50}, to: st.shard1.shardName}));
assert.commandWorked(mongosDB.adminCommand(
{moveChunk: mongosColl.getFullName(), find: {_id: 150}, to: st.shard1.shardName}));
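// The chunk distribution is now:
// - st.shard0.shardName (primary shard): [MinKey, -100), [-100, 0)
// - st.shard1.shardName:                 [0, 100), [100, MaxKey)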
// Write one document into each of the chunks.
assert.commandWorked(mongosColl.insert({_id: -150}));
assert.commandWorked(mongosColl.insert({_id: -50}));
assert.commandWorked(mongosColl.insert({_id: 50}));
assert.commandWorked(mongosColl.insert({_id: 150}));
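// Stale routing-info error codes which a shard may return when the shard version attached by the
// mongoS does not match the shard's own collection metadata. The profiler assertions below use
// these codes to distinguish failed dispatch attempts from successful ones.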
const shardExceptions =
[ErrorCodes.StaleConfig, ErrorCodes.StaleShardVersion, ErrorCodes.StaleEpoch];
// Create an $_internalSplitPipeline stage that forces the merge to occur on the primary shard.
const forcePrimaryMerge = [{$_internalSplitPipeline: {mergeType: "primaryShard"}}];
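// $_internalSplitPipeline is an internal stage intended for tests; with mergeType "primaryShard"
// it forces the merging half of a split pipeline to run on the primary shard rather than on
// mongoS, so that merge behaviour is observable in the shards' profilers.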
function runAggShardTargetTest({splitPoint}) {
// Ensure that both mongoS have up-to-date caches, and enable the profiler on both shards.
assert.commandWorked(mongosForAgg.getDB("admin").runCommand({flushRouterConfig: 1}));
assert.commandWorked(mongosForMove.getDB("admin").runCommand({flushRouterConfig: 1}));
assert.commandWorked(shard0DB.setProfilingLevel(2));
assert.commandWorked(shard1DB.setProfilingLevel(2));
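// Profiling level 2 records every operation in each shard's system.profile collection, which is
// what the profilerHas*OrThrow assertions below inspect.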
//
// Test cases.
//
let testName, outColl;
// Test that a range query is passed through if the chunks encompassed by the query all lie
// on a single shard, in this case st.shard0.shardName.
testName = "agg_shard_targeting_range_single_shard_all_chunks_on_same_shard";
assert.eq(mongosColl
.aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}].concat(splitPoint),
{comment: testName})
.itcount(),
2);
// We expect one aggregation on shard0, none on shard1, and no $mergeCursors on shard0 (the
// primary shard).
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard0DB,
filter: {"command.aggregate": mongosColl.getName(), "command.comment": testName}
});
profilerHasZeroMatchingEntriesOrThrow({
profileDB: shard1DB,
filter: {"command.aggregate": mongosColl.getName(), "command.comment": testName}
});
profilerHasZeroMatchingEntriesOrThrow({
profileDB: primaryShardDB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: 1}
}
});
// Test that a range query with a stage that requires a primary shard merge ($out in this
// case) is passed through if the chunk ranges encompassed by the query all lie on the
// primary shard.
testName = "agg_shard_targeting_range_all_chunks_on_primary_shard_out_no_merge";
outColl = mongosDB[testName];
assert.commandWorked(mongosDB.runCommand({
aggregate: mongosColl.getName(),
pipeline: [{$match: {_id: {$gte: -150, $lte: -50}}}].concat(splitPoint).concat([
{$out: testName}
]),
comment: testName,
cursor: {}
}));
// We expect one aggregation on shard0, none on shard1, and no $mergeCursors on shard0 (the
// primary shard). Some of these commands may fail with a staleDBVersion error and be retried,
// so we ignore those entries in the profiler.
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard0DB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
errMsg: {$exists: false}
}
});
profilerHasZeroMatchingEntriesOrThrow({
profileDB: shard1DB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
errMsg: {$exists: false}
}
});
profilerHasZeroMatchingEntriesOrThrow({
profileDB: primaryShardDB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: 1},
errMsg: {$exists: false}
}
});
// Verify that the contents of the $out collection are as expected.
assert.eq(outColl.find().sort({_id: 1}).toArray(), [{_id: -150}, {_id: -50}]);
// Test that a passthrough will back out and split the pipeline if we try to target a single
// shard, get a stale config exception, and find that more than one shard is now involved.
// Move the _id: [-100, 0) chunk from st.shard0.shardName to st.shard1.shardName via
// mongosForMove.
assert.commandWorked(mongosForMove.getDB("admin").runCommand({
moveChunk: mongosColl.getFullName(),
find: {_id: -50},
to: st.shard1.shardName,
}));
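// st.shard0.shardName now owns only [MinKey, -100); the other three chunks live on
// st.shard1.shardName. The $match range [-150, -50] therefore spans both shards, but
// mongosForAgg's routing table still places it entirely on st.shard0.shardName.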
// Run the same aggregation that targeted a single shard via the now-stale mongoS. It should
// attempt to send the aggregation to st.shard0.shardName, hit a stale config exception,
// split the pipeline and redispatch. We append an $_internalSplitPipeline stage in order to
// force a shard merge rather than a mongoS merge.
testName = "agg_shard_targeting_backout_passthrough_and_split_if_cache_is_stale";
assert.eq(mongosColl
.aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}]
.concat(splitPoint)
.concat(forcePrimaryMerge),
{comment: testName})
.itcount(),
2);
// Before the first dispatch:
// - mongosForMove and st.shard0.shardName (the donor shard) are up to date.
// - mongosForAgg and st.shard1.shardName are stale. mongosForAgg incorrectly believes that
// the necessary data is all on st.shard0.shardName.
//
// We therefore expect that:
// - mongosForAgg will throw a stale config error when it attempts to establish a
// single-shard cursor on st.shard0.shardName (attempt 1).
// - mongosForAgg will back out, refresh itself, and redispatch to both shards.
// - st.shard1.shardName will throw a stale config and refresh itself when the split
// pipeline is sent to it (attempt 2).
// - mongosForAgg will back out and redispatch (attempt 3).
// - The aggregation will succeed on the third dispatch.
// We confirm this behaviour via the following profiler results:
// - One aggregation on st.shard0.shardName with a shard version exception (indicating that
// the mongoS was stale).
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard0DB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: false},
errCode: {$in: shardExceptions}
}
});
// - One aggregation on st.shard1.shardName with a shard version exception (indicating that
// the shard was stale).
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard1DB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: false},
errCode: {$in: shardExceptions}
}
});
// - At least one and at most two aggregations on st.shard0.shardName with no stale config
// exceptions. The first, if present, is an aborted cursor created if the command reaches
// st.shard0.shardName before st.shard1.shardName throws its stale config exception during
// attempt 2. The last entry is always from the aggregation which succeeded.
profilerHasAtLeastOneAtMostNumMatchingEntriesOrThrow({
profileDB: shard0DB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: false},
errCode: {$exists: false}
},
maxExpectedMatches: 2
});
// - One aggregation on st.shard1.shardName with no stale config exception.
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard1DB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: false},
errCode: {$exists: false}
}
});
// - One $mergeCursors aggregation on primary st.shard0.shardName, since we eventually
// target both shards after backing out the passthrough and splitting the pipeline.
profilerHasSingleMatchingEntryOrThrow({
profileDB: primaryShardDB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: true}
}
});
// Move the _id: [-100, 0) chunk back from st.shard1.shardName to st.shard0.shardName via
// mongosForMove. Shard0 and mongosForAgg are now stale.
assert.commandWorked(mongosForMove.getDB("admin").runCommand({
moveChunk: mongosColl.getFullName(),
find: {_id: -50},
to: st.shard0.shardName,
_waitForDelete: true
}));
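// The original chunk distribution is restored: the $match range [-150, -50] again lies entirely
// on st.shard0.shardName, but mongosForAgg's routing table still believes the [-100, 0) chunk is
// on st.shard1.shardName.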
// Run the same aggregation via the now-stale mongoS. It should split the pipeline, hit a
// stale config exception, and reset to the original single-shard pipeline upon refresh. We
// append an $_internalSplitPipeline stage in order to force a shard merge rather than a
// mongoS merge.
testName = "agg_shard_targeting_backout_split_pipeline_and_reassemble_if_cache_is_stale";
assert.eq(mongosColl
.aggregate([{$match: {_id: {$gte: -150, $lte: -50}}}]
.concat(splitPoint)
.concat(forcePrimaryMerge),
{comment: testName})
.itcount(),
2);
// Before the first dispatch:
// - mongosForMove and st.shard1.shardName (the donor shard) are up to date.
// - mongosForAgg and st.shard0.shardName are stale. mongosForAgg incorrectly believes that
// the necessary data is spread across both shards.
//
// We therefore expect that:
// - mongosForAgg will throw a stale config error when it attempts to establish a cursor on
// st.shard1.shardName (attempt 1).
// - mongosForAgg will back out, refresh itself, and redispatch to st.shard0.shardName.
// - st.shard0.shardName will throw a stale config and refresh itself when the pipeline is
// sent to it (attempt 2).
// - mongosForAgg will back out and redispatch (attempt 3).
// - The aggregation will succeed on the third dispatch.
// We confirm this behaviour via the following profiler results:
// - One aggregation on st.shard1.shardName with a shard version exception (indicating that
// the mongoS was stale).
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard1DB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: false},
errCode: {$in: shardExceptions}
}
});
// - One aggregation on st.shard0.shardName with a shard version exception (indicating that
// the shard was stale).
profilerHasSingleMatchingEntryOrThrow({
profileDB: shard0DB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: false},
errCode: {$in: shardExceptions}
}
});
// - At least one and at most two aggregations on st.shard0.shardName with no stale config
// exceptions. The first, if present, is an aborted cursor created if the command reaches
// st.shard0.shardName before st.shard1.shardName throws its stale config exception during
// attempt 1. The last entry is always from the aggregation which succeeded.
profilerHasAtLeastOneAtMostNumMatchingEntriesOrThrow({
profileDB: shard0DB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: false},
errCode: {$exists: false}
},
maxExpectedMatches: 2
});
// No $mergeCursors aggregation on primary st.shard0.shardName, since after backing out the
// split pipeline we eventually target only st.shard0.shardName.
profilerHasZeroMatchingEntriesOrThrow({
profileDB: primaryShardDB,
filter: {
"command.aggregate": mongosColl.getName(),
"command.comment": testName,
"command.pipeline.$mergeCursors": {$exists: true}
}
});
// Clean up the test run by dropping the $out collection and resetting the profiler.
assert(outColl.drop());
assert.commandWorked(shard0DB.setProfilingLevel(0));
assert.commandWorked(shard1DB.setProfilingLevel(0));
assert(shard0DB.system.profile.drop());
assert(shard1DB.system.profile.drop());
}
// Run tests with a variety of split points, testing the pipeline split and re-assembly logic in
// cases where the merge pipeline is empty, where the split stage is moved from the shard to the
// merge pipeline ($facet, $lookup), and where there are both shard and merge versions of the
// split source ($sort, $group, $limit). Each test case will ultimately produce the same output.
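// For example, with splitPoint [{$sort: {_id: 1}}] the pipeline may be cut into a shard half
// ([$match, $sort]) and a merge half that merge-sorts the shard cursors on {_id: 1}, whereas a
// $facet or $lookup stage moves wholly into the merge half.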
runAggShardTargetTest({splitPoint: []});
runAggShardTargetTest({splitPoint: [{$sort: {_id: 1}}]});
runAggShardTargetTest({splitPoint: [{$group: {_id: "$_id"}}]});
runAggShardTargetTest({splitPoint: [{$limit: 4}]});
runAggShardTargetTest({
splitPoint: [
{$facet: {facetPipe: [{$match: {_id: {$gt: MinKey}}}]}},
{$unwind: "$facetPipe"},
{$replaceRoot: {newRoot: "$facetPipe"}}
]
});
runAggShardTargetTest({
splitPoint: [
{
$lookup: {
from: "dummycoll",
localField: "dummyfield",
foreignField: "dummyfield",
as: "lookupRes"
}
},
{$project: {lookupRes: 0}}
]
});
st.stop();
})();