mongo/jstests/aggregation/extras/window_function_helpers.js

341 lines
13 KiB
JavaScript

import {arrayEq} from "jstests/aggregation/extras/utils.js";
import {getAggPlanStages} from "jstests/libs/query/analyze_plan.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
// Name of the explain-output field expected to carry a stage's total memory usage when the
// QueryMemoryTracking feature flag is disabled.
const maxTotalMemoryUsageBytesProp = "maxTotalMemoryUsageBytes";
// Name of the explain-output field expected to carry a stage's total memory usage when the
// QueryMemoryTracking feature flag is enabled.
const peakTrackedMemBytesProp = "peakTrackedMemBytes";
/**
 * Fills 'coll' with price documents for two tickers, "T1" and "T2".
 *
 * For every partition index i in [0, docsPerTicker) one document per ticker is inserted, T1 first:
 *   - T1: _id = i, price falling from 500 in steps of 10.
 *   - T2: _id = i + docsPerTicker, price rising from 400 in steps of 10.
 * Both documents record i in 'partIndex'. Every insert is checked with assert.commandWorked().
 */
export function seedWithTickerData(coll, docsPerTicker) {
    let partIndex = 0;
    while (partIndex < docsPerTicker) {
        const t1Doc = {_id: partIndex, partIndex: partIndex, ticker: "T1", price: 500 - partIndex * 10};
        const t2Doc = {
            _id: partIndex + docsPerTicker,
            partIndex: partIndex,
            ticker: "T2",
            price: 400 + partIndex * 10,
        };
        assert.commandWorked(coll.insert(t1Doc));
        assert.commandWorked(coll.insert(t2Doc));
        partIndex++;
    }
}
/**
 * Invokes 'callback' once per partitioning case, in order: no partitioning (null), partitioning
 * by the "$ticker" field path, and partitioning by the expression {$toLower: "$ticker"}.
 * The callback receives the partition specification as its only argument.
 */
export function forEachPartitionCase(callback) {
    const partitionCases = [null, "$ticker", {$toLower: "$ticker"}];
    for (const partitionBy of partitionCases) {
        callback(partitionBy);
    }
}
// Document-based window bounds used by the helpers in this file. Each entry is a
// [lower, upper] pair whose elements are a numeric offset, "current", or "unbounded".
// testAccumAgainstGroup() checks every entry individually and forEachDocumentBoundsCombo()
// pairs the entries up.
export const documentBounds = [
["unbounded", 0],
["unbounded", -1],
["unbounded", 1],
["unbounded", 5],
["unbounded", "unbounded"],
[0, "unbounded"],
[0, 1],
["current", 2],
[0, 5],
["current", 4],
[-1, 1],
[-2, "unbounded"],
[1, "unbounded"],
[-3, -1],
[1, 3],
[-2, 3],
];
/**
 * Invokes 'callback' with arrays of window bounds that should be tested together in a single
 * stage: first every pair of distinct 'documentBounds' entries (each entry combined with every
 * later entry), then a handful of hand-picked combinations of three bounds.
 *
 * Each invocation receives its own freshly allocated array, so a callback may keep a reference
 * to the combination without it changing afterwards.
 */
export function forEachDocumentBoundsCombo(callback) {
    for (let first = 0; first < documentBounds.length; first++) {
        for (let second = first + 1; second < documentBounds.length; second++) {
            // Build a new array per call rather than reusing and mutating a shared one.
            callback([documentBounds[first], documentBounds[second]]);
        }
    }
    // Add a few combinations that test 3+ bounds.
    callback([
        ["unbounded", "unbounded"],
        ["unbounded", 0],
        ["unbounded", -1],
    ]);
    callback([
        [-1, 1],
        [-1, 0],
        ["unbounded", -1],
    ]);
    callback([
        [2, 3],
        [2, 3],
        ["unbounded", -1],
    ]);
    callback([
        [-5, -5],
        [-5, -5],
        [-5, -5],
    ]);
}
/**
 * Computes, with a $group, the value a window function is expected to produce for one document.
 *
 * The pipeline matches 'partitionKey', sorts by _id, and then uses $skip/$limit to select the
 * documents covered by the window before feeding them to a single $group.
 *
 * 'bounds' is an inclusive [lower, upper] range relative to the current document's offset in the
 * partition ('indexInPartition'). Each bound may be numeric, "current", or "unbounded". The
 * skip and limit are derived from the bounds and the index via calculateSkip() and
 * calculateLimit().
 *
 * 'accumSpec' is the accumulator under test, of the form {accumulatorName: <accumulator arguments>}.
 *
 * 'defaultValue' is returned when the skip/limit combination leaves $group with no documents,
 * most likely because the window has gone off the side of the partition.
 *
 * Note that this function assumes 'coll' was seeded with seedWithTickerData().
 */
