mongo/jstests/libs/override_methods/implicit_change_stream_v2.js

42 lines
1.7 KiB
JavaScript

/**
* Overrides the $changeStream aggregation pipeline to run in version "v2".
*/
import {OverrideHelpers} from "jstests/libs/override_methods/override_helpers.js";
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
let featureFlagEnabled = null;
function isFeatureFlagEnabled(conn) {
if (featureFlagEnabled === null) {
featureFlagEnabled = FeatureFlagUtil.isPresentAndEnabled(conn, "ChangeStreamPreciseShardTargeting");
}
return featureFlagEnabled;
}
function isChangeStreamCommandWithoutVersion(cmdObj) {
return (
cmdObj &&
cmdObj.aggregate &&
// TODO: SERVER-111325 Implement DatabaseChangeStreamShardTargeterImpl module.
cmdObj.aggregate !== 1 &&
// TODO: SERVER-111381 Implement AllDatabasesChangeStreamShardTargeterImpl module.
cmdObj.aggregate.$db !== "admin" &&
Array.isArray(cmdObj.pipeline) &&
cmdObj.pipeline.length > 0 &&
typeof cmdObj.pipeline[0].$changeStream == "object" &&
cmdObj.pipeline[0].$changeStream.constructor === Object &&
!cmdObj.pipeline[0].$changeStream.hasOwnProperty("version")
);
}
function runChangeStreamWithV2Version(conn, _dbName, _commandName, commandObj, func, makeFuncArgs) {
// TODO: SERVER-52253 Enable feature flag for Improved change stream handling of cluster topology changes.
if (isChangeStreamCommandWithoutVersion(commandObj) && isFeatureFlagEnabled(conn)) {
commandObj.pipeline[0].$changeStream["version"] = "v2";
}
return func.apply(conn, makeFuncArgs(commandObj));
}
OverrideHelpers.prependOverrideInParallelShell("jstests/libs/override_methods/implicit_change_stream_v2.js");
OverrideHelpers.overrideRunCommand(runChangeStreamWithV2Version);