// @tags: [
//   does_not_support_stepdowns,
//   requires_persistence,
//   requires_profiling,
//   requires_sharding,
// ]
// Confirms that profiled aggregation execution contains expected values for usedDisk.
import {
    getLatestProfilerEntry,
    profilerHasSingleMatchingEntryOrThrow,
    profilerHasZeroMatchingEntriesOrThrow,
} from "jstests/libs/profiler.js";
import {getAggPlanStages, getPlanStage, getWinningPlanFromExplain} from "jstests/libs/query/analyze_plan.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
const conn = MongoRunner.runMongod({setParameter: {featureFlagExtendedAutoSpilling: true}});
const testDB = conn.getDB("profile_agg");
const collName = jsTestName();
const coll = testDB.getCollection(collName);
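// Profiling level 2 records a 'system.profile' entry for every operation on this database.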
testDB.setProfilingLevel(2);
function resetCollection() {
    coll.drop();
    for (let i = 0; i < 10; ++i) {
        assert.commandWorked(coll.insert({a: i}));
    }
}
function resetForeignCollection() {
    testDB.foreign.drop();
    const forColl = testDB.getCollection("foreign");
    for (let i = 4; i < 18; i += 2) {
        assert.commandWorked(forColl.insert({b: i}));
    }
}
//
// Confirm 'usedDisk' is not set if 'allowDiskUse' is set but no stages need to use disk.
//
resetCollection();
coll.aggregate([{$match: {a: {$gte: 2}}}], {allowDiskUse: true});
let profileObj = getLatestProfilerEntry(testDB);
assert(!profileObj.hasOwnProperty("usedDisk"), tojson(profileObj));
//
// Confirm hasSortStage is set, but 'usedDisk' is not, for an in-memory sort.
//
resetCollection();
coll.aggregate([{$match: {a: {$gte: 2}}}, {$sort: {a: 1}}], {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert(!profileObj.hasOwnProperty("usedDisk"), tojson(profileObj));
assert.eq(profileObj.hasSortStage, true, tojson(profileObj));
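// Lower the server's blocking-sort memory limit to 10 bytes so that the same query's $sort is
// forced to spill to disk.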
assert.commandWorked(testDB.adminCommand({setParameter: 1, internalQueryMaxBlockingSortMemoryUsageBytes: 10}));
assert.eq(8, coll.aggregate([{$match: {a: {$gte: 2}}}, {$sort: {a: 1}}], {allowDiskUse: true}).itcount());
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
assert.eq(profileObj.hasSortStage, true, tojson(profileObj));
//
// Confirm that disk use is correctly detected for the $sort stage and that the sorter's
// spilling metrics are reported.
//
resetCollection();
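// Note: no 'allowDiskUse' option is passed here; the spill is still permitted because mongod's
// 'allowDiskUseByDefault' server parameter defaults to true.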
coll.aggregate([{$sort: {a: 1}}], {comment: "sort_spill"});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.command.comment, "sort_spill", tojson(profileObj));
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
assert.gt(profileObj.sortSpills, 0, tojson(profileObj));
assert.gt(profileObj.sortSpilledBytes, 0, tojson(profileObj));
assert.gt(profileObj.sortSpilledRecords, 0, tojson(profileObj));
assert.gt(profileObj.sortSpilledDataStorageSize, 0, tojson(profileObj));
assert.gt(profileObj.sortTotalDataSizeBytes, 0, tojson(profileObj));
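// 'sortSpilledBytes' counts the uncompressed size of the spilled data, whereas
// 'sortSpilledDataStorageSize' is the disk space the spill file actually occupies, so the two
// metrics can legitimately differ.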
//
// Confirm that disk use is correctly detected for the $facet stage.
//
resetCollection();
coll.aggregate([{$facet: {"aSort": [{$sortByCount: "$a"}]}}], {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
//
// Confirm that usedDisk is correctly detected for the $group stage.
//
resetCollection();
coll.aggregate([{$group: {"_id": {$avg: "$a"}}}], {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert(!profileObj.hasOwnProperty("usedDisk"), tojson(profileObj));
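// A 10-byte limit forces the $group hash table to spill to disk on the next run.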
assert.commandWorked(testDB.adminCommand({setParameter: 1, internalDocumentSourceGroupMaxMemoryBytes: 10}));
resetCollection();
coll.aggregate([{$group: {"_id": {$avg: "$a"}}}], {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
assert.gt(profileObj.groupSpills, 0, tojson(profileObj));
assert.gt(profileObj.groupSpilledBytes, 0, tojson(profileObj));
assert.gt(profileObj.groupSpilledRecords, 0, tojson(profileObj));
assert.gt(profileObj.groupSpilledDataStorageSize, 0, tojson(profileObj));
//
// Confirm that usedDisk is correctly detected for the TEXT_OR stage.
//
coll.drop();
assert.commandWorked(
    coll.insertMany([
        {a: "green tea", b: 5},
        {a: "black tea", b: 6},
        {a: "black coffee", b: 7},
    ]),
);
assert.commandWorked(coll.createIndex({a: "text"}));
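// A 1-byte limit forces the TEXT_OR stage to spill the document scores it buffers while
// scoring matches.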
assert.commandWorked(testDB.adminCommand({setParameter: 1, internalTextOrStageMaxMemoryBytes: 1}));
coll.aggregate([{$match: {$text: {$search: "black tea"}}}, {$addFields: {score: {$meta: "textScore"}}}], {
    allowDiskUse: true,
});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
assert.gt(profileObj.textOrSpills, 0, tojson(profileObj));
assert.gt(profileObj.textOrSpilledBytes, 0, tojson(profileObj));
assert.gt(profileObj.textOrSpilledRecords, 0, tojson(profileObj));
assert.gt(profileObj.textOrSpilledDataStorageSize, 0, tojson(profileObj));
coll.aggregate([{$match: {$text: {$search: "black tea"}}}, {$sort: {_: {$meta: "textScore"}}}], {
    allowDiskUse: true,
});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
assert.gt(profileObj.textOrSpills, 0, tojson(profileObj));
assert.gt(profileObj.textOrSpilledBytes, 0, tojson(profileObj));
assert.gt(profileObj.textOrSpilledRecords, 0, tojson(profileObj));
assert.gt(profileObj.textOrSpilledDataStorageSize, 0, tojson(profileObj));
assert.gt(profileObj.sortSpills, 0, tojson(profileObj));
assert.gt(profileObj.sortSpilledBytes, 0, tojson(profileObj));
assert.gt(profileObj.sortSpilledRecords, 0, tojson(profileObj));
assert.gt(profileObj.sortSpilledDataStorageSize, 0, tojson(profileObj));
//
// Confirm that usedDisk is correctly detected for the $graphLookup stage.
//
coll.drop();
assert.commandWorked(
    coll.insertMany([
        {_id: 1, to: [2, 3]},
        {_id: 2, to: [4, 5]},
        {_id: 3, to: [6, 7]},
        {_id: 4},
        {_id: 5},
        {_id: 6},
        {_id: 7},
    ]),
);
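// A 1-byte limit forces $graphLookup to spill its working set (visited documents and the
// traversal frontier) to disk.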
assert.commandWorked(testDB.adminCommand({setParameter: 1, internalDocumentSourceGraphLookupMaxMemoryBytes: 1}));
const graphLookupStage = {
    $graphLookup: {
        from: collName,
        startWith: 1,
        connectFromField: "to",
        connectToField: "_id",
        as: "path",
        depthField: "depth",
    },
};
coll.aggregate([{$limit: 1}, graphLookupStage], {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
assert.gt(profileObj.graphLookupSpills, 0, tojson(profileObj));
assert.gt(profileObj.graphLookupSpilledBytes, 0, tojson(profileObj));
assert.gt(profileObj.graphLookupSpilledRecords, 0, tojson(profileObj));
assert.gt(profileObj.graphLookupSpilledDataStorageSize, 0, tojson(profileObj));
coll.aggregate([{$limit: 1}, graphLookupStage, {$unwind: "$path"}], {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
assert.gt(profileObj.graphLookupSpills, 0, tojson(profileObj));
assert.gt(profileObj.graphLookupSpilledBytes, 0, tojson(profileObj));
assert.gt(profileObj.graphLookupSpilledRecords, 0, tojson(profileObj));
assert.gt(profileObj.graphLookupSpilledDataStorageSize, 0, tojson(profileObj));
//
// Confirm that usedDisk is correctly detected for the $setWindowFields stage.
//
const setWindowFieldsPipeline = [{$setWindowFields: {sortBy: {a: 1}, output: {as: {$addToSet: "$a"}}}}];
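// Because of the 'sortBy', a $sort is planted ahead of $setWindowFields, which is why sort
// spill metrics are also asserted below.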
function getSetWindowFieldsMemoryLimit() {
    const explain = coll.explain().aggregate(setWindowFieldsPipeline);
    // If $setWindowFields was pushed down to SBE, set a lower limit. We can't set the limit to
    // 1 byte for the classic engine, because DocumentSourceSetWindowFields fails if the data
    // still does not fit within the memory limit after spilling.
    if (getPlanStage(getWinningPlanFromExplain(explain), "WINDOW")) {
        return 1;
    } else {
        return 500;
    }
}
assert.commandWorked(
    testDB.adminCommand({
        setParameter: 1,
        internalDocumentSourceSetWindowFieldsMaxMemoryBytes: getSetWindowFieldsMemoryLimit(),
    }),
);
resetCollection();
coll.aggregate(setWindowFieldsPipeline, {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
assert.gt(profileObj.setWindowFieldsSpills, 0, tojson(profileObj));
assert.gt(profileObj.setWindowFieldsSpilledBytes, 0, tojson(profileObj));
assert.gt(profileObj.setWindowFieldsSpilledRecords, 0, tojson(profileObj));
assert.gt(profileObj.setWindowFieldsSpilledDataStorageSize, 0, tojson(profileObj));
assert.gt(profileObj.sortSpills, 0, tojson(profileObj));
assert.gt(profileObj.sortSpilledBytes, 0, tojson(profileObj));
assert.gt(profileObj.sortSpilledRecords, 0, tojson(profileObj));
assert.gt(profileObj.sortSpilledDataStorageSize, 0, tojson(profileObj));
//
// Confirm that usedDisk is correctly detected for the $lookup stage with a subsequent $unwind.
//
resetCollection();
resetForeignCollection();
coll.aggregate(
    [{$lookup: {let: {var1: "$a"}, pipeline: [{$sort: {a: 1}}], from: "foreign", as: "same"}}, {$unwind: "$same"}],
    {allowDiskUse: true},
);
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
//
// Confirm that usedDisk is correctly detected for the $lookup stage without a subsequent
// $unwind.
//
resetCollection();
resetForeignCollection();
coll.aggregate([{$lookup: {let: {var1: "$a"}, pipeline: [{$sort: {a: 1}}], from: "foreign", as: "same"}}], {
    allowDiskUse: true,
});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
//
// Confirm that usedDisk is correctly detected when $limit is set after the $lookup stage.
//
resetCollection();
resetForeignCollection();
coll.aggregate(
    [
        {$lookup: {let: {var1: "$a"}, pipeline: [{$sort: {a: 1}}], from: "foreign", as: "same"}},
        {$unwind: "$same"},
        {$limit: 3},
    ],
    {allowDiskUse: true},
);
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
//
// Confirm that usedDisk is correctly detected when $limit is set before the $lookup stage.
//
resetCollection();
resetForeignCollection();
coll.aggregate(
    [
        {$limit: 1},
        {$lookup: {let: {var1: "$a"}, pipeline: [{$sort: {a: 1}}], from: "foreign", as: "same"}},
        {$unwind: "$same"},
    ],
    {allowDiskUse: true},
);
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
//
// Test that usedDisk is not set for a $lookup with a pipeline that does not use disk.
//
assert.commandWorked(
    testDB.adminCommand({setParameter: 1, internalQueryMaxBlockingSortMemoryUsageBytes: 100 * 1024 * 1024}),
);
resetCollection();
resetForeignCollection();
coll.aggregate([{$lookup: {let: {var1: "$a"}, pipeline: [{$sort: {a: 1}}], from: "otherTest", as: "same"}}], {
    allowDiskUse: true,
});
profileObj = getLatestProfilerEntry(testDB);
assert(!profileObj.hasOwnProperty("usedDisk"), tojson(profileObj));
assert.commandWorked(
    testDB.adminCommand({
        setParameter: 1,
        internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: 1,
    }),
);
assert.commandWorked(
    testDB.adminCommand({setParameter: 1, internalQuerySlotBasedExecutionHashAggIncreasedSpilling: "never"}),
);
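// EQ_LOOKUP/EQ_LOOKUP_UNWIND plan stages appear only when the $lookup has been pushed down to
// SBE as a hash join; the classic engine's $lookup does not report hashLookup spill metrics.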
function checkHashLookup(pipeline) {
    // HashLookup spills only in SBE.
    const explain = coll.explain().aggregate(pipeline);
    if (
        getAggPlanStages(explain, "EQ_LOOKUP_UNWIND").length > 0 ||
        getAggPlanStages(explain, "EQ_LOOKUP").length > 0
    ) {
        coll.aggregate(pipeline, {allowDiskUse: true});
        const profileObj = getLatestProfilerEntry(testDB);
        assert.eq(profileObj.usedDisk, true, tojson(profileObj));
        assert.gt(profileObj.hashLookupSpills, 0, tojson(profileObj));
        assert.gt(profileObj.hashLookupSpilledBytes, 0, tojson(profileObj));
        assert.gt(profileObj.hashLookupSpilledRecords, 0, tojson(profileObj));
        assert.gt(profileObj.hashLookupSpilledDataStorageSize, 0, tojson(profileObj));
    }
}
const lookupPipeline = [{$lookup: {from: "foreign", localField: "a", foreignField: "b", as: "same"}}];
checkHashLookup(lookupPipeline);
const lookupUnwindPipeline = [
    {$lookup: {from: "foreign", localField: "a", foreignField: "b", as: "same"}},
    {$unwind: "$same"},
    {$project: {same: 1}},
];
checkHashLookup(lookupUnwindPipeline);
//
// Test that the aggregate command fails with 'allowDiskUse: false' when there is not enough
// memory to perform the $group.
//
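// The $group memory limit is still the 10 bytes set earlier, so the sub-pipeline exceeds it
// and, with disk use disallowed, the command must fail.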
assert.throws(() =>
    coll.aggregate([{$unionWith: {coll: "foreign", pipeline: [{$group: {"_id": {$avg: "$b"}}}]}}], {
        allowDiskUse: false,
    }),
);
//
// Test that the above command succeeds with 'allowDiskUse: true', and that 'usedDisk' is
// correctly detected when a sub-pipeline of the $unionWith stage uses disk.
//
resetCollection();
resetForeignCollection();
coll.aggregate([{$unionWith: {coll: "foreign", pipeline: [{$group: {"_id": {$avg: "$b"}}}]}}], {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert.eq(profileObj.usedDisk, true, tojson(profileObj));
//
// Test that usedDisk is not set for a $unionWith with a sub-pipeline that does not use disk.
//
coll.aggregate([{$unionWith: {coll: "foreign", pipeline: [{$sort: {b: 1}}]}}], {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert(!profileObj.usedDisk, tojson(profileObj));
coll.aggregate([{$unionWith: {coll: "foreign", pipeline: [{$match: {a: 1}}]}}], {allowDiskUse: true});
profileObj = getLatestProfilerEntry(testDB);
assert(!profileObj.usedDisk, tojson(profileObj));
MongoRunner.stopMongod(conn);
//
// Tests on a sharded cluster.
//
const st = new ShardingTest({shards: 2});
const shardedDB = st.s.getDB(jsTestName());
assert.commandWorked(st.s0.adminCommand({enableSharding: shardedDB.getName(), primaryShard: st.shard0.shardName}));
const shardedSourceColl = shardedDB.coll1;
const shardedForeignColl = shardedDB.coll2;
const shard0DB = st.shard0.getDB(jsTestName());
const shard1DB = st.shard1.getDB(jsTestName());
// Shard 'shardedSourceColl' and 'shardedForeignColl' on {x: 1}, split them at {x: 0}, and move
// the {x: 0} chunk to shard1.
st.shardColl(shardedSourceColl, {x: 1}, {x: 0}, {x: 0});
st.shardColl(shardedForeignColl, {x: 1}, {x: 0}, {x: 0});
// Insert a few documents on each shard.
for (let i = 0; i < 10; ++i) {
    assert.commandWorked(shardedSourceColl.insert({x: i}));
    assert.commandWorked(shardedSourceColl.insert({x: -i}));
    assert.commandWorked(shardedForeignColl.insert({x: i}));
    assert.commandWorked(shardedForeignColl.insert({x: -i}));
    assert.commandWorked(shardedDB.unshardedColl.insert({x: i}));
}
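// With the split at {x: 0}, the negative and non-negative key ranges live on different shards,
// so both shards participate in the aggregations below.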
// Restart the profiler on both shards.
function restartProfiler() {
    for (let shardDB of [shard0DB, shard1DB]) {
        shardDB.setProfilingLevel(0);
        shardDB.system.profile.drop();
        // Enable profiling and set the 'slowms' threshold to -1 ms so that every command gets
        // profiled.
        shardDB.setProfilingLevel(2, -1);
    }
}
assert.commandWorked(shard0DB.adminCommand({setParameter: 1, internalDocumentSourceGroupMaxMemoryBytes: 10}));
restartProfiler();
// Test that 'usedDisk' does not get populated on the profiler entry of the base pipeline when
// the $unionWith'd pipeline needs to use disk on a sharded collection.
assert.commandWorked(
    shardedDB.runCommand({
        aggregate: shardedSourceColl.getName(),
        pipeline: [
            {
                $unionWith: {coll: shardedForeignColl.getName(), pipeline: [{$group: {"_id": {$avg: "$x"}}}]},
            },
        ],
        cursor: {},
        allowDiskUse: true,
    }),
);
// Verify that the $unionWith'd sub-pipeline always produces a profiler entry with 'usedDisk' set.
profilerHasSingleMatchingEntryOrThrow({
    profileDB: shard0DB,
    filter: {"command.getMore": {$exists: true}, usedDisk: true, ns: shardedForeignColl.getFullName()},
});
// If the $mergeCursors stage runs on shard0, its profiler entry should have 'usedDisk' set.
if (
    shard0DB.system.profile
        .find({
            ns: shardedSourceColl.getFullName(),
            "command.pipeline.$mergeCursors": {$exists: true},
        })
        .itcount() > 0
) {
    profilerHasSingleMatchingEntryOrThrow({
        profileDB: shard0DB,
        filter: {
            "command.pipeline.$mergeCursors": {$exists: true},
            usedDisk: true,
            ns: shardedSourceColl.getFullName(),
        },
    });
    profilerHasZeroMatchingEntriesOrThrow({
        profileDB: shard1DB,
        filter: {usedDisk: true, ns: shardedSourceColl.getFullName()},
    });
} else {
    // If the $mergeCursors stage runs on mongos or on shard1, neither shard's profiler entry
    // for the source namespace should have 'usedDisk' set.
    profilerHasZeroMatchingEntriesOrThrow({
        profileDB: shard0DB,
        filter: {usedDisk: true, ns: shardedSourceColl.getFullName()},
    });
    profilerHasZeroMatchingEntriesOrThrow({
        profileDB: shard1DB,
        filter: {usedDisk: true, ns: shardedSourceColl.getFullName()},
    });
}
// Verify that 'usedDisk' is always set correctly on the base pipeline.
restartProfiler();
assert.commandWorked(
    shardedDB.runCommand({
        aggregate: shardedSourceColl.getName(),
        pipeline: [{$group: {"_id": {$avg: "$x"}}}, {$unionWith: {coll: shardedForeignColl.getName(), pipeline: []}}],
        cursor: {},
        allowDiskUse: true,
    }),
);
profilerHasSingleMatchingEntryOrThrow({
    profileDB: shard0DB,
    filter: {"command.getMore": {$exists: true}, usedDisk: true, ns: shardedSourceColl.getFullName()},
});
profilerHasZeroMatchingEntriesOrThrow({
    profileDB: shard0DB,
    filter: {usedDisk: true, ns: shardedForeignColl.getFullName()},
});
// Set the 'internalDocumentSourceGroupMaxMemoryBytes' to a higher value so that st.stop()
// doesn't fail.
assert.commandWorked(
    shard0DB.adminCommand({setParameter: 1, internalDocumentSourceGroupMaxMemoryBytes: 100 * 1024 * 1024}),
);
st.stop();