// mongo/jstests/aggregation/exec/spill_to_disk.js

// Tests the support for disk storage of intermediate results in aggregation.
//
// Run only when pipeline optimization is enabled; otherwise the type of sorter being used can be
// different (NoLimitSort vs TopKSort), causing an aggregation request to fail with different error
// codes.
//
// Some in-memory variants will error because this test uses too much memory, so we do not run this
// test on in-memory variants.
//
// @tags: [
//   requires_collstats,
//   requires_pipeline_optimization,
//   requires_persistence,
//   not_allowed_with_signed_security_token,
//   # During fcv upgrade/downgrade the engine might not be what we expect.
//   cannot_run_during_upgrade_downgrade,
// ]
import "jstests/libs/query/sbe_assert_error_override.js";
import {arrayEq, assertArrayEq} from "jstests/aggregation/extras/utils.js";
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
import {getSbePlanStages} from "jstests/libs/query/sbe_explain_helpers.js";
import {checkSbeRestrictedOrFullyEnabled} from "jstests/libs/query/sbe_util.js";
const coll = db.spill_to_disk;
coll.drop();
// Sets the set parameter named 'paramName' to the given 'memoryLimit' on each primary node in the
// cluster, and returns the old value.
function setMemoryParamHelper(paramName, memoryLimit) {
    const commandResArr = FixtureHelpers.runCommandOnEachPrimary({
        db: db.getSiblingDB("admin"),
        cmdObj: {
            setParameter: 1,
            [paramName]: memoryLimit,
        },
    });
    assert.gt(commandResArr.length, 0, "Setting memory limit on primaries failed");
    const oldMemoryLimit = assert.commandWorked(commandResArr[0]).was;
    return oldMemoryLimit;
}
// Verifies that the given 'groupStats' (an extract from SBE "executionStats" explain output) shows
// evidence of spilling to disk.
function assertSpillingOccurredInSbeExplain(groupStats) {
    assert(groupStats);
    assert(groupStats.hasOwnProperty("usedDisk"), groupStats);
    assert(groupStats.usedDisk, groupStats);
    assert.gt(groupStats.spills, 0, groupStats);
    assert.gt(groupStats.spilledRecords, 0, groupStats);
    assert.gt(groupStats.spilledDataStorageSize, 0, groupStats);
}
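// For reference, the group-stage executionStats checked above have roughly this shape when
// spilling occurs (field names are taken from the assertions; the values here are illustrative
// only):
//   {usedDisk: true, spills: 4, spilledRecords: 101, spilledDataStorageSize: 65536, ...}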
const sharded = FixtureHelpers.isSharded(coll);
const memoryLimitMB = sharded ? 200 : 100;
const isSbeGroupLookupPushdownEnabled = checkSbeRestrictedOrFullyEnabled(db);
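// Build a collection whose total size exceeds the memory limit so that blocking stages ($group,
// $sort) cannot hold all of their input in memory and must either spill or fail.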
const bigStr = Array(1024 * 1024 + 1).toString(); // 1MB of ','
for (let i = 0; i < memoryLimitMB + 1; i++)
    assert.commandWorked(coll.insert({_id: i, bigStr: i + bigStr, random: Math.random()}));
assert.gt(coll.stats().size, memoryLimitMB * 1024 * 1024);
function test({pipeline, expectedCodes, canSpillToDisk}) {
    // Test that 'allowDiskUse: false' does indeed prevent spilling to disk.
    assert.commandFailedWithCode(
        db.runCommand({aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: false}),
        expectedCodes,
    );
    // If this command supports spilling to disk, ensure that it will succeed when disk use is
    // allowed.
    const res = db.runCommand({aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: true});
    if (canSpillToDisk) {
        // All tests output one doc per input doc.
        assert.eq(new DBCommandCursor(coll.getDB(), res).itcount(), coll.count());
        if (isSbeGroupLookupPushdownEnabled) {
            const explain = db.runCommand({
                explain: {aggregate: coll.getName(), pipeline: pipeline, cursor: {}, allowDiskUse: true},
            });
            const hashAggGroups = getSbePlanStages(explain, "group");
            if (hashAggGroups.length > 0) {
                assert.eq(hashAggGroups.length, 1, explain);
                const hashAggGroup = hashAggGroups[0];
                assertSpillingOccurredInSbeExplain(hashAggGroup);
            }
        }
    } else {
        assert.commandFailedWithCode(res, [ErrorCodes.ExceededMemoryLimit, expectedCodes]);
    }
}
function setHashAggParameters(memoryLimit, atLeast) {
    const memLimitCommandResArr = FixtureHelpers.runCommandOnEachPrimary({
        db: db.getSiblingDB("admin"),
        cmdObj: {
            setParameter: 1,
            internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill: memoryLimit,
        },
    });
    assert.gt(memLimitCommandResArr.length, 0, "Setting memory limit on primaries failed.");
    const oldMemoryLimit = assert.commandWorked(memLimitCommandResArr[0]).was;

    const atLeastCommandResArr = FixtureHelpers.runCommandOnEachPrimary({
        db: db.getSiblingDB("admin"),
        cmdObj: {
            setParameter: 1,
            internalQuerySlotBasedExecutionHashAggMemoryCheckPerAdvanceAtLeast: atLeast,
        },
    });
    assert.gt(atLeastCommandResArr.length, 0, "Setting atLeast limit on primaries failed.");
    const oldAtLeast = assert.commandWorked(atLeastCommandResArr[0]).was;
    return {memoryLimit: oldMemoryLimit, atLeast: oldAtLeast};
}
function testWithHashAggMemoryLimit({pipeline, expectedCodes, canSpillToDisk, memoryLimit}) {
    // If a test sets a specific memory limit, we should do more frequent checks to respect it.
    const oldSettings = setHashAggParameters(memoryLimit, 1 /* atLeast */);
    try {
        test({pipeline, expectedCodes, canSpillToDisk});
    } finally {
        setHashAggParameters(oldSettings.memoryLimit, oldSettings.atLeast);
    }
}
testWithHashAggMemoryLimit({
    pipeline: [{$group: {_id: "$_id", bigStr: {$min: "$bigStr"}}}],
    expectedCodes: ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
    canSpillToDisk: true,
    memoryLimit: 1024,
});
// Sorting by _id would use the _id index, which doesn't require an external sort, so sort by
// 'random' instead.
test({
    pipeline: [{$sort: {random: 1}}],
    expectedCodes: ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
    canSpillToDisk: true,
});
test({
    pipeline: [{$sort: {bigStr: 1}}], // big key and value
    expectedCodes: ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
    canSpillToDisk: true,
});
// Test that sort + large limit won't crash the server (SERVER-10136).
test({
    pipeline: [{$sort: {bigStr: 1}}, {$limit: 1000 * 1000 * 1000}],
    expectedCodes: ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
    canSpillToDisk: true,
});
// Test combining two external sorts in both same and different orders.
test({
    pipeline: [{$group: {_id: "$_id", bigStr: {$min: "$bigStr"}}}, {$sort: {_id: 1}}],
    expectedCodes: ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
    canSpillToDisk: true,
});
test({
    pipeline: [{$group: {_id: "$_id", bigStr: {$min: "$bigStr"}}}, {$sort: {_id: -1}}],
    expectedCodes: ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
    canSpillToDisk: true,
});
test({
    pipeline: [{$group: {_id: "$_id", bigStr: {$min: "$bigStr"}}}, {$sort: {random: 1}}],
    expectedCodes: ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
    canSpillToDisk: true,
});
test({
    pipeline: [{$sort: {random: 1}}, {$group: {_id: "$_id", bigStr: {$first: "$bigStr"}}}],
    expectedCodes: ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed,
    canSpillToDisk: true,
});
// Test accumulating all values into one array. On debug builds we will spill to disk for $group
// and so may hit the group error code before we hit ExceededMemoryLimit.
test({
    pipeline: [{$group: {_id: null, bigArray: {$push: "$bigStr"}}}],
    expectedCodes: [ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed, ErrorCodes.ExceededMemoryLimit],
    canSpillToDisk: false,
});
test({
    pipeline: [{$group: {_id: null, bigArray: {$addToSet: {$concat: ["$bigStr", {$toString: "$_id"}]}}}}],
    expectedCodes: [ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed, ErrorCodes.ExceededMemoryLimit],
    canSpillToDisk: false,
});
for (const op of ["$firstN", "$lastN", "$minN", "$maxN", "$topN", "$bottomN"]) {
    jsTestLog("Testing op " + op);
    let spec = {n: 100000000};
    if (op === "$topN" || op === "$bottomN") {
        spec["sortBy"] = {random: 1};
        spec["output"] = "$bigStr";
    } else {
        // $firstN/$lastN/$minN/$maxN accept 'input'.
        spec["input"] = "$bigStr";
    }
    // Because all of the entries are accumulated into a single group, we will either exceed the
    // per-group limit for the 'n' family of accumulators or the total $group limit when disk use
    // is disabled, so we allow both possible error codes. Also note that we configure
    // 'canSpillToDisk' to be false because spilling to disk will not reduce the memory
    // consumption of our group in this case.
    test({
        pipeline: [{$group: {_id: null, bigArray: {[op]: spec}}}],
        expectedCodes: [ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed, ErrorCodes.ExceededMemoryLimit],
        canSpillToDisk: false,
    });
    // Because each group uses less than the configured limit, but cumulatively they exceed the
    // limit for $group, we only check for 'QueryExceededMemoryLimitNoDiskUseAllowed' when disk
    // use is disabled.
    test({
        pipeline: [{$group: {_id: "$_id", bigArray: {[op]: spec}}}],
        expectedCodes: [ErrorCodes.QueryExceededMemoryLimitNoDiskUseAllowed],
        canSpillToDisk: true,
    });
}
// Don't leave the large collection lying around.
assert(coll.drop());
// Test spilling to disk for various accumulators in a $group stage. The data has 5 groups of 10
// documents each. We configure a low memory limit for SBE's hash aggregation stage in order to
// encourage spilling.
const numGroups = 5;
const docsPerGroup = 10;
let counter = 0;
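// Each field serves a specific accumulator below: 'a' is the group key, 'b' is unique per
// document, 'c' repeats within a group (j % 5, so $addToSet and $push differ), 'obj' feeds
// $mergeObjects, and 'random' provides a sort key that cannot be satisfied by an index.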
for (let i = 0; i < numGroups; ++i) {
    for (let j = 0; j < docsPerGroup; ++j) {
        const doc = {
            _id: counter++,
            a: i,
            b: 100 * i + j,
            c: 100 * i + (j % 5),
            obj: {a: i, b: j},
            random: Math.random(),
        };
        assert.commandWorked(coll.insert(doc));
    }
}
function setHashGroupMemoryParameters(memoryLimit) {
    return setMemoryParamHelper(
        "internalQuerySlotBasedExecutionHashAggApproxMemoryUseInBytesBeforeSpill",
        memoryLimit,
    );
}
// Runs a group query containing the given 'accumulator' after sorting the data by the given
// 'sortInputBy' field. Then verifies that the query results are equal to 'expectedOutput'. If SBE
// is enabled, also runs explain and checks that the execution stats show that spilling occurred.
function testAccumulator({accumulator, sortInputBy, expectedOutput, ignoreArrayOrder = false}) {
    const pipeline =
        [{$sort: {[sortInputBy]: 1}}, {$group: {_id: "$a", acc: accumulator}}, {$sort: {_id: 1}}];
    const results = coll.aggregate(pipeline).toArray();
    if (ignoreArrayOrder) {
        assert(arrayEq(results, expectedOutput));
    } else {
        assert.eq(results, expectedOutput);
    }
    if (isSbeGroupLookupPushdownEnabled) {
        const explain = coll.explain("executionStats").aggregate(pipeline);
        const groupStages = getSbePlanStages(explain, "group");
        assert.eq(groupStages.length, 1, groupStages);
        assertSpillingOccurredInSbeExplain(groupStages[0]);
    }
}
function testSpillingForVariousAccumulators() {
    testAccumulator({
        accumulator: {$first: "$b"},
        sortInputBy: "_id",
        expectedOutput: [
            {_id: 0, acc: 0},
            {_id: 1, acc: 100},
            {_id: 2, acc: 200},
            {_id: 3, acc: 300},
            {_id: 4, acc: 400},
        ],
    });
    testAccumulator({
        accumulator: {$last: "$b"},
        sortInputBy: "_id",
        expectedOutput: [
            {_id: 0, acc: 9},
            {_id: 1, acc: 109},
            {_id: 2, acc: 209},
            {_id: 3, acc: 309},
            {_id: 4, acc: 409},
        ],
    });
    testAccumulator({
        accumulator: {$min: "$b"},
        sortInputBy: "random",
        expectedOutput: [
            {_id: 0, acc: 0},
            {_id: 1, acc: 100},
            {_id: 2, acc: 200},
            {_id: 3, acc: 300},
            {_id: 4, acc: 400},
        ],
    });
    testAccumulator({
        accumulator: {$max: "$b"},
        sortInputBy: "random",
        expectedOutput: [
            {_id: 0, acc: 9},
            {_id: 1, acc: 109},
            {_id: 2, acc: 209},
            {_id: 3, acc: 309},
            {_id: 4, acc: 409},
        ],
    });
    testAccumulator({
        accumulator: {$sum: "$b"},
        sortInputBy: "random",
        expectedOutput: [
            {_id: 0, acc: 45},
            {_id: 1, acc: 1045},
            {_id: 2, acc: 2045},
            {_id: 3, acc: 3045},
            {_id: 4, acc: 4045},
        ],
    });
    testAccumulator({
        accumulator: {$avg: "$b"},
        sortInputBy: "random",
        expectedOutput: [
            {_id: 0, acc: 4.5},
            {_id: 1, acc: 104.5},
            {_id: 2, acc: 204.5},
            {_id: 3, acc: 304.5},
            {_id: 4, acc: 404.5},
        ],
    });
    testAccumulator({
        accumulator: {$addToSet: "$c"},
        sortInputBy: "random",
        expectedOutput: [
            {_id: 0, acc: [0, 1, 2, 3, 4]},
            {_id: 1, acc: [100, 101, 102, 103, 104]},
            {_id: 2, acc: [200, 201, 202, 203, 204]},
            {_id: 3, acc: [300, 301, 302, 303, 304]},
            {_id: 4, acc: [400, 401, 402, 403, 404]},
        ],
        // Since the accumulator produces sets, the resulting arrays may be in any order.
        ignoreArrayOrder: true,
    });
    testAccumulator({
        accumulator: {$push: "$c"},
        sortInputBy: "_id",
        expectedOutput: [
            {_id: 0, acc: [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]},
            {_id: 1, acc: [100, 101, 102, 103, 104, 100, 101, 102, 103, 104]},
            {_id: 2, acc: [200, 201, 202, 203, 204, 200, 201, 202, 203, 204]},
            {_id: 3, acc: [300, 301, 302, 303, 304, 300, 301, 302, 303, 304]},
            {_id: 4, acc: [400, 401, 402, 403, 404, 400, 401, 402, 403, 404]},
        ],
    });
    testAccumulator({
        accumulator: {$mergeObjects: "$obj"},
        sortInputBy: "_id",
        expectedOutput: [
            {_id: 0, acc: {a: 0, b: 9}},
            {_id: 1, acc: {a: 1, b: 9}},
            {_id: 2, acc: {a: 2, b: 9}},
            {_id: 3, acc: {a: 3, b: 9}},
            {_id: 4, acc: {a: 4, b: 9}},
        ],
    });
}
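// Run the accumulator tests with a very low (100-byte) spill threshold so that the hash
// aggregation should spill on every group.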
(function () {
    const kMemLimit = 100;
    let oldMemSettings = setHashGroupMemoryParameters(kMemLimit);
    try {
        testSpillingForVariousAccumulators();
    } finally {
        setHashGroupMemoryParameters(oldMemSettings);
    }
})();
assert(coll.drop());
// Test spill to disk for $lookup
const localColl = db.lookup_spill_local_hj;
const foreignColl = db.lookup_spill_foreign_hj;
function setupCollections(localRecords, foreignRecords, foreignField) {
    localColl.drop();
    assert.commandWorked(localColl.insert(localRecords));
    foreignColl.drop();
    assert.commandWorked(foreignColl.insert(foreignRecords));
}
function setHashLookupParameters(memoryLimit) {
    return setMemoryParamHelper(
        "internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill",
        memoryLimit,
    );
}
/**
* Executes $lookup with multiple records in the local/foreign collections and checks that the "as"
* field for it contains documents with ids from `idsExpectedToMatch`. In addition, it checks
* whether it spills to disk as expected.
*/
function runTest_MultipleLocalForeignRecords({
    testDescription,
    localRecords,
    localField,
    foreignRecords,
    foreignField,
    idsExpectedToMatch,
    spillsToDisk,
}) {
    setupCollections(localRecords, foreignRecords, foreignField);
    const pipeline = [
        {
            $lookup: {
                from: foreignColl.getName(),
                localField: localField,
                foreignField: foreignField,
                as: "matched",
            },
        },
    ];
    const results = localColl.aggregate(pipeline, {allowDiskUse: true}).toArray();
    const explain = localColl.explain("executionStats").aggregate(pipeline, {allowDiskUse: true});
    const pipelineWasSplit = !!explain.splitPipeline;
    // If sharding is enabled, or the pipeline was split, then '$lookup' is not pushed down to SBE.
    if (isSbeGroupLookupPushdownEnabled && !sharded && !pipelineWasSplit) {
        const hLookups = getSbePlanStages(explain, "hash_lookup");
        assert.eq(hLookups.length, 1, explain);
        const hLookup = hLookups[0];
        assert(hLookup, explain);
        assert(hLookup.hasOwnProperty("usedDisk"), hLookup);
        assert.eq(hLookup.usedDisk, spillsToDisk, hLookup);
        if (hLookup.usedDisk) {
            assert.gt(hLookup.spilledRecords, 0, hLookup);
            assert.gt(hLookup.spilledBytes, 0, hLookup);
        }
    }
    assert.eq(localRecords.length, results.length);
    // Extract matched foreign ids from the "matched" field.
    for (let i = 0; i < results.length; i++) {
        const matchedIds = results[i].matched.map((x) => x._id);
        // Order of the elements within the arrays is not significant for 'assertArrayEq'.
        assertArrayEq({
            actual: matchedIds,
            expected: idsExpectedToMatch[i],
            extraErrorMsg: " **TEST** " + testDescription + " " + tojson(explain),
        });
    }
}
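// As with the group stage above, a spilling "hash_lookup" stage reports stats of roughly this
// shape (field names are taken from the assertions; the values here are illustrative only):
//   {usedDisk: true, spilledRecords: 12, spilledBytes: 4096, ...}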
function runHashLookupSpill({memoryLimit, spillsToDisk}) {
    const oldSettings = setHashLookupParameters(memoryLimit);

    (function testMultipleMatchesOnSingleLocal() {
        const docs = [
            {_id: 0, no_a: 1},
            {_id: 1, a: 1},
            {_id: 2, a: [1]},
            {_id: 3, a: [[1]]},
            {_id: 4, a: 1},
        ];
        runTest_MultipleLocalForeignRecords({
            testDescription: "Single Local matches multiple foreign docs",
            localRecords: [{_id: 0, b: 1}],
            localField: "b",
            foreignRecords: docs,
            foreignField: "a",
            idsExpectedToMatch: [[1, 2, 4]],
            spillsToDisk: spillsToDisk,
        });
    })();

    (function testMultipleMatchesOnManyLocal() {
        const localDocs = [
            {_id: 0, a: -1},
            {_id: 1, a: 1},
            {_id: 2, a: 2},
            {_id: 3, a: 3},
            {_id: 4, a: 3},
            {_id: 5, a: 7},
        ];
        const foreignDocs = [
            {_id: 7, b: 0},
            {_id: 8, b: 1},
            {_id: 9, b: 2},
            {_id: 10, b: 2},
            {_id: 11, b: 3},
            {_id: 12, b: 3},
            {_id: 13, b: 3},
            {_id: 14, b: 6},
        ];
        runTest_MultipleLocalForeignRecords({
            testDescription: "Multiple local matches on multiple foreign docs",
            localRecords: localDocs,
            localField: "a",
            foreignRecords: foreignDocs,
            foreignField: "b",
            idsExpectedToMatch: [[], [8], [9, 10], [11, 12, 13], [11, 12, 13], []],
            spillsToDisk: spillsToDisk,
        });
    })();

    return oldSettings;
}
const oldMemSettings = assert.commandWorked(
    db.adminCommand({
        getParameter: 1,
        internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill: 1,
    }),
).internalQuerySlotBasedExecutionHashLookupApproxMemoryUseInBytesBeforeSpill;
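// Each of the test blocks below overrides the hash-lookup spill threshold and then restores this
// saved default in its 'finally' block.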
(function runAllDiskTest() {
    try {
        // Spill at one byte.
        runHashLookupSpill({memoryLimit: 1, spillsToDisk: true});
    } finally {
        setHashLookupParameters(oldMemSettings);
    }
})();
(function runMixedInMemoryAndDiskTest() {
    try {
        // Spill at 128 bytes.
        runHashLookupSpill({memoryLimit: 128, spillsToDisk: true});
    } finally {
        setHashLookupParameters(oldMemSettings);
    }
})();
(function runMixedAllInMemory() {
    try {
        // Spill at 100 MB.
        runHashLookupSpill({memoryLimit: 100 * 1024 * 1024, spillsToDisk: false});
    } finally {
        setHashLookupParameters(oldMemSettings);
    }
})();