SERVER-109569: Introduce Source stages to the Extensions API (#42574)

GitOrigin-RevId: 3e91b3d2e3cc9a1f9be0ea0e7be3584643a8a9b7
This commit is contained in:
Adithi Raghavan 2025-10-23 10:54:02 -04:00 committed by MongoDB Bot
parent 15ee0a5f81
commit f720a0ab8e
10 changed files with 199 additions and 3 deletions

View File

@ -0,0 +1,31 @@
/**
* Tests that $testFooSource (source extension stage) work E2E after mongod is started with
* libfoo_source_mongo_extension.so successfully loaded.
*
* @tags: [featureFlagExtensionsAPI]
*/
import {assertArrayEq, assertErrorCode} from "jstests/aggregation/extras/utils.js";
const coll = db[jsTestName()];
coll.drop();
const testData = [
{_id: 0, x: 1},
{_id: 1, x: 2},
{_id: 2, x: 3},
];
assert.commandWorked(coll.insertMany(testData));
// Test one no-op stage passes documents through unchanged.
{
const pipeline = [{$testFooSource: {}}];
const result = coll.aggregate(pipeline).toArray();
assertArrayEq({actual: result, expected: testData});
}
// Test $testFoo stage fails to parse.
{
const pipeline = [{$testFooSource: {invalidField: "value"}}];
assertErrorCode(coll, pipeline, 11165101, "Using $testFooSource with invalid field should be rejected");
}

View File

@ -82,6 +82,7 @@ mongo_cc_unit_test(
"//src/mongo/db/extension/test_examples:explain_mongo_extension",
"//src/mongo/db/extension/test_examples:extension_errors_mongo_extension",
"//src/mongo/db/extension/test_examples:foo_mongo_extension",
"//src/mongo/db/extension/test_examples:foo_source_mongo_extension",
"//src/mongo/db/extension/test_examples:host_version_fails_bad_extension",
"//src/mongo/db/extension/test_examples:host_version_succeeds_mongo_extension",
"//src/mongo/db/extension/test_examples:initialize_version_fails_bad_extension",

View File

@ -171,6 +171,7 @@ void DocumentSourceExtension::registerStage(AggStageDescriptorHandle descriptor)
// TODO SERVER-112178: Add case for DocumentSourceExtensionExpandable.
switch (descriptor.getType()) {
case MongoExtensionAggStageType::kNoOp:
case MongoExtensionAggStageType::kSource:
registerStage(nameAsString, id, descriptor);
break;
default:

View File

@ -165,7 +165,7 @@ public:
}
bool isInitialSource() const override {
// TODO SERVER-109569 Change this to return true if the stage is a source stage.
// TODO SERVER-112779 Change this to return true if the stage is a source stage.
return false;
}

View File

@ -290,7 +290,16 @@ typedef enum MongoExtensionAggStageType : uint32_t {
* NoOp stage.
*/
kNoOp = 0,
/**
* Desugar stage.
*/
kDesugar = 1,
/**
* Source stage.
*/
kSource = 2,
} MongoExtensionAggStageType;
/**

View File

@ -709,6 +709,18 @@ TEST_F(AggStageTest, DesugarToEmptyDescriptorParseTest) {
ASSERT_TRUE(std::holds_alternative<extension::AggStageAstNodeHandle>(expanded[0]));
}
TEST_F(AggStageTest, SourceStageParseTest) {
auto descriptor = std::make_unique<ExtensionAggStageDescriptor>(
shared_test_stages::SourceAggStageDescriptor::make());
auto handle = extension::AggStageDescriptorHandle{descriptor.get()};
BSONObj stageBson =
BSON(shared_test_stages::SourceAggStageDescriptor::kStageName << BSON("foo" << true));
auto parseNodeHandle = handle.parse(stageBson);
ASSERT_EQ(shared_test_stages::SourceAggStageDescriptor::kStageName, handle.getName());
ASSERT_EQ(::MongoExtensionAggStageType::kSource, handle.getType());
}
class FieldPathQueryShapeParseNode : public sdk::AggStageParseNode {
public:
static constexpr StringData kStageName = "$fieldPathQueryShape";

View File

@ -42,6 +42,7 @@ namespace mongo::extension::sdk::shared_test_stages {
* Referenced by sdk/tests/aggregation_stage_test.cpp and host/document_source_extension_test.cpp.
*/
static constexpr std::string_view kNoOpName = "$noOp";
static constexpr std::string_view kSourceName = "$sourceStage";
class NoOpLogicalAggStage : public sdk::LogicalAggStage {
public:
@ -104,7 +105,6 @@ public:
: sdk::AggStageDescriptor(kStageName, MongoExtensionAggStageType::kNoOp) {}
std::unique_ptr<sdk::AggStageParseNode> parse(BSONObj stageBson) const override {
uassert(10596406,
"Failed to parse $noOpExtension, $noOpExtension expects an object.",
stageBson.hasField(kStageName) && stageBson.getField(kStageName).isABSONObj());
@ -120,4 +120,77 @@ public:
}
};
class SourceLogicalAggStage : public sdk::LogicalAggStage {
public:
SourceLogicalAggStage() {}
BSONObj serialize() const override {
return BSON(std::string(kSourceName) << "serializedForExecution");
}
BSONObj explain(::MongoExtensionExplainVerbosity verbosity) const override {
return BSONObj();
}
};
class SourceAggStageAstNode : public sdk::AggStageAstNode {
public:
SourceAggStageAstNode() : sdk::AggStageAstNode(kSourceName) {}
std::unique_ptr<sdk::LogicalAggStage> bind() const override {
return std::make_unique<SourceLogicalAggStage>();
}
static inline std::unique_ptr<sdk::AggStageAstNode> make() {
return std::make_unique<SourceAggStageAstNode>();
}
};
class SourceAggStageParseNode : public sdk::AggStageParseNode {
public:
SourceAggStageParseNode() : sdk::AggStageParseNode(kSourceName) {}
static constexpr size_t kExpansionSize = 1;
size_t getExpandedSize() const override {
return kExpansionSize;
}
std::vector<sdk::VariantNode> expand() const override {
std::vector<sdk::VariantNode> expanded;
expanded.reserve(kExpansionSize);
expanded.emplace_back(
new sdk::ExtensionAggStageAstNode(std::make_unique<SourceAggStageAstNode>()));
return expanded;
}
BSONObj getQueryShape(const ::MongoExtensionHostQueryShapeOpts* ctx) const override {
return BSONObj();
}
static inline std::unique_ptr<sdk::AggStageParseNode> make() {
return std::make_unique<SourceAggStageParseNode>();
}
};
class SourceAggStageDescriptor : public sdk::AggStageDescriptor {
public:
static inline const std::string kStageName = std::string(kSourceName);
SourceAggStageDescriptor()
: sdk::AggStageDescriptor(kStageName, MongoExtensionAggStageType::kSource) {}
std::unique_ptr<sdk::AggStageParseNode> parse(BSONObj stageBson) const override {
uassert(10956900,
"Failed to parse $sourceExtension, $sourceExtension expects an object.",
stageBson.hasField(kStageName) && stageBson.getField(kStageName).isABSONObj());
auto stageDefinition = stageBson.getField(kStageName).Obj();
return std::make_unique<SourceAggStageParseNode>();
}
static inline std::unique_ptr<sdk::AggStageDescriptor> make() {
return std::make_unique<SourceAggStageDescriptor>();
}
};
} // namespace mongo::extension::sdk::shared_test_stages

View File

@ -39,6 +39,7 @@ extensions_with_config(
":shapify_mongo_extension",
":sharded_execution_serialization_mongo_extension",
":explain_mongo_extension",
":foo_source_mongo_extension",
#################### EXTENSIONS FOR NO-PASSTHROUGH TESTS ####################
# Any extension that is just loaded in a no-passthrough test MUST NOT have the
@ -63,6 +64,7 @@ extensions_with_config(
)
for extension_name in [
"foo",
"foo_source",
"foo_v2",
"bar",
]

View File

@ -0,0 +1,67 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/bsonobj.h"
#include "mongo/db/extension/sdk/aggregation_stage.h"
#include "mongo/db/extension/sdk/extension_factory.h"
#include "mongo/db/extension/sdk/test_extension_factory.h"
#include "mongo/db/extension/sdk/test_extension_util.h"
namespace sdk = mongo::extension::sdk;
DEFAULT_LOGICAL_AST_PARSE(TestFooSource, "$testFooSource")
/**
* $testFoo is a source stage.
*
* This file is identical to foo.cpp except it is a source stage instead of a no-op stage.
*/
class TestFooSourceStageDescriptor : public sdk::AggStageDescriptor {
public:
static inline const std::string kStageName = std::string(TestFooSourceStageName);
TestFooSourceStageDescriptor()
: sdk::AggStageDescriptor(kStageName, MongoExtensionAggStageType::kSource) {}
std::unique_ptr<sdk::AggStageParseNode> parse(mongo::BSONObj stageBson) const override {
sdk::validateStageDefinition(stageBson, kStageName, true /* checkEmpty */);
return std::make_unique<TestFooSourceParseNode>(stageBson);
}
};
class FooExtension : public sdk::Extension {
public:
void initialize(const sdk::HostPortalHandle& portal) override {
_registerStage<TestFooSourceStageDescriptor>(portal);
}
};
REGISTER_EXTENSION(FooExtension)
DEFINE_GET_EXTENSION()

View File

@ -41,7 +41,7 @@ DEFAULT_LOGICAL_AST_PARSE(TestFoo, "$testFoo")
* $testFoo is a no-op stage.
*
* This file is identical to foo.cpp except this stage does _not_ fail parsing if the
* stage definition is empty. This is used for extenison upgrade/downgrade testing.
* stage definition is empty. This is used for extension upgrade/downgrade testing.
*/
class TestFooStageDescriptor : public sdk::AggStageDescriptor {
public: