// Mirror of https://github.com/mongodb/mongo (42 lines, 1.7 KiB, JavaScript).
/**
 * Overrides the $changeStream aggregation pipeline to run in version "v2".
 */
|
|
|
|
import {OverrideHelpers} from "jstests/libs/override_methods/override_helpers.js";
|
|
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
|
|
|
|
// Memoized result of the feature flag lookup; stays null until the first lookup has been made.
let featureFlagEnabled = null;

/**
 * Returns the state of the "ChangeStreamPreciseShardTargeting" feature flag.
 *
 * The flag is looked up through 'conn' only while no result has been cached yet. Once a non-null
 * result is stored in the module-level 'featureFlagEnabled' variable, it is returned as-is on
 * every later call, regardless of which connection is passed in.
 *
 * @param conn - connection handed to FeatureFlagUtil.isPresentAndEnabled() for the lookup.
 * @returns the (cached) value produced by FeatureFlagUtil.isPresentAndEnabled().
 */
function isFeatureFlagEnabled(conn) {
    if (featureFlagEnabled === null) {
        featureFlagEnabled = FeatureFlagUtil.isPresentAndEnabled(conn, "ChangeStreamPreciseShardTargeting");
    }
    return featureFlagEnabled;
}
|
|
|
|
/**
 * Reports whether 'cmdObj' is an aggregate command on a named collection whose first pipeline
 * stage is a $changeStream stage with a plain-object specification that has no own "version"
 * field.
 *
 * Malformed input (missing command, missing or empty pipeline, null stage, null or non-plain
 * $changeStream specification) yields false instead of throwing, so that such commands are passed
 * through unmodified.
 *
 * @param {Object} cmdObj - the command object to inspect; may be null or undefined.
 * @returns {boolean} true only if all of the conditions above hold.
 */
function isChangeStreamCommandWithoutVersion(cmdObj) {
    if (!cmdObj || !cmdObj.aggregate) {
        return false;
    }

    // TODO: SERVER-111325 Implement DatabaseChangeStreamShardTargeterImpl module.
    if (cmdObj.aggregate === 1) {
        return false;
    }

    // TODO: SERVER-111381 Implement AllDatabasesChangeStreamShardTargeterImpl module.
    // The database name is a field of the command object itself. The 'aggregate' value is the
    // collection name here, so looking up '$db' on it can never match.
    // NOTE(review): confirm that '$db' is already populated on the command object at the point
    // where this check runs; if it is not, this check is a no-op.
    if (cmdObj.$db === "admin") {
        return false;
    }

    if (!Array.isArray(cmdObj.pipeline) || cmdObj.pipeline.length === 0) {
        return false;
    }

    const firstStage = cmdObj.pipeline[0];
    if (firstStage === null || firstStage === undefined) {
        return false;
    }

    const changeStreamSpec = firstStage.$changeStream;
    // 'typeof null' is "object", so null has to be rejected explicitly before it is dereferenced.
    if (changeStreamSpec === null || typeof changeStreamSpec !== "object") {
        return false;
    }
    // Only plain objects are accepted as a $changeStream specification.
    if (changeStreamSpec.constructor !== Object) {
        return false;
    }

    // Do not rely on the specification object's own (possibly shadowed) hasOwnProperty.
    return !Object.prototype.hasOwnProperty.call(changeStreamSpec, "version");
}
|
|
|
|
/**
 * runCommand override that adds version "v2" to $changeStream aggregations which do not specify a
 * version, provided the feature flag check passes. All other commands are forwarded unchanged.
 *
 * The caller's command object is never modified: when the version has to be added, the command,
 * its pipeline array, the first stage and the $changeStream specification are shallow-copied and
 * the copy is forwarded instead.
 *
 * @param conn - connection the command is run on; also used as 'this' when invoking 'func'.
 * @param _dbName - unused.
 * @param _commandName - unused.
 * @param commandObj - the command about to be run.
 * @param func - the function that actually runs the command.
 * @param makeFuncArgs - builds the argument list for 'func' from a command object.
 * @returns whatever 'func' returns.
 */
function runChangeStreamWithV2Version(conn, _dbName, _commandName, commandObj, func, makeFuncArgs) {
    // TODO: SERVER-52253 Enable feature flag for Improved change stream handling of cluster topology changes.
    if (!isChangeStreamCommandWithoutVersion(commandObj) || !isFeatureFlagEnabled(conn)) {
        return func.apply(conn, makeFuncArgs(commandObj));
    }

    // Copy only the path that changes. Object.assign() keeps the existing key order, so the
    // command name stays the first field of the forwarded command.
    const [firstStage, ...remainingStages] = commandObj.pipeline;
    const versionedSpec = Object.assign({}, firstStage.$changeStream, {version: "v2"});
    const versionedStage = Object.assign({}, firstStage, {$changeStream: versionedSpec});
    const versionedCommand = Object.assign({}, commandObj, {pipeline: [versionedStage, ...remainingStages]});

    return func.apply(conn, makeFuncArgs(versionedCommand));
}
|
|
|
|
// Ask the override helpers to also prepend this file in parallel shells.
// NOTE(review): exact loading semantics live in override_helpers.js; confirm there.
OverrideHelpers.prependOverrideInParallelShell("jstests/libs/override_methods/implicit_change_stream_v2.js");

// Route every runCommand through runChangeStreamWithV2Version.
OverrideHelpers.overrideRunCommand(runChangeStreamWithV2Version);
|