import {arrayEq} from "jstests/aggregation/extras/utils.js";
import {getAggPlanStages} from "jstests/libs/query/analyze_plan.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";

const maxTotalMemoryUsageBytesProp = "maxTotalMemoryUsageBytes";
const peakTrackedMemBytesProp = "peakTrackedMemBytes";

/**
 * Seed 'coll' with ticker data: 'docsPerTicker' documents for each of the tickers "T1" and "T2",
 * with prices that descend for "T1" and ascend for "T2" as 'partIndex' increases.
 */
export function seedWithTickerData(coll, docsPerTicker) {
    for (let i = 0; i < docsPerTicker; i++) {
        assert.commandWorked(coll.insert({_id: i, partIndex: i, ticker: "T1", price: 500 - i * 10}));

        assert.commandWorked(
            coll.insert({_id: i + docsPerTicker, partIndex: i, ticker: "T2", price: 400 + i * 10}));
    }
}

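// For example (illustrative), seedWithTickerData(coll, 3) inserts six documents:
//   {_id: 0, partIndex: 0, ticker: "T1", price: 500} ... {_id: 2, partIndex: 2, ticker: "T1", price: 480}
//   {_id: 3, partIndex: 0, ticker: "T2", price: 400} ... {_id: 5, partIndex: 2, ticker: "T2", price: 420}
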
/**
 * Invokes 'callback' once per partition case used by these tests: no partitioning, partitioning by
 * the "$ticker" field path, and partitioning by an expression.
 */
export function forEachPartitionCase(callback) {
    callback(null);
    callback("$ticker");
    callback({$toLower: "$ticker"});
}

/**
 * Inclusive [lower, upper] document-based window bounds to test. Endpoints are offsets relative to
 * the current document, "current", or "unbounded".
 */
export const documentBounds = [
    ["unbounded", 0],
    ["unbounded", -1],
    ["unbounded", 1],
    ["unbounded", 5],
    ["unbounded", "unbounded"],
    [0, "unbounded"],
    [0, 1],
    ["current", 2],
    [0, 5],
    ["current", 4],
    [-1, 1],
    [-2, "unbounded"],
    [1, "unbounded"],
    [-3, -1],
    [1, 3],
    [-2, 3],
];

/**
 * Invokes 'callback' with every two-element combination of bounds from 'documentBounds' (each
 * bound paired with every later one), plus a few hard-coded combinations of three or more bounds.
 */
export function forEachDocumentBoundsCombo(callback) {
    documentBounds.forEach(function (bounds, index) {
        let boundsCombo = [bounds];
        for (let j = index + 1; j < documentBounds.length; j++) {
            boundsCombo.push(documentBounds[j]);
            callback(boundsCombo);
            boundsCombo.pop();
        }
    });

    // Add a few combinations that test 3+ bounds.
    callback([
        ["unbounded", "unbounded"],
        ["unbounded", 0],
        ["unbounded", -1],
    ]);
    callback([
        [-1, 1],
        [-1, 0],
        ["unbounded", -1],
    ]);
    callback([
        [2, 3],
        [2, 3],
        ["unbounded", -1],
    ]);
    callback([
        [-5, -5],
        [-5, -5],
        [-5, -5],
    ]);
}

/**
 * Runs a pipeline containing a $group that computes the window-function equivalent using the
 * given partition key, accumulator, index in the current partition, and bounds.
 *
 * The bounds are an inclusive range [lower, upper], specified as values relative to the current
 * offset in the partition. They can be numeric, "current", or "unbounded".
 *
 * The skip/limit values are calculated from the given bounds and the current index.
 *
 * 'accumSpec' is the specification of the accumulator being tested and is an object of the form
 * {accumulatorName: <accumulator arguments>}.
 *
 * 'defaultValue' is used in cases where the skip/limit combination results in $group not receiving
 * any documents. The most likely scenario is that the window has gone off the side of the
 * partition.
 *
 * Note that this function assumes that the data in 'coll' has been seeded with the documents from
 * the seedWithTickerData() function above.
 */
export function computeAsGroup({coll, partitionKey, accumSpec, bounds, indexInPartition, defaultValue = null}) {
    const skip = calculateSkip(bounds[0], indexInPartition);
    const limit = calculateLimit(bounds[0], bounds[1], indexInPartition);
    if (skip < 0 || limit <= 0) return defaultValue;
    let prefixPipe = [{$match: partitionKey}, {$sort: {_id: 1}}, {$skip: skip}];

    // Only attach a $limit if there is a numeric upper bound (or "current"); an "unbounded" upper
    // bound means no limit at all.
    if (limit != "unbounded") prefixPipe = prefixPipe.concat([{$limit: limit}]);

    const result = coll.aggregate(prefixPipe.concat([{$group: {_id: null, res: accumSpec}}])).toArray();
    // If the window is completely off the right edge of the partition, return the default value.
    if (result.length == 0) {
        return defaultValue;
    }
    return result[0].res;
}

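// Illustrative sketch (assuming a collection seeded via seedWithTickerData() above): for the
// document at index 2 of the "T1" partition, a [-1, 1] documents window over $avg can be
// cross-checked like this. The resulting pipeline skips 1 document and limits to 3, i.e. it
// groups the documents at partition indexes 1 through 3.
//
//   const expected = computeAsGroup({
//       coll: coll,
//       partitionKey: {ticker: "T1"},
//       accumSpec: {$avg: "$price"},
//       bounds: [-1, 1],
//       indexInPartition: 2,
//   });
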
/**
 * Helper to calculate the correct skip based on the lowerBound given.
 */
export function calculateSkip(lowerBound, indexInPartition) {
    let skipValueToUse = 0;
    if (lowerBound === "current") {
        skipValueToUse = indexInPartition;
    } else if (lowerBound === "unbounded") {
        skipValueToUse = 0;
    } else {
        skipValueToUse = indexInPartition + lowerBound;
        if (skipValueToUse < 0) {
            skipValueToUse = 0;
        }
    }
    return skipValueToUse;
}

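// A few worked cases (illustrative), with indexInPartition counted from 0:
//   calculateSkip("unbounded", 7) === 0   // window starts at the beginning of the partition.
//   calculateSkip("current", 4) === 4     // window starts at the current document.
//   calculateSkip(-2, 5) === 3            // two documents before the current one.
//   calculateSkip(-2, 1) === 0            // clamped: the window start falls before the partition.
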
/**
 * Helper to calculate the correct limit based on the bounds given.
 */
export function calculateLimit(lowerBound, upperBound, indexInPartition) {
    let limitValueToUse = "unbounded";
    if (upperBound === "current") {
        if (lowerBound === "unbounded") {
            limitValueToUse = indexInPartition + 1;
        } else if (lowerBound === "current") {
            limitValueToUse = 1;
        } else {
            limitValueToUse = Math.abs(lowerBound) + 1;
        }
    } else if (upperBound !== "unbounded") {
        // An upper bound of "unbounded" is kept as is so that no $limit stage is added later.
        if (lowerBound < 0) {
            // If we don't have a full window in this partition yet.
            if (Math.abs(lowerBound) > indexInPartition) {
                // Either take all of the documents we've seen so far if our right bound is also
                // negative, or only look ahead.
                limitValueToUse = upperBound <= 0 ? indexInPartition : indexInPartition + upperBound + 1;
            } else {
                limitValueToUse = Math.abs(lowerBound) + upperBound + 1;
            }
        } else {
            if (lowerBound === "unbounded") {
                // Only base the upper limit on the look-ahead.
                limitValueToUse = indexInPartition + upperBound + 1;
            } else if (lowerBound === "current") {
                // The current document plus the look-ahead.
                limitValueToUse = upperBound + 1;
            } else {
                // A sliding window uses both bounds for the limit.
                limitValueToUse = upperBound - lowerBound + 1;
            }
        }
    }
    return limitValueToUse;
}

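// A few worked cases (illustrative), paired with calculateSkip() above:
//   calculateLimit("unbounded", "unbounded", 3) === "unbounded"  // no $limit stage is attached.
//   calculateLimit("unbounded", 2, 0) === 3                      // current document plus two ahead.
//   calculateLimit("current", "current", 5) === 1                // just the current document.
//   calculateLimit(-1, 1, 3) === 3                               // one behind, current, one ahead.
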
/**
 * Asserts that the window-function result 'wfRes.res' at position 'index' matches the result
 * 'groupRes' computed via $group for the accumulator 'accum'.
 */
export function assertResultsEqual(wfRes, index, groupRes, accum) {
    // On DEBUG builds, the computed $group may be slightly different due to precision
    // loss when spilling to disk.
    // TODO SERVER-42616: Enable the exact check for $stdDevPop/Samp.
    if (accum == "$stdDevSamp" || accum == "$stdDevPop") {
        assert.close(
            groupRes,
            wfRes.res,
            "Window function $stdDev result for index " + index + ": " + tojson(wfRes),
            10 /* 10 decimal places */,
        );
    } else if (accum == "$addToSet") {
        // Order doesn't matter for $addToSet.
        assert(
            arrayEq(groupRes, wfRes.res),
            "Window function $addToSet results for index " + index + ": " + tojson(wfRes) +
                "\nexpected:\n " + tojson(groupRes),
        );
    } else {
        assert.eq(groupRes, wfRes.res, "Window function result for index " + index + ": " + tojson(wfRes));
    }
}

/**
 * Asserts that each $_internalSetWindowFields stage in 'explainResult' reports sensible memory
 * usage metrics.
 */
export function assertExplainResult(explainResult) {
    const stages = getAggPlanStages(explainResult, "$_internalSetWindowFields");
    for (let stage of stages) {
        assert(stage.hasOwnProperty("$_internalSetWindowFields"), stage);

        assert(stage.hasOwnProperty("maxFunctionMemoryUsageBytes"), stage);
        const maxFunctionMemUsages = stage["maxFunctionMemoryUsageBytes"];
        for (let field of Object.keys(maxFunctionMemUsages)) {
            assert.gte(maxFunctionMemUsages[field], 0, "invalid memory usage for '" + field + "': " + tojson(stage));
        }

        let totalMemoryUsage = getTotalMemoryUsageFromStageExplainOutput(stage);
        assert.gt(totalMemoryUsage, 0, "Incorrect total mem usage: " + tojson(stage));

        // No test should be using more than 1GB of memory. This is mostly a sanity check that
        // integer underflow doesn't occur.
        assert.lt(totalMemoryUsage, 1 * 1024 * 1024 * 1024, "Incorrect total mem usage: " + tojson(stage));
    }
}

/**
 * Returns the total memory usage reported by the given $_internalSetWindowFields explain stage.
 */
export function getTotalMemoryUsageFromStageExplainOutput(stage) {
    let totalMemoryUsage = null;

    // Only one of maxTotalMemoryUsageBytes or peakTrackedMemBytes should be present, not both.
    // If the QueryMemoryTracking feature flag is enabled, we expect to see peakTrackedMemBytes;
    // if it is disabled, we expect to see maxTotalMemoryUsageBytes.
    if (stage.hasOwnProperty(maxTotalMemoryUsageBytesProp)) {
        assert(!stage.hasOwnProperty(peakTrackedMemBytesProp), stage);

        totalMemoryUsage = stage[maxTotalMemoryUsageBytesProp];
    } else {
        assert(stage.hasOwnProperty(peakTrackedMemBytesProp), stage);
        totalMemoryUsage = stage[peakTrackedMemBytesProp];
    }

    return totalMemoryUsage;
}

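// For reference, a rough sketch of the stage shape these helpers consume (field values are made
// up for illustration; real explain output contains many more fields):
//
//   {
//       "$_internalSetWindowFields": {...},
//       "maxFunctionMemoryUsageBytes": {"res": 1024},
//       "maxTotalMemoryUsageBytes": 2048,  // or "peakTrackedMemBytes" when the flag is enabled.
//   }
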
/**
 * Asserts that the given $_internalSetWindowFields explain stage reports its memory usage under
 * the property expected for the current QueryMemoryTracking feature flag state.
 */
export function assertExplainMemoryTracking(stage) {
    // Only one of maxTotalMemoryUsageBytes or peakTrackedMemBytes should be present, not both.
    const queryMemoryTrackingEnabled = FeatureFlagUtil.isPresentAndEnabled(db, "QueryMemoryTracking");
    if (stage.hasOwnProperty(maxTotalMemoryUsageBytesProp)) {
        assert(
            !queryMemoryTrackingEnabled,
            maxTotalMemoryUsageBytesProp +
                " should not appear in explain output when query memory tracking is enabled: " +
                tojson(stage),
        );
    } else {
        assert(
            queryMemoryTrackingEnabled,
            peakTrackedMemBytesProp +
                " should appear in explain output when query memory tracking is enabled: " +
                tojson(stage),
        );
    }
}

/**
 * Runs the given 'accum' as both a window function and its equivalent accumulator in $group across
 * various combinations of partitioning and window bounds. Asserts that the results are consistent.
 *
 * Note that this function assumes that the documents in 'coll' were initialized using the
 * seedWithTickerData() method above.
 */
export function testAccumAgainstGroup(coll, accum, onNoResults = null, accumArgs = "$price") {
    const accumSpec = {[accum]: accumArgs};
    forEachPartitionCase(function (partition) {
        documentBounds.forEach(function (bounds, index) {
            jsTestLog(
                "Testing accumulator " + tojson(accumSpec) + " against " + tojson(partition) +
                    " partition and [" + bounds + "] bounds",
            );

            let outputSpec = {window: {documents: bounds}};
            Object.assign(outputSpec, accumSpec);
            const pipeline = [
                {
                    $setWindowFields: {partitionBy: partition, sortBy: {_id: 1}, output: {res: outputSpec}},
                },
            ];
            const wfResults = coll.aggregate(pipeline, {allowDiskUse: true}).toArray();
            for (let index = 0; index < wfResults.length; index++) {
                const wfRes = wfResults[index];

                let indexInPartition = partition === null ? index : wfRes.partIndex;
                let groupRes;
                if (partition == null) {
                    groupRes = computeAsGroup({
                        coll: coll,
                        partitionKey: {},
                        accumSpec: accumSpec,
                        bounds: bounds,
                        indexInPartition: indexInPartition,
                        defaultValue: onNoResults,
                    });
                } else {
                    groupRes = computeAsGroup({
                        coll: coll,
                        partitionKey: {ticker: wfRes.ticker},
                        accumSpec: accumSpec,
                        bounds: bounds,
                        indexInPartition: indexInPartition,
                        defaultValue: onNoResults,
                    });
                }

                assertResultsEqual(wfRes, index, groupRes, accum);
            }

            // Run the same pipeline with explain verbosity "executionStats" and verify that the
            // reported metrics are sensible.
            assertExplainResult(coll.explain("executionStats").aggregate(pipeline, {allowDiskUse: true}));

            jsTestLog("Done");
        });

        // To get additional coverage, specifically regarding the expiration policy, test
        // combinations of various window types in the same $setWindowFields stage. This is more of
        // a fuzz test so no need to check results.
        forEachDocumentBoundsCombo(function (arrayOfBounds) {
            jsTestLog(
                "Testing accumulator " + tojson(accumSpec) + " against multiple bounds: " + tojson(arrayOfBounds),
            );
            let baseSpec = {
                partitionBy: partition,
                sortBy: {_id: 1},
            };
            let outputFields = {};
            arrayOfBounds.forEach(function (bounds, index) {
                let outputSpec = {window: {documents: bounds}};
                Object.assign(outputSpec, accumSpec);
                outputFields["res" + index] = outputSpec;
            });
            let specWithOutput = Object.merge(baseSpec, {output: outputFields});
            const wfResults = coll.aggregate([{$setWindowFields: specWithOutput}], {allowDiskUse: true}).toArray();
            assert.gt(wfResults.length, 0);
            jsTestLog("Done");
        });
    });
}
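
// A minimal sketch of how a test might drive these helpers (the collection name is hypothetical):
//
//   const coll = db.window_function_helpers_demo;
//   coll.drop();
//   seedWithTickerData(coll, 10);
//   testAccumAgainstGroup(coll, "$sum");
//   testAccumAgainstGroup(coll, "$avg");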