// Contains utilities and helper functions for testing shard targeting of aggregate commands.
import {arrayEq} from "jstests/aggregation/extras/utils.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
import {getAggPlanStage} from "jstests/libs/query/analyze_plan.js";
import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";

/**
 * Class which allows for setting up a test fixture to test the behavior of shard targeting for
 * aggregate commands.
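 *
 * Example usage (a minimal, hypothetical sketch; assumes a two-shard cluster 'st' started via
 * ShardingTest and a test database named "test"):
 *
 *   const st = new ShardingTest({shards: 2});
 *   const db = st.s.getDB("test");
 *   const shardTargetingTest = new ShardTargetingTest(db, {
 *       [st.shard0.shardName]: st.shard0.getDB("test"),
 *       [st.shard1.shardName]: st.shard1.getDB("test"),
 *   });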
 */
export class ShardTargetingTest {
    constructor(db, shardDBMap) {
        this.db = db;
        this.shardProfileDBMap = shardDBMap;
        this.shardDBList = [];
        for (const [_, shardDB] of Object.entries(this.shardProfileDBMap)) {
            this.shardDBList.push(shardDB);
        }
    }

    // Helper functions.

    /**
     * Utility to clear the profiling collection.
     */
    _resetProfiling() {
        assert(this.shardDBList, "shardDBList must be defined");
        for (const shardDB of this.shardDBList) {
            assert.commandWorked(shardDB.setProfilingLevel(0));
            shardDB.system.profile.drop();
            assert.commandWorked(shardDB.setProfilingLevel(2));
        }
    }

    /**
     * Utility which asserts that the aggregation stages in 'actualStages' match those in
     * 'expectedStages'.
     */
    _assertExpectedStages(expectedStages, actualStages, explain) {
        assert.eq(expectedStages.length, actualStages.length, explain);
        let stageIdx = 0;
        for (const stage of expectedStages) {
            const spec = actualStages[stageIdx];
            assert(spec.hasOwnProperty(stage),
                   "Expected stage " + tojson(stage) + " in explain " + tojson(explain));
            stageIdx++;
        }
    }

    /**
     * Utility to create a filter for querying the database profiler with the provided parameters.
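     *
     * For example (hypothetical argument values), a call such as
     *   _createProfileFilter({ns: "coll", comment: "test_comment", expectedStages: ["$match"]})
     * produces a filter of the form:
     *   {
     *       "op": "command",
     *       "command.aggregate": "coll",
     *       "command.comment": "test_comment",
     *       "command.pipeline.0.$match": {"$exists": true},
     *       "command.pipeline": {"$size": 1}
     *   }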
     */
    _createProfileFilter({ns, comment, expectedStages}) {
        let profileFilter = {"op": "command", "command.aggregate": ns};
        if (comment) {
            profileFilter["command.comment"] = comment;
        }
        let idx = 0;
        for (const stage of expectedStages) {
            const fieldName = "command.pipeline." + idx + "." + stage;
            profileFilter[fieldName] = {"$exists": true};
            idx++;
        }
        // If we have an empty list of stages, this may indicate a cursor established by a
        // $mergeCursors stage internally. As such, we wish to verify a cursor was established,
        // regardless of how many stages were specified.
        if (expectedStages) {
            profileFilter["command.pipeline"] = {"$size": expectedStages.length};
        }
        return profileFilter;
    }
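
    /**
     * Utility which validates the 'mergerPart' and 'shardsPart' of 'splitPipeline' against
     * 'expectedMergingStages' and 'expectedShardStages', respectively.
     */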
    _examineSplitPipeline({splitPipeline, expectedMergingStages, expectedShardStages}) {
        if (splitPipeline.mergerPart) {
            const mergerPart = splitPipeline.mergerPart;
            if (mergerPart.length > 0) {
                assert(expectedMergingStages,
                       "Should have specified merging stages for test case if split pipeline " +
                           "has 'mergerPart': " + tojson(splitPipeline));
                this._assertExpectedStages(expectedMergingStages, mergerPart, splitPipeline);
            }
        }
        if (splitPipeline.shardsPart) {
            const shardsPart = splitPipeline.shardsPart;
            if (shardsPart.length > 0) {
                assert(expectedShardStages,
                       "Should have specified shard stages for test case if split pipeline " +
                           "has 'shardsPart': " + tojson(splitPipeline));
                this._assertExpectedStages(expectedShardStages, shardsPart, splitPipeline);
            }
        }
    }

    /**
     * Utility which makes certain assertions about 'explain' (the output of explaining the
     * aggregation), namely:
     * - 'expectedMergingShard' and 'expectedMergingStages' allow for assertions around the shard
     *   which was chosen as the merger and what pipeline is used to merge.
     * - 'expectedShard' and 'expectedShardStages' allow for assertions around targeting a single
     *   shard for execution.
     * - 'assertSBELookupPushdown' asserts that $lookup was pushed down into SBE when present.
     * - 'expectRouter' asserts that the merging part of the pipeline runs on the router (mongos).
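     *
     * For example (hypothetical values), an assertion object for a pipeline expected to merge on
     * a specific shard might look like:
     *   {
     *       expectedMergingShard: st.shard0.shardName,
     *       expectedMergingStages: ["$mergeCursors", "$group"],
     *       expectedShardStages: ["$match"],
     *   }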
     */
    _assertExplainTargeting(
        explain,
        {
            expectedMergingShard,
            expectedMergingStages,
            expectedShard,
            expectedShardStages,
            assertSBELookupPushdown,
            expectRouter,
        },
    ) {
        if (expectedMergingShard) {
            assert.eq(explain.mergeType, "specificShard", explain);
            assert.eq(explain.mergeShardId, expectedMergingShard, explain);
            assert(explain.hasOwnProperty("splitPipeline"), explain);
            this._examineSplitPipeline({
                splitPipeline: explain.splitPipeline,
                expectedMergingStages: expectedMergingStages,
                expectedShardStages: expectedShardStages,
            });
        } else {
            assert.neq(explain.mergeType,
                       "specificShard",
                       "Expected not to merge on a specific shard; explain " + tojson(explain));
            assert.neq(explain.mergeType,
                       "anyShard",
                       "Expected not to merge on any shard; explain " + tojson(explain));
        }
        if (expectedShard) {
            assert(explain.hasOwnProperty("shards"), explain);
            const shards = explain.shards;
            const keys = Object.keys(shards);
            assert.eq(keys.length, 1, explain);
            assert.eq(expectedShard, keys[0], explain);
            const shard = shards[expectedShard];
            if (expectedShardStages) {
                const stages = shard.stages;
                assert(stages, explain);
                this._assertExpectedStages(expectedShardStages, stages, explain);
            }
            const stage = getAggPlanStage(shard, "EQ_LOOKUP", true /* useQueryPlannerSection */);
            if (assertSBELookupPushdown) {
                assert.neq(stage, null, shard);
            } else {
                assert.eq(stage, null, shard);
            }
        }
        if (expectRouter) {
            // TODO SERVER-95358 remove once 9.0 becomes last LTS.
            let mergeType;
            if (FeatureFlagUtil.isPresentAndEnabled(this.db, "AggMongosToRouter")) {
                mergeType = "router";
            } else {
                mergeType = "mongos";
            }
            assert.eq(explain.mergeType, mergeType, explain);
            assert(explain.hasOwnProperty("splitPipeline"), explain);
            this._examineSplitPipeline({
                splitPipeline: explain.splitPipeline,
                expectedMergingStages: expectedMergingStages,
                expectedShardStages: expectedShardStages,
            });
        }
    }

    // Testing functions.

    /**
     * Function to set up a collection in 'db':
     * - 'collName' specifies the name of the collection.
     * - 'indexList' specifies a list of indexes to build on the collection.
     * - 'docs' specifies a set of documents to insert.
     * - 'collType' specifies the type of collection (i.e. "sharded" or "unsplittable").
     * - 'shardKey' and 'chunkList' are used to configure a sharded collection.
     * - 'owningShard' designates the shard that an unsplittable collection should live on.
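     *
     * Example (a hypothetical sharded collection split across two shards; the shard names are
     * assumed to come from a ShardingTest fixture 'st'):
     *   shardTargetingTest.setupColl({
     *       collName: "sharded_coll",
     *       indexList: [{a: 1}],
     *       docs: [{_id: 0, a: -5}, {_id: 1, a: 5}],
     *       collType: "sharded",
     *       shardKey: {a: 1},
     *       chunkList: [
     *           {min: {a: MinKey}, max: {a: 0}, shard: st.shard0.shardName},
     *           {min: {a: 0}, max: {a: MaxKey}, shard: st.shard1.shardName},
     *       ],
     *   });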
     */
    setupColl({collName, indexList, docs, collType, shardKey, chunkList, owningShard}) {
        const coll = this.db[collName];
        if (collType === "sharded") {
            assert(shardKey && chunkList,
                   "Must specify shard key and chunk list when setting up a sharded collection");
            CreateShardedCollectionUtil.shardCollectionWithChunks(coll, shardKey, chunkList);
        } else if (collType === "unsplittable") {
            assert(owningShard,
                   "Must specify an owning shard when setting up an unsplittable collection");
            assert.commandWorked(this.db.runCommand(
                {createUnsplittableCollection: collName, dataShard: owningShard}));
        } else {
            assert(false, "Unknown collection type " + tojson(collType));
        }
        if (indexList && indexList.length > 0) {
            assert.commandWorked(coll.createIndexes(indexList));
        }
        if (docs && docs.length > 0) {
            assert.commandWorked(coll.insertMany(docs));
        }
    }

    /**
     * Function which runs 'pipeline' using the explain and aggregate commands to verify correct
     * results and expected shard targeting behavior.
     * - 'targetCollName' names the collection to target 'pipeline' with.
     * - 'explainAssertionObj' describes assertions to be made against the explain output (see
     *   '_assertExplainTargeting' for more detail).
     * - 'expectedResults' contains the output that running 'pipeline' should produce.
     * - 'comment' is a string that will allow 'pipeline' (and, in some cases, sub-queries) to be
     *   uniquely identified in profiler output.
     * - 'profileFilters' is a map from shard name to a list of objects, each containing arguments
     *   used to create a filter to query the profiler output.
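     *
     * Example (hypothetical values; assumes the "sharded_coll" collection and 'st' fixture from
     * the 'setupColl' example above):
     *   shardTargetingTest.assertShardTargeting({
     *       pipeline: [{$match: {a: {$gt: 0}}}],
     *       targetCollName: "sharded_coll",
     *       explainAssertionObj: {expectedShard: st.shard1.shardName},
     *       expectedResults: [{_id: 1, a: 5}],
     *       comment: "match_on_sharded_coll",
     *       profileFilters:
     *           {[st.shard1.shardName]: [{ns: "sharded_coll", expectedStages: ["$match"]}]},
     *   });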
     */
    assertShardTargeting(
        {pipeline, targetCollName, explainAssertionObj, expectedResults, comment, profileFilters}) {
        const coll = this.db[targetCollName];
        const options = comment ? {"comment": comment} : {};

        // Test explain if 'explainAssertionObj' is specified.
        if (explainAssertionObj) {
            const explain = coll.explain().aggregate(pipeline, options);
            this._assertExplainTargeting(explain, explainAssertionObj);
        }

        // Always reset the profiling collections before running an aggregate.
        this._resetProfiling();

        // Verify that 'pipeline' returns the expected results.
        const res = coll.aggregate(pipeline, options).toArray();
        assert(arrayEq(res, expectedResults),
               "sharded aggregation results did not match: " + tojson(res) +
                   " does not have the same members as " + tojson(expectedResults));

        // Verify that execution targeted the expected nodes if 'profileFilters' was specified.
        if (profileFilters) {
            for (const [shard, filterList] of Object.entries(profileFilters)) {
                const profileDB = this.shardProfileDBMap[shard];
                assert(profileDB);
                for (let filter of filterList) {
                    filter.comment = comment;
                    const profileFilter = this._createProfileFilter(filter);
                    const profileColl = profileDB.system.profile;
                    const debugFilter = comment ? {"command.comment": comment} : {};
                    assert.gt(profileColl.find(profileFilter).itcount(),
                              0,
                              "Expected to find an entry matching " + tojson(profileFilter) +
                                  " on shard " + shard +
                                  ". Dumping profiler contents limited to filter " +
                                  tojson(debugFilter) + ": " +
                                  tojson(profileColl.find(debugFilter).toArray()));
                }
            }
        }
    }
}