export function computeAsGroup({coll, partitionKey, accumSpec, bounds, indexInPartition, defaultValue = null}) {
    const [lowerBound, upperBound] = bounds;
    const skip = calculateSkip(lowerBound, indexInPartition);
    const limit = calculateLimit(lowerBound, upperBound, indexInPartition);
    if (skip < 0 || limit <= 0) {
        return defaultValue;
    }

    const stages = [{$match: partitionKey}, {$sort: {_id: 1}}, {$skip: skip}];
    // An "unbounded" limit means the window extends to the end of the partition, so no $limit
    // stage is attached in that case.
    if (limit !== "unbounded") {
        stages.push({$limit: limit});
    }
    stages.push({$group: {_id: null, res: accumSpec}});

    const groupDocs = coll.aggregate(stages).toArray();
    // No output document means the window is completely off the right edge of the partition,
    // so fall back to the default value.
    return groupDocs.length == 0 ? defaultValue : groupDocs[0].res;
}
/**
 * Returns the number of leading documents in the partition to skip so that the first remaining
 * document is the first one inside the window.
 *
 * 'lowerBound' is "unbounded" (skip nothing), "current" (skip up to the current document), or a
 * numeric offset from 'indexInPartition'. A numeric offset that reaches before the start of the
 * partition is clamped to 0.
 */
export function calculateSkip(lowerBound, indexInPartition) {
    if (lowerBound === "unbounded") {
        return 0;
    }
    if (lowerBound === "current") {
        return indexInPartition;
    }
    const firstIndexInWindow = indexInPartition + lowerBound;
    return firstIndexInWindow < 0 ? 0 : firstIndexInWindow;
}
/**
 * Returns the $limit that, applied after skipping calculateSkip(lowerBound, indexInPartition)
 * documents, selects exactly the documents inside the window [lowerBound, upperBound] around
 * the document at 'indexInPartition'.
 *
 * Each bound is a numeric offset relative to the current document, "current" (offset 0), or
 * "unbounded".
 *
 * Returns the string "unbounded" when 'upperBound' is "unbounded", so that callers can omit the
 * $limit stage entirely. Otherwise returns a number: the count of documents between the first
 * index of the window (clamped to the start of the partition, as calculateSkip() does) and the
 * last index of the window, inclusive. The result is <= 0 when the window ends before the first
 * document of the partition; it is not capped at the partition's size, since a $limit larger
 * than the remaining input is harmless.
 */
export function calculateLimit(lowerBound, upperBound, indexInPartition) {
    // Keep unbounded as is to not add a limit stage at all later.
    if (upperBound === "unbounded") {
        return "unbounded";
    }

    // Absolute index of the last document in the window; may be negative.
    const lastIndexInWindow = indexInPartition + (upperBound === "current" ? 0 : upperBound);

    // Absolute index of the first document in the window. A window that starts before the
    // partition does is clipped to index 0, so the limit only counts documents that exist.
    let firstIndexInWindow = 0;
    if (lowerBound === "current") {
        firstIndexInWindow = indexInPartition;
    } else if (lowerBound !== "unbounded") {
        firstIndexInWindow = Math.max(indexInPartition + lowerBound, 0);
    }

    return lastIndexInWindow - firstIndexInWindow + 1;
}
/**
 * Asserts that the window function output 'wfRes.res' agrees with 'groupRes', the value computed
 * by the equivalent $group, for the document at 'index'. The comparison depends on 'accum':
 *   - $stdDevSamp / $stdDevPop: compared with assert.close() to 10 decimal places.
 *   - $addToSet: compared with arrayEq(), since element order doesn't matter.
 *   - anything else: compared with assert.eq().
 */
