SERVER-113951 Introduce LiteParseRegistration struct with ability to register primary and fallback parsers (#44070)

GitOrigin-RevId: 90c7d3fef67b034fe33bda10b9880b81cc93d3b9
This commit is contained in:
Finley Lau 2025-11-25 10:20:33 -06:00 committed by MongoDB Bot
parent b4beaca4b2
commit 6f0f9a8652
4 changed files with 369 additions and 6 deletions

View File

@ -1177,6 +1177,7 @@ mongo_cc_unit_test(
"field_path_test.cpp",
"granularity_rounder_powers_of_two_test.cpp",
"granularity_rounder_preferred_numbers_test.cpp",
"lite_parsed_document_source_test.cpp",
"lookup_set_cache_test.cpp",
"monotonic_expression_test.cpp",
"partition_key_comparator_test.cpp",

View File

@ -46,10 +46,41 @@ namespace {
// Empty vector used by LiteParsedDocumentSources which do not have a sub pipeline.
inline static std::vector<LiteParsedPipeline> kNoSubPipeline = {};
StringMap<LiteParsedDocumentSource::LiteParserInfo> parserMap;
StringMap<LiteParsedDocumentSource::LiteParserRegistration> parserMap;
} // namespace
const LiteParsedDocumentSource::LiteParserInfo&
LiteParsedDocumentSource::LiteParserRegistration::getParser() const {
// If there's no feature flag toggle, or the feature flag toggle exists and the feature is
// enabled in this context, use the primary parser. Otherwise, use the fallback parser.
if (_primaryParserFeatureFlag == nullptr || _primaryParserFeatureFlag->checkEnabled()) {
return _primaryParser;
} else {
return _fallbackParser;
}
}
void LiteParsedDocumentSource::LiteParserRegistration::setPrimaryParser(LiteParserInfo&& lpi) {
_primaryParser = std::move(lpi);
_primaryIsSet = true;
}
void LiteParsedDocumentSource::LiteParserRegistration::setFallbackParser(
LiteParserInfo&& lpi, IncrementalRolloutFeatureFlag* ff) {
_fallbackParser = std::move(lpi);
_primaryParserFeatureFlag = ff;
_fallbackIsSet = true;
}
bool LiteParsedDocumentSource::LiteParserRegistration::isPrimarySet() const {
return _primaryIsSet;
}
bool LiteParsedDocumentSource::LiteParserRegistration::isFallbackSet() const {
return _fallbackIsSet;
}
void LiteParsedDocumentSource::registerParser(const std::string& name,
Parser parser,
AllowedWithApiStrict allowedWithApiStrict,
@ -62,7 +93,46 @@ void LiteParsedDocumentSource::registerParser(const std::string& name,
aggStageCounters.addMetric(name);
}
parserMap[name] = {parser, allowedWithApiStrict, allowedWithClientType};
// Retrieve an existing or create a new registration.
auto& registration = parserMap[name];
registration.setPrimaryParser({parser, allowedWithApiStrict, allowedWithClientType});
}
void LiteParsedDocumentSource::registerFallbackParser(const std::string& name,
Parser parser,
FeatureFlag* parserFeatureFlag,
AllowedWithApiStrict allowedWithApiStrict,
AllowedWithClientType allowedWithClientType) {
if (parserMap.contains(name)) {
const auto& registration = parserMap.at(name);
// We require that the fallback parser is always registered prior to the primary parser.
// At extension load time, its then explicit which stages are permitted to be overridden
// and which cannot.
tassert(11395100,
"A stage's fallback parser must be registered before the primary parser",
registration.isFallbackSet() || !registration.isPrimarySet());
// Silently skip registration if a fallback parser has already been registered. The first
// fallback parser registration gets priority.
return;
}
// Initialize a counter for this document source to track how many times it is used.
aggStageCounters.addMetric(name);
// Create a new registration and save the parser as the fallback parser.
auto& registration = parserMap[name];
// TODO SERVER-114028 Remove the following dynamic cast and tassert when fallback parsing
// supports all feature flags.
auto* ifrFeatureFlag = dynamic_cast<IncrementalRolloutFeatureFlag*>(parserFeatureFlag);
tassert(11395101,
"Fallback parsing only supports IncrementalRolloutFeatureFlags.",
ifrFeatureFlag != nullptr);
registration.setFallbackParser({parser, allowedWithApiStrict, allowedWithClientType},
ifrFeatureFlag);
}
void LiteParsedDocumentSource::unregisterParser_forTest(const std::string& name) {
@ -77,23 +147,23 @@ std::unique_ptr<LiteParsedDocumentSource> LiteParsedDocumentSource::parse(
BSONElement specElem = spec.firstElement();
auto stageName = specElem.fieldNameStringData();
auto it = parserMap.find(stageName);
const auto it = parserMap.find(stageName);
uassert(40324,
str::stream() << "Unrecognized pipeline stage name: '" << stageName << "'",
it != parserMap.end());
return it->second.parser(nss, specElem, options);
return it->second.getParser().parser(nss, specElem, options);
}
const LiteParsedDocumentSource::LiteParserInfo& LiteParsedDocumentSource::getInfo(
const std::string& stageName) {
auto it = parserMap.find(stageName);
const auto it = parserMap.find(stageName);
uassert(5407200,
str::stream() << "Unrecognized pipeline stage name: '" << stageName << "'",
it != parserMap.end());
return it->second;
return it->second.getParser();
}
const std::vector<LiteParsedPipeline>& LiteParsedDocumentSource::getSubPipelines() const {

View File

@ -95,6 +95,44 @@ public:
AllowedWithClientType allowedWithClientType;
};
/*
* A LiteParserRegistration encapsulates the set of all parsers that can be used to parse a
* stage into a LiteParsedDocumentSource, controlled by the value of a feature flag.
*/
class LiteParserRegistration {
public:
const LiteParserInfo& getParser() const;
void setPrimaryParser(LiteParserInfo&& lpi);
// TODO SERVER-114028 Update when fallback parsing supports all feature flags.
void setFallbackParser(LiteParserInfo&& lpi, IncrementalRolloutFeatureFlag* ff);
bool isPrimarySet() const;
bool isFallbackSet() const;
private:
// The preferred method of parsing this LiteParsedDocumentSource. If the feature flag is
// enabled, the primary parser will be used to parse the stage.
LiteParserInfo _primaryParser;
// The fallback method of parsing this LiteParsedDocumentSource. If the feature flag is
// disabled, the fallback parser will be used to parse the stage.
LiteParserInfo _fallbackParser;
// When enabled, signals to use the primary parser; when disabled, signals to use the
// fallback parser.
// TODO SERVER-114028 Generalize this to be FeatureFlag*.
IncrementalRolloutFeatureFlag* _primaryParserFeatureFlag = nullptr;
// Whether or not the primary parser has been registered or not.
bool _primaryIsSet = false;
// Whether or not the fallback parser has been registered or not.
bool _fallbackIsSet = false;
};
/**
* Constructs a LiteParsedDocumentSource from the user-supplied BSON.
*
@ -120,6 +158,12 @@ public:
AllowedWithApiStrict allowedWithApiStrict,
AllowedWithClientType allowedWithClientType);
static void registerFallbackParser(const std::string& name,
Parser parser,
FeatureFlag* parserFeatureFlag,
AllowedWithApiStrict allowedWithApiStrict,
AllowedWithClientType allowedWithClientType);
/**
* Function that will be used as an alternate parser for a document source that has been
* disabled.
@ -347,6 +391,8 @@ private:
* only meant to be used in the context of unit tests. This is because the parserMap is not
* thread safe, so modifying it at runtime is unsafe.
*/
friend class LiteParserRegistrationTest;
friend class LiteParsedDocumentSourceParseTest;
static void unregisterParser_forTest(const std::string& name);
std::string _parseTimeName;

View File

@ -0,0 +1,246 @@
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/base/error_codes.h"
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/feature_flag.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/allowed_contexts.h"
#include "mongo/unittest/assert.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/framework.h"
#include "mongo/util/assert_util.h"
namespace mongo {
namespace {
const NamespaceString kTestNss =
NamespaceString::createNamespaceString_forTest("test.liteParsedDocSource"_sd);
// Mock parser functions for testing
std::unique_ptr<LiteParsedDocumentSource> createMockParser(const NamespaceString& nss,
const BSONElement& spec,
const LiteParserOptions& options) {
return std::make_unique<LiteParsedDocumentSourceDefault>(spec);
}
} // namespace
class LiteParserRegistrationTest : public unittest::Test {
protected:
void setUp() override {
primaryParser = {
createMockParser, AllowedWithApiStrict::kAlways, AllowedWithClientType::kAny};
fallbackParser = {createMockParser,
AllowedWithApiStrict::kNeverInVersion1,
AllowedWithClientType::kInternal};
}
void assertParserIsPrimary(const LiteParsedDocumentSource::LiteParserInfo& parserInfo) {
ASSERT_EQ(parserInfo.allowedWithApiStrict, AllowedWithApiStrict::kAlways);
ASSERT_EQ(parserInfo.allowedWithClientType, AllowedWithClientType::kAny);
}
void assertParserIsFallback(const LiteParsedDocumentSource::LiteParserInfo& parserInfo) {
ASSERT_EQ(parserInfo.allowedWithApiStrict, AllowedWithApiStrict::kNeverInVersion1);
ASSERT_EQ(parserInfo.allowedWithClientType, AllowedWithClientType::kInternal);
}
LiteParsedDocumentSource::LiteParserInfo primaryParser;
LiteParsedDocumentSource::LiteParserInfo fallbackParser;
};
TEST_F(LiteParserRegistrationTest, SetPrimaryParser) {
LiteParsedDocumentSource::LiteParserRegistration registration;
ASSERT_FALSE(registration.isPrimarySet());
registration.setPrimaryParser(std::move(primaryParser));
ASSERT_TRUE(registration.isPrimarySet());
}
TEST_F(LiteParserRegistrationTest, SetFallbackParser) {
LiteParsedDocumentSource::LiteParserRegistration registration;
// Create a mock IncrementalRolloutFeatureFlag.
IncrementalRolloutFeatureFlag mockFlag("testFlag"_sd, RolloutPhase::inDevelopment, false);
registration.setFallbackParser(std::move(fallbackParser), &mockFlag);
// Verify primary is still not set.
ASSERT_FALSE(registration.isPrimarySet());
ASSERT_TRUE(registration.isFallbackSet());
}
TEST_F(LiteParserRegistrationTest, GetParserWithoutFeatureFlag) {
LiteParsedDocumentSource::LiteParserRegistration registration;
registration.setPrimaryParser(std::move(primaryParser));
// When there's no feature flag, should return primary parser.
const auto& parserInfo = registration.getParser();
assertParserIsPrimary(parserInfo);
}
TEST_F(LiteParserRegistrationTest, GetParserWithFeatureFlagEnabled) {
LiteParsedDocumentSource::LiteParserRegistration registration;
IncrementalRolloutFeatureFlag mockFlag("testFlag"_sd, RolloutPhase::inDevelopment, true);
registration.setFallbackParser(std::move(fallbackParser), &mockFlag);
registration.setPrimaryParser(std::move(primaryParser));
// When feature flag is enabled, should return primary parser.
const auto& parserInfo = registration.getParser();
assertParserIsPrimary(parserInfo);
}
TEST_F(LiteParserRegistrationTest, GetParserWithFeatureFlagDisabled) {
LiteParsedDocumentSource::LiteParserRegistration registration;
IncrementalRolloutFeatureFlag mockFlag("testFlag"_sd, RolloutPhase::inDevelopment, false);
registration.setFallbackParser(std::move(fallbackParser), &mockFlag);
registration.setPrimaryParser(std::move(primaryParser));
// When feature flag is disabled, should return fallback parser.
const auto& parserInfo = registration.getParser();
assertParserIsFallback(parserInfo);
}
TEST_F(LiteParserRegistrationTest, GetParserWithChangingFeatureFlag) {
LiteParsedDocumentSource::LiteParserRegistration registration;
IncrementalRolloutFeatureFlag mockFlag("testFlag"_sd, RolloutPhase::inDevelopment, false);
registration.setFallbackParser(std::move(fallbackParser), &mockFlag);
registration.setPrimaryParser(std::move(primaryParser));
// When feature flag is disabled, should return fallback parser.
assertParserIsFallback(registration.getParser());
// Enable the feature flag and check that the primary parser is chosen.
mockFlag.setForServerParameter(true);
assertParserIsPrimary(registration.getParser());
// Disable it and check that the fallback parser is chosen.
mockFlag.setForServerParameter(false);
assertParserIsFallback(registration.getParser());
}
class LiteParsedDocumentSourceParseTest : public unittest::Test {
protected:
void registerPrimaryParser() {
LiteParsedDocumentSource::registerParser(_stageName,
createMockParser,
AllowedWithApiStrict::kAlways,
AllowedWithClientType::kAny);
}
void registerFallbackParser(FeatureFlag* ff) {
LiteParsedDocumentSource::registerFallbackParser(_stageName,
createMockParser,
ff,
AllowedWithApiStrict::kNeverInVersion1,
AllowedWithClientType::kInternal);
}
void tearDown() override {
LiteParsedDocumentSource::unregisterParser_forTest(_stageName);
}
// Note that _stageName should be unique between every test and set at the beginning. This is
// because we have no way of clearing out the global variable aggStageCounters.
std::string _stageName;
};
TEST_F(LiteParsedDocumentSourceParseTest, CanRegisterBothPrimaryAndFallback) {
_stageName = "$canRegisterBothPrimaryAndFallback";
IncrementalRolloutFeatureFlag mockFlag("testFlag"_sd, RolloutPhase::inDevelopment, false);
registerFallbackParser(&mockFlag);
registerPrimaryParser();
}
DEATH_TEST_F(LiteParsedDocumentSourceParseTest, MustRegisterPrimaryAfterFallback, "11395100") {
_stageName = "$mustRegisterPrimaryAfterFallback";
IncrementalRolloutFeatureFlag mockFlag("testFlag"_sd, RolloutPhase::inDevelopment, false);
registerPrimaryParser();
registerFallbackParser(&mockFlag);
}
TEST_F(LiteParsedDocumentSourceParseTest, FirstFallbackParserTakesPrecedence) {
_stageName = "$firstFallbackParserTakesPrecedence";
// Disable the feature flag such that the parser returns the fallback parser.
IncrementalRolloutFeatureFlag mockFlag("testFlag"_sd, RolloutPhase::inDevelopment, false);
registerFallbackParser(&mockFlag);
registerPrimaryParser();
// Try creating another fallback parser.
LiteParsedDocumentSource::registerFallbackParser(_stageName,
createMockParser,
&mockFlag,
AllowedWithApiStrict::kNeverInVersion1,
AllowedWithClientType::kAny);
// Ensure that the parser info is the original fallback parser.
auto parserInfo = LiteParsedDocumentSource::getInfo(_stageName);
ASSERT_EQ(parserInfo.allowedWithApiStrict, AllowedWithApiStrict::kNeverInVersion1);
ASSERT_EQ(parserInfo.allowedWithClientType, AllowedWithClientType::kInternal);
}
TEST_F(LiteParsedDocumentSourceParseTest, FirstFallbackParserTakesPrecedenceWithNoPrimary) {
_stageName = "$firstFallbackParserTakesPrecedenceWithNoPrimary";
// Disable the feature flag such that the parser returns the fallback parser.
IncrementalRolloutFeatureFlag mockFlag("testFlag"_sd, RolloutPhase::inDevelopment, false);
registerFallbackParser(&mockFlag);
// Try creating another fallback parser.
LiteParsedDocumentSource::registerFallbackParser(_stageName,
createMockParser,
&mockFlag,
AllowedWithApiStrict::kNeverInVersion1,
AllowedWithClientType::kAny);
// Ensure that the parser info is the original fallback parser.
auto parserInfo = LiteParsedDocumentSource::getInfo(_stageName);
ASSERT_EQ(parserInfo.allowedWithApiStrict, AllowedWithApiStrict::kNeverInVersion1);
ASSERT_EQ(parserInfo.allowedWithClientType, AllowedWithClientType::kInternal);
}
// TODO SERVER-114028 Remove the following test when fallback parsing supports all feature flags.
DEATH_TEST_F(LiteParsedDocumentSourceParseTest, IFRFlagIsRequired, "11395101") {
_stageName = "$IFRFlagIsRequired";
BinaryCompatibleFeatureFlag mockFlag(false);
registerFallbackParser(&mockFlag);
}
} // namespace mongo