SERVER-109576 Implement loadable test extension for Source stages (#44059)

GitOrigin-RevId: 5e484d1d058cb61f66b513e9f5b080b432333e7b
This commit is contained in:
Alyssa Clark 2025-12-01 17:57:19 -05:00 committed by MongoDB Bot
parent bc3a1bac78
commit fffd141a57
7 changed files with 186 additions and 128 deletions

View File

@ -1,31 +0,0 @@
/**
* Tests that $testFooSource (source extension stage) work E2E after mongod is started with
* libfoo_source_mongo_extension.so successfully loaded.
*
* @tags: [featureFlagExtensionsAPI]
*/
import {assertArrayEq, assertErrorCode} from "jstests/aggregation/extras/utils.js";
const coll = db[jsTestName()];
coll.drop();
const testData = [
{_id: 0, x: 1},
{_id: 1, x: 2},
{_id: 2, x: 3},
];
assert.commandWorked(coll.insertMany(testData));
// Test one no-op stage passes documents through unchanged.
{
const pipeline = [{$testFooSource: {}}];
const result = coll.aggregate(pipeline).toArray();
assertArrayEq({actual: result, expected: testData});
}
// Test $testFoo stage fails to parse.
{
const pipeline = [{$testFooSource: {invalidField: "value"}}];
assertErrorCode(coll, pipeline, 11165101, "Using $testFooSource with invalid field should be rejected");
}

View File

@ -0,0 +1,105 @@
/**
* Tests an extension source stage.
*
* @tags: [featureFlagExtensionsAPI]
*/
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
const collName = jsTestName();
const coll = db[collName];
coll.drop();
coll.insert({breadType: "sourdough"});
// Source stage must still be run against a collection.
assert.commandFailedWithCode(
db.runCommand({
aggregate: 1,
pipeline: [{$toast: {temp: 350.0, numSlices: 2}}],
cursor: {},
}),
ErrorCodes.InvalidNamespace,
);
// Source stage must be first in the pipeline.
assert.commandFailedWithCode(
db.runCommand({
aggregate: collName,
pipeline: [{$match: {breadType: "brioche"}}, {$toast: {temp: 350.0, numSlices: 2}}],
cursor: {},
}),
40602,
);
// Can't have two source stages at the same level in a pipeline.
assert.commandFailedWithCode(
db.runCommand({
aggregate: collName,
pipeline: [{$toast: {temp: 350.0, numSlices: 2}}, {$toast: {temp: 350.0, numSlices: 2}}],
cursor: {},
}),
40602,
);
// EOF source stage.
let results = coll.aggregate([{$toast: {temp: 350.0, numSlices: 0}}]).toArray();
assert.eq(results.length, 0, results);
// Top-level source stage.
if (!FixtureHelpers.isMongos(db)) {
results = coll.aggregate([{$toast: {temp: 425.0, numSlices: 4}}]).toArray();
assert.eq(results, [
{slice: 0, isBurnt: true},
{slice: 1, isBurnt: true},
{slice: 2, isBurnt: true},
{slice: 3, isBurnt: true},
]);
} else {
// This source stage will run on every shard, producing variable numbers of
// documents depending on how many shards exist in the cluster. Relax the expected
// results assertion for that case.
// TODO SERVER-114234 Remove these relaxed assertions once we can run this properly as a collectionless aggregate.
results = coll.aggregate([{$toast: {temp: 425.0, numSlices: 4}}]).toArray();
assert.gt(results.length, 0, results);
}
// TODO SERVER-113930 Remove failure cases and enable success cases for $lookup and $unionWith.
// Source stage in $lookup.
assert.commandFailedWithCode(
db.runCommand({
aggregate: collName,
pipeline: [{$lookup: {from: collName, pipeline: [{$toast: {temp: 350.0, numSlices: 2}}], as: "slices"}}],
cursor: {},
}),
51047,
);
// results = coll.aggregate([{$lookup: {from: collName, pipeline: [{$toast: {temp: 350.0, numSlices: 2}}], as: "slices"}}]).toArray();
// assert.eq(results, [{breadType: "sourdough", slices: [{slice: 0, isBurnt: false}, {slice: 1, isBurnt: false}]}]);
// Source stage in $unionWith.
assert.commandFailedWithCode(
db.runCommand({
aggregate: collName,
pipeline: [{$unionWith: {coll: collName, pipeline: [{$toast: {temp: 350.0, numSlices: 2}}]}}],
cursor: {},
}),
31441,
);
// results = coll.aggregate([{$unionWith: {coll: collName, pipeline: [{$toast: {temp: 350.0, numSlices: 2}}]}}]).toArray();
// assert.eq(results, [{breadType: "sourdough"}, {slice: 0, isBurnt: false}, {slice: 1, isBurnt: false}]);
// Source stage is not allowed in $facet.
assert.commandFailedWithCode(
db.runCommand({
aggregate: collName,
pipeline: [{$facet: {slices: [{$toast: {temp: 250.0, numSlices: 2}}]}}],
cursor: {},
}),
40600,
);
// TODO SERVER-113930 Enable this test.
// Two source stages in the pipeline.
// results = coll.aggregate([{$toast: {temp: 100.0, numSlices: 1}}, {$unionWith: {coll: collName, pipeline: [{$toast: {temp: 350.0, numSlices: 2}}]}}]).toArray();
// assert.eq(results, [{slice: 0, notToasted: true}, {slice: 0, isBurnt: false}, {slice: 1, isBurnt: false}]);

View File

@ -81,7 +81,6 @@ mongo_cc_unit_test(
"//src/mongo/db/extension/test_examples:explain_mongo_extension", "//src/mongo/db/extension/test_examples:explain_mongo_extension",
"//src/mongo/db/extension/test_examples:extension_errors_mongo_extension", "//src/mongo/db/extension/test_examples:extension_errors_mongo_extension",
"//src/mongo/db/extension/test_examples:foo_mongo_extension", "//src/mongo/db/extension/test_examples:foo_mongo_extension",
"//src/mongo/db/extension/test_examples:foo_source_mongo_extension",
"//src/mongo/db/extension/test_examples:host_version_fails_bad_extension", "//src/mongo/db/extension/test_examples:host_version_fails_bad_extension",
"//src/mongo/db/extension/test_examples:host_version_succeeds_mongo_extension", "//src/mongo/db/extension/test_examples:host_version_succeeds_mongo_extension",
"//src/mongo/db/extension/test_examples:initialize_version_fails_bad_extension", "//src/mongo/db/extension/test_examples:initialize_version_fails_bad_extension",

View File

@ -42,7 +42,6 @@ extensions_with_config(
":explain_mongo_extension", ":explain_mongo_extension",
":idle_threads_mongo_extension", ":idle_threads_mongo_extension",
":interrupt_mongo_extension", ":interrupt_mongo_extension",
":foo_source_mongo_extension",
":match_topN_mongo_extension", ":match_topN_mongo_extension",
":native_vector_search_mongo_extension", ":native_vector_search_mongo_extension",
":metrics_mongo_extension", ":metrics_mongo_extension",
@ -73,7 +72,6 @@ pkg_name = "//" + package_name() + "/"
) )
for extension_name in [ for extension_name in [
"foo", "foo",
"foo_source",
"foo_v2", "foo_v2",
"bar", "bar",
] ]

View File

@ -13,5 +13,5 @@ extensions:
max: 10 max: 10
toaster: toaster:
extensionOptions: extensionOptions:
maxToasterHeat: 5 maxToasterHeat: 450
allowBagels: false allowBagels: false

View File

@ -28,63 +28,115 @@
*/ */
#include "mongo/bson/bsonobj.h" #include "mongo/bson/bsonobj.h"
#include "mongo/db/extension/public/extension_agg_stage_static_properties_gen.h"
#include "mongo/db/extension/sdk/aggregation_stage.h" #include "mongo/db/extension/sdk/aggregation_stage.h"
#include "mongo/db/extension/sdk/extension_factory.h" #include "mongo/db/extension/sdk/extension_factory.h"
#include "mongo/db/extension/sdk/test_extension_factory.h" #include "mongo/db/extension/sdk/test_extension_factory.h"
namespace sdk = mongo::extension::sdk; namespace sdk = mongo::extension::sdk;
DEFAULT_LOGICAL_AST_PARSE(Toast, "$toast")
DEFAULT_LOGICAL_AST_PARSE(ToastBagel, "$toastBagel")
struct ToasterOptions { struct ToasterOptions {
inline static double maxToasterHeat = 0; inline static double maxToasterHeat = 0;
inline static bool allowBagels = false; inline static bool allowBagels = false;
}; };
/** STAGE_NAME(Toast, "$toast");
* $toast is a no-op stage that requires a temperature, like {$toast: {temp: 3}}.
*/ class ToastExecStage : public sdk::ExecAggStageSource {
class ToastStageDescriptor : public sdk::AggStageDescriptor {
public: public:
static inline const std::string kStageName = std::string(ToastStageName); ToastExecStage(std::string_view stageName, const mongo::BSONObj& arguments)
: sdk::ExecAggStageSource(ToastStageName), _currentSlice(0) {
_temp = arguments["temp"].Number();
_numSlices = [&] {
if (auto numSlices = arguments["numSlices"]) {
return static_cast<int>(numSlices.Number());
}
return 1;
}();
}
ToastStageDescriptor() : sdk::AggStageDescriptor(kStageName) {} mongo::extension::ExtensionGetNextResult getNext(
const mongo::extension::sdk::QueryExecutionContextHandle& execCtx,
::MongoExtensionExecAggStage* execStage) override {
if (_currentSlice == _numSlices) {
return mongo::extension::ExtensionGetNextResult::eof();
}
mongo::BSONObjBuilder builder;
builder.append("slice", _currentSlice++);
if (_temp < 300.0) {
builder.append("notToasted", true);
} else {
builder.append("isBurnt", _temp > 400.0);
}
auto result = mongo::extension::ExtensionBSONObj::makeAsByteBuf(builder.obj());
return mongo::extension::ExtensionGetNextResult::advanced(std::move(result));
}
void open() override {}
void reopen() override {}
void close() override {}
private:
double _temp;
int _numSlices;
int _currentSlice;
};
DEFAULT_LOGICAL_STAGE(Toast);
class ToastAstNode : public sdk::TestAstNode<ToastLogicalStage> {
public:
ToastAstNode(std::string_view stageName, const mongo::BSONObj& arguments)
: sdk::TestAstNode<ToastLogicalStage>(stageName, arguments) {}
// TODO SERVER-114234 Set properties for this to be a collectionless stage.
mongo::BSONObj getProperties() const override {
mongo::extension::MongoExtensionStaticProperties properties;
mongo::BSONObjBuilder builder;
properties.setPosition(mongo::extension::MongoExtensionPositionRequirementEnum::kFirst);
properties.setHostType(
mongo::extension::MongoExtensionHostTypeRequirementEnum::kRunOnceAnyNode);
properties.setRequiresInputDocSource(false);
properties.serialize(&builder);
return builder.obj();
}
};
DEFAULT_PARSE_NODE(Toast);
/**
* $toast is a source stage that requires a temperature and number of slices, like {$toast: {temp:
* 3, numSlices: 5}}.
*/
class ToastStageDescriptor : public sdk::TestStageDescriptor<ToastStageName, ToastParseNode> {
public:
std::unique_ptr<sdk::AggStageParseNode> parse(mongo::BSONObj stageBson) const override { std::unique_ptr<sdk::AggStageParseNode> parse(mongo::BSONObj stageBson) const override {
auto arguments = sdk::validateStageDefinition(stageBson, kStageName); auto arguments = sdk::validateStageDefinition(stageBson, kStageName);
const auto obj = stageBson.getField(kStageName).Obj();
sdk_uassert(11285301, sdk_uassert(11285301,
"Failed to parse " + kStageName + ", expected {" + kStageName + "expected temp input to " + kStageName,
": {temp: <number>}}", arguments.hasField("temp") && arguments.getField("temp").isNumber());
obj.hasField("temp") && obj.getField("temp").isNumber());
sdk_uassert(11285302, sdk_uassert(11285302,
"Failed to parse " + kStageName + ", provided temperature is higher than max " + "Failed to parse " + kStageName + ", provided temperature is higher than max " +
std::to_string(ToasterOptions::maxToasterHeat), std::to_string(ToasterOptions::maxToasterHeat),
obj.getField("temp").numberDouble() <= ToasterOptions::maxToasterHeat); arguments["temp"].Number() <= ToasterOptions::maxToasterHeat);
if (auto numSlices = arguments["numSlices"]) {
sdk_uassert(10957601,
"numSlices must be >= 0",
numSlices.isNumber() && numSlices.Number() >= 0);
}
return std::make_unique<ToastParseNode>(kStageName, arguments); return std::make_unique<ToastParseNode>(kStageName, std::move(arguments));
} }
}; };
DEFAULT_LOGICAL_AST_PARSE(ToastBagel, "$toastBagel")
/** /**
* $toastBagel is a no-op stage whose stage definition must be empty, like {$toastBagel: {}}. * $toastBagel is a no-op stage whose stage definition must be empty, like {$toastBagel: {}}.
*/ */
class ToastBagelStageDescriptor : public sdk::AggStageDescriptor { using ToastBagelStageDescriptor = sdk::TestStageDescriptor<"$toastBagel", ToastBagelParseNode>;
public:
static inline const std::string kStageName = std::string(ToastBagelStageName);
ToastBagelStageDescriptor() : sdk::AggStageDescriptor(kStageName) {}
std::unique_ptr<sdk::AggStageParseNode> parse(mongo::BSONObj stageBson) const override {
auto arguments = sdk::validateStageDefinition(stageBson, kStageName, true /* checkEmpty */);
return std::make_unique<ToastBagelParseNode>(kStageName, arguments);
}
};
class ToasterExtension : public sdk::Extension { class ToasterExtension : public sdk::Extension {
public: public:

View File

@ -1,65 +0,0 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/bsonobj.h"
#include "mongo/db/extension/sdk/aggregation_stage.h"
#include "mongo/db/extension/sdk/extension_factory.h"
#include "mongo/db/extension/sdk/test_extension_factory.h"
namespace sdk = mongo::extension::sdk;
DEFAULT_LOGICAL_AST_PARSE(TestFooSource, "$testFooSource")
/**
* $testFoo is a source stage.
*
* This file is identical to foo.cpp except it is a source stage instead of a no-op stage.
*/
class TestFooSourceStageDescriptor : public sdk::AggStageDescriptor {
public:
static inline const std::string kStageName = std::string(TestFooSourceStageName);
TestFooSourceStageDescriptor() : sdk::AggStageDescriptor(kStageName) {}
std::unique_ptr<sdk::AggStageParseNode> parse(mongo::BSONObj stageBson) const override {
auto arguments = sdk::validateStageDefinition(stageBson, kStageName, true /* checkEmpty */);
return std::make_unique<TestFooSourceParseNode>(kStageName, arguments);
}
};
class FooExtension : public sdk::Extension {
public:
void initialize(const sdk::HostPortalHandle& portal) override {
_registerStage<TestFooSourceStageDescriptor>(portal);
}
};
REGISTER_EXTENSION(FooExtension)
DEFINE_GET_EXTENSION()