export function assertResultsEqual(wfRes, index, groupRes, accum) {
    const windowValue = wfRes.res;

    // On DEBUG builds, the computed $group may be slightly different due to precision
    // loss when spilling to disk.
    // TODO SERVER-42616: Enable the exact check for $stdDevPop/Samp.
    if (accum === "$stdDevSamp" || accum === "$stdDevPop") {
        const closeMsg = `Window function $stdDev result for index ${index}: ${tojson(wfRes)}`;
        assert.close(groupRes, windowValue, closeMsg, 10 /* 10 decimal places */);
        return;
    }

    if (accum === "$addToSet") {
        // Order doesn't matter for $addToSet.
        const sameElements = arrayEq(groupRes, windowValue);
        const setMsg =
            `Window function $addToSet results for index ${index}: ${tojson(wfRes)}` +
            `\nexpected:\n ${tojson(groupRes)}`;
        assert(sameElements, setMsg);
        return;
    }

    assert.eq(groupRes, windowValue, `Window function result for index ${index}: ${tojson(wfRes)}`);
}
/**
 * Sanity-checks the memory metrics reported by every $_internalSetWindowFields stage found in
 * 'explainResult':
 *   - the stage has "$_internalSetWindowFields" and "maxFunctionMemoryUsageBytes" properties;
 *   - every per-function entry in "maxFunctionMemoryUsageBytes" is >= 0;
 *   - the total memory usage (see getTotalMemoryUsageFromStageExplainOutput()) is > 0 and
 *     below 1GB.
 */
export function assertExplainResult(explainResult) {
    const oneGigabyte = 1 * 1024 * 1024 * 1024;
    const windowStages = getAggPlanStages(explainResult, "$_internalSetWindowFields");
    for (const windowStage of windowStages) {
        assert(windowStage.hasOwnProperty("$_internalSetWindowFields"), windowStage);
        assert(windowStage.hasOwnProperty("maxFunctionMemoryUsageBytes"), windowStage);

        const perFunctionUsage = windowStage["maxFunctionMemoryUsageBytes"];
        for (const [field, usageBytes] of Object.entries(perFunctionUsage)) {
            assert.gte(usageBytes, 0, `invalid memory usage for '${field}': ${tojson(windowStage)}`);
        }

        const totalUsageBytes = getTotalMemoryUsageFromStageExplainOutput(windowStage);
        const totalUsageMsg = `Incorrect total mem usage: ${tojson(windowStage)}`;
        assert.gt(totalUsageBytes, 0, totalUsageMsg);
        // No test should be using more than 1GB of memory. This is mostly a sanity check that
        // integer underflow doesn't occur.
        assert.lt(totalUsageBytes, oneGigabyte, totalUsageMsg);
    }
}
/**
 * Returns the total memory usage reported by an explain 'stage'.
 *
 * Exactly one of maxTotalMemoryUsageBytes and peakTrackedMemBytes must be present on the stage;
 * the function asserts this and returns the value of whichever one is there. With the
 * QueryMemoryTracking feature flag enabled we expect peakTrackedMemBytes, and with it disabled
 * we expect maxTotalMemoryUsageBytes.
 */
export function getTotalMemoryUsageFromStageExplainOutput(stage) {
    const reportsPeakTracked = stage.hasOwnProperty(peakTrackedMemBytesProp);
    if (stage.hasOwnProperty(maxTotalMemoryUsageBytesProp)) {
        // Only one of the two fields may be present, not both.
        assert(!reportsPeakTracked, stage);
        return stage[maxTotalMemoryUsageBytesProp];
    }
    assert(reportsPeakTracked, stage);
    return stage[peakTrackedMemBytesProp];
}
/**
 * Asserts that the memory field present on an explain 'stage' is consistent with the state of
 * the QueryMemoryTracking feature flag on 'db':
 *   - if the stage has maxTotalMemoryUsageBytes, the feature flag must be disabled;
 *   - otherwise, the feature flag must be enabled.
 */
export function assertExplainMemoryTracking(stage) {
    // Only maxTotalMemoryUsageBytes or peakTrackedMemBytes could be present, not both.
    const trackingEnabled = FeatureFlagUtil.isPresentAndEnabled(db, "QueryMemoryTracking");
    const reportsMaxTotal = stage.hasOwnProperty(maxTotalMemoryUsageBytesProp);
    const stageJson = tojson(stage);

    if (reportsMaxTotal) {
        const unexpectedFieldMsg =
            `${maxTotalMemoryUsageBytesProp} should not appear in explain output when query memory tracking is enabled: ` +
            stageJson;
        assert(!trackingEnabled, unexpectedFieldMsg);
        return;
    }

    const missingFieldMsg =
        `${peakTrackedMemBytesProp} should appear in explain output when query memory tracking is enabled: ` +
        stageJson;
    assert(trackingEnabled, missingFieldMsg);
}
/**
 * Runs 'accum' (with arguments 'accumArgs') both as a window function and as the equivalent
 * accumulator in $group, across every partitioning case and every entry of 'documentBounds',
 * and asserts that the two agree for each output document. 'onNoResults' is the value expected
 * when the equivalent $group receives no documents.
 *
 * For each bounds entry the same pipeline is also explained with "executionStats" verbosity and
 * its memory metrics are sanity-checked. For each partitioning case, combinations of several
 * windows in one $setWindowFields stage are run as well; those only check that some results
 * come back.
 *
 * Note that this function assumes that the documents in 'coll' were initialized using the
 * seedWithTickerData() method above.
 */
export function testAccumAgainstGroup(coll, accum, onNoResults = null, accumArgs = "$price") {
    const accumSpec = {[accum]: accumArgs};
    // Output specification applying the accumulator over a documents window with the given bounds.
    const makeOutputSpec = (windowBounds) => Object.assign({window: {documents: windowBounds}}, accumSpec);

    forEachPartitionCase((partition) => {
        for (const bounds of documentBounds) {
            jsTestLog(
                `Testing accumulator ${tojson(accumSpec)} against ${tojson(partition)} partition and [${bounds}] bounds`,
            );
            const pipeline = [
                {
                    $setWindowFields: {
                        partitionBy: partition,
                        sortBy: {_id: 1},
                        output: {res: makeOutputSpec(bounds)},
                    },
                },
            ];
            const wfResults = coll.aggregate(pipeline, {allowDiskUse: true}).toArray();
            wfResults.forEach((wfRes, docIndex) => {
                // Without partitioning the position in the result set is the position in the
                // (single) partition; otherwise rely on the 'partIndex' stored in the document.
                const indexInPartition = partition === null ? docIndex : wfRes.partIndex;
                const partitionKey = partition == null ? {} : {ticker: wfRes.ticker};
                const groupRes = computeAsGroup({
                    coll: coll,
                    partitionKey: partitionKey,
                    accumSpec: accumSpec,
                    bounds: bounds,
                    indexInPartition: indexInPartition,
                    defaultValue: onNoResults,
                });
                assertResultsEqual(wfRes, docIndex, groupRes, accum);
            });

            // Run the same pipeline with explain verbosity "executionStats" and verify that the
            // reported metrics are sensible.
            const explainOutput = coll.explain("executionStats").aggregate(pipeline, {allowDiskUse: true});
            assertExplainResult(explainOutput);
            jsTestLog("Done");
        }

        // To get additional coverage, specifically regarding the expiration policy, test
        // combinations of various window types in the same $setWindowFields stage. This is more of
        // a fuzz test so no need to check results.
        forEachDocumentBoundsCombo((arrayOfBounds) => {
            jsTestLog(`Testing accumulator ${tojson(accumSpec)} against multiple bounds: ${tojson(arrayOfBounds)}`);

            const outputFields = {};
            for (const [position, comboBounds] of arrayOfBounds.entries()) {
                outputFields[`res${position}`] = makeOutputSpec(comboBounds);
            }
            const windowSpec = Object.merge({partitionBy: partition, sortBy: {_id: 1}}, {output: outputFields});

            const comboResults = coll.aggregate([{$setWindowFields: windowSpec}], {allowDiskUse: true}).toArray();
            assert.gt(comboResults.length, 0);
            jsTestLog("Done");
        });
    });
}