SERVER-110277 Modify SplitPipeline code to accommodate idLookup special case (#44594)

GitOrigin-RevId: c9eeb3d1be5b65a3a76fe4afa9db51aa92e3c322
Alyssa Clark 2025-12-11 15:01:51 -05:00 committed by MongoDB Bot
parent b6bf8e8bbe
commit b2acf37463
8 changed files with 486 additions and 8 deletions

View File

@@ -0,0 +1,50 @@
/**
 * Tests a source stage that reads documents from disk. $readNDocuments desugars to a source stage
 * that produces _ids followed by an idLookup, so it mimics reading from a collection with a limit
 * applied.
*
* This test exists to verify the pipeline splitting behavior for a source stage followed by idLookup in sharded contexts.
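 *
 * As a rough sketch of the desugaring (implemented by the read_n_documents extension in this
 * commit; the internal arguments shown are illustrative):
 *   {$readNDocuments: {numDocs: 2}}
 * expands to
 *   [{$produceIds: {numDocs: 2}}, {$_internalSearchIdLookup: {}}]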
*
* @tags: [featureFlagExtensionsAPI]
*/
const collName = jsTestName();
const coll = db[collName];
coll.drop();
const documents = [
    {_id: 0, dog: "labradoodle"},
    {_id: 1, dog: "golden retriever"},
    {_id: 2, dog: "cavalier king charles spaniel"},
    {_id: 3, dog: "bichon frise"},
];
coll.insertMany(documents);
// EOF case.
let results = coll.aggregate([{$readNDocuments: {numDocs: 0}}]).toArray();
assert.eq(results.length, 0, results);
// Return some documents in the collection.
results = coll.aggregate([{$readNDocuments: {numDocs: 2}}]).toArray();
assert.sameMembers(results, documents.slice(0, 2));
// Request more documents than exist. Since the stage uses idLookup, it can't return more than
// what is in the collection.
results = coll.aggregate([{$readNDocuments: {numDocs: 6}}]).toArray();
assert.sameMembers(results, documents);
// Return all documents in the collection, sorted by _id.
results = coll.aggregate([{$readNDocuments: {numDocs: 6, sortById: true}}]).toArray();
assert.eq(results, documents);
// TODO SERVER-113930 Test in lookup and unionWith.
// results = coll.aggregate([{$sort: {_id: 1}}, {$limit: 1}, {$lookup: {from: collName, pipeline: [{$readNDocuments: {numDocs: 2, sortById: true}}], as: "dogs"}}]).toArray();
// assert.eq(results, [{_id: 0, dog: "labradoodle", dogs: documents.slice(0, 2)}]);
// results = coll.aggregate([{$sort: {_id: 1}}, {$limit: 1}, {$lookup: {from: collName, pipeline: [{$readNDocuments: {numDocs: 2}}], as: "dogs"}}]).toArray();
// assert.eq(results.length, 1, results);
// assert(results[0].hasOwnProperty("dogs"));
// assert.sameMembers(documents.slice(0, 2), results[0].dogs);
// results = coll.aggregate([{$unionWith: {coll: collName, pipeline: [{$readNDocuments: {numDocs: 2}}]}}]).toArray();
// assert.sameMembers(results, documents.concat(documents.slice(0, 2)));
// results = coll.aggregate([{$unionWith: {coll: collName, pipeline: [{$readNDocuments: {numDocs: 2, sortById: true}}]}}]).toArray();
// assert.eq(results, documents.concat(documents.slice(0, 2)));
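// A possible follow-on check (a sketch only, not part of this test): when running against a
// sharded collection, explain output could be used to confirm the idLookup pushdown, e.g.:
//   const explain = coll.explain().aggregate([{$readNDocuments: {numDocs: 2}}]);
//   // In a sharded deployment, explain.splitPipeline.shardsPart would be expected to include
//   // the $_internalSearchIdLookup stage pushed down by the pipeline splitter.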

View File

@@ -98,6 +98,7 @@ mongo_cc_unit_test(
        "//src/mongo/db/extension/test_examples:null_mongo_extension_bad_extension",
        "//src/mongo/db/extension/test_examples:null_stage_descriptor_bad_extension",
        "//src/mongo/db/extension/test_examples:parse_options_mongo_extension",
        "//src/mongo/db/extension/test_examples:read_n_documents_mongo_extension",
        "//src/mongo/db/extension/test_examples:sharded_execution_serialization_mongo_extension",
        "//src/mongo/db/extension/test_examples:test_options_mongo_extension",
        "//src/mongo/db/extension/test_examples:toaster_mongo_extension",

View File

@@ -39,6 +39,7 @@ extensions_with_config(
        ":extension_errors_mongo_extension",
        ":shapify_mongo_extension",
        ":sharded_execution_serialization_mongo_extension",
        ":read_n_documents_mongo_extension",
        ":explain_mongo_extension",
        ":idle_threads_mongo_extension",
        ":interrupt_mongo_extension",
@@ -74,6 +75,7 @@ pkg_name = "//" + package_name() + "/"
        "foo",
        "foo_v2",
        "bar",
        "read_n_documents",
    ]
]

View File

@@ -0,0 +1,182 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/extension/sdk/aggregation_stage.h"
#include "mongo/db/extension/sdk/extension_factory.h"
#include "mongo/db/extension/sdk/test_extension_factory.h"
namespace sdk = mongo::extension::sdk;
using namespace mongo;
class ProduceIdsExecStage : public sdk::ExecAggStageSource {
public:
    ProduceIdsExecStage(std::string_view stageName, const BSONObj& arguments)
        : sdk::ExecAggStageSource(stageName),
          _sortById(arguments["sortById"] && arguments["sortById"].booleanSafe()) {
        _numDocs = arguments["numDocs"] && arguments["numDocs"].isNumber()
            ? arguments["numDocs"].safeNumberInt()
            : 1;
    }

    extension::ExtensionGetNextResult getNext(const sdk::QueryExecutionContextHandle& execCtx,
                                              ::MongoExtensionExecAggStage* execStage) override {
        if (_currentDoc == _numDocs) {
            return extension::ExtensionGetNextResult::eof();
        }

        // Generate zero-indexed, ascending ids.
        auto currentId = _currentDoc++;
        auto document = extension::ExtensionBSONObj::makeAsByteBuf(BSON("_id" << currentId));
        if (!_sortById) {
            // We haven't been asked for sorted results, so there is no need to generate sort key
            // metadata.
            return extension::ExtensionGetNextResult::advanced(std::move(document));
        }

        // Generate sort key metadata so that the sharded sort by id can be applied correctly.
        auto metadata = extension::ExtensionBSONObj::makeAsByteBuf(
            BSON("$sortKey" << BSON("_id" << currentId)));
        return extension::ExtensionGetNextResult::advanced(std::move(document),
                                                           std::move(metadata));
    }

    void open() override {}
    void reopen() override {}
    void close() override {}

    BSONObj explain(::MongoExtensionExplainVerbosity verbosity) const override {
        return BSONObj();
    }

private:
    const bool _sortById;
    int _numDocs;
    int _currentDoc = 0;
};
class ProduceIdsLogicalStage : public sdk::TestLogicalStage<ProduceIdsExecStage> {
public:
    ProduceIdsLogicalStage(std::string_view stageName, const BSONObj& arguments)
        : sdk::TestLogicalStage<ProduceIdsExecStage>(stageName, arguments) {}

    std::unique_ptr<sdk::LogicalAggStage> clone() const {
        return std::make_unique<ProduceIdsLogicalStage>(_name, _arguments);
    }

    boost::optional<extension::sdk::DistributedPlanLogic> getDistributedPlanLogic() const override {
        // We only need to provide distributed planning logic for correctness when we
        // need to sort by _id, but we specify it unconditionally to force more interesting
        // distributed planning.
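        // Concretely (per the code below): each shard runs a clone of $produceIds as the shards
        // half of the pipeline, and, when sortById is set, the merger merge-sorts the shard
        // streams on {_id: 1} using the $sortKey metadata attached in getNext() above.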
        extension::sdk::DistributedPlanLogic dpl;
        {
            std::vector<extension::VariantDPLHandle> pipeline;
            pipeline.emplace_back(
                extension::LogicalAggStageHandle{new sdk::ExtensionLogicalAggStage(clone())});
            dpl.shardsPipeline = sdk::DPLArrayContainer(std::move(pipeline));
        }
        if (_arguments["sortById"] && _arguments["sortById"].booleanSafe()) {
            dpl.sortPattern = BSON("_id" << 1);
        }
        return dpl;
    }
};

class ProduceIdsAstNode : public sdk::TestAstNode<ProduceIdsLogicalStage> {
public:
    ProduceIdsAstNode(std::string_view stageName, const BSONObj& arguments)
        : sdk::TestAstNode<ProduceIdsLogicalStage>(stageName, arguments) {}

    BSONObj getProperties() const override {
        extension::MongoExtensionStaticProperties properties;
        properties.setPosition(extension::MongoExtensionPositionRequirementEnum::kFirst);
        properties.setRequiresInputDocSource(false);

        BSONObjBuilder builder;
        properties.serialize(&builder);
        return builder.obj();
    }
};
DEFAULT_PARSE_NODE(ProduceIds);

class ReadNDocumentsParseNode : public sdk::TestParseNode<ProduceIdsAstNode> {
public:
    ReadNDocumentsParseNode(std::string_view stageName, const BSONObj& arguments)
        : sdk::TestParseNode<ProduceIdsAstNode>(stageName, arguments) {}

    size_t getExpandedSize() const override {
        return 2;
    }

    std::vector<extension::VariantNodeHandle> expand() const override {
        std::vector<extension::VariantNodeHandle> expanded;
        expanded.reserve(getExpandedSize());
        expanded.emplace_back(new sdk::ExtensionAggStageAstNode(
            std::make_unique<ProduceIdsAstNode>("$produceIds", _arguments)));
        expanded.emplace_back(
            extension::sdk::HostServicesHandle::getHostServices()->createIdLookup(kIdLookupSpec));
        return expanded;
    }

private:
    static const BSONObj kIdLookupSpec;
};

const BSONObj ReadNDocumentsParseNode::kIdLookupSpec =
    BSON("$_internalSearchIdLookup" << BSONObj());
/**
 * ReadNDocuments stage that reads documents with integer ids from a collection and optionally
 * returns them sorted by _id value (ascending). Syntax:
 *
 *   {$readNDocuments: {numDocs: <int>, sortById: <optional bool>}}
 *
 * $readNDocuments desugars to two stages: $produceIds, which produces integer ids up to
 * 'numDocs', followed by $_internalSearchIdLookup, which attempts to read documents with the
 * given ids from the collection.
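 *
 * For example (illustrative, mirroring the jstest added in this commit):
 *   db.coll.aggregate([{$readNDocuments: {numDocs: 2, sortById: true}}])
 * returns the documents with _id 0 and _id 1, when present, in ascending _id order.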
 */
using ReadNDocumentsStageDescriptor =
    sdk::TestStageDescriptor<"$readNDocuments", ReadNDocumentsParseNode>;

// $produceIds needs to be parseable to be used in sharded execution.
using ProduceIdsStageDescriptor = sdk::TestStageDescriptor<"$produceIds", ProduceIdsParseNode>;

class ReadNDocumentsExtension : public sdk::Extension {
public:
    void initialize(const sdk::HostPortalHandle& portal) override {
        _registerStage<ReadNDocumentsStageDescriptor>(portal);
        _registerStage<ProduceIdsStageDescriptor>(portal);
    }
};

REGISTER_EXTENSION(ReadNDocumentsExtension)
DEFINE_GET_EXTENSION()

View File

@@ -1212,6 +1212,7 @@ mongo_cc_unit_test(
        "sharded_agg_helpers_test.cpp",
        "sharded_union_test.cpp",
        "skip_and_limit_test.cpp",
        "split_pipeline_test.cpp",
        "stage_params_to_document_source_registry_test.cpp",
        "tee_buffer_test.cpp",
        "//src/mongo/db/exec/agg:document_source_to_stage_registry_test.cpp",

View File

@@ -294,7 +294,6 @@ public:
        }
    }
-    typedef std::function<bool(const DocumentSource&)> movePastFunctionType;
    // A stage which executes on each shard in parallel, or nullptr if nothing can be done in
    // parallel. For example, a partial $group before a subsequent global $group.
    boost::intrusive_ptr<DocumentSource> shardsStage = nullptr;
@@ -317,11 +316,9 @@ public:
        // If needsSplit is false and this plan has anything that must run on the merging half of
        // the pipeline, it will be deferred until the next stage that sets any non-default value on
        // 'DistributedPlanLogic' or until a following stage causes the given validation
-        // function to return false. By default this will not allow swapping with any
-        // following stages.
-        movePastFunctionType canMovePast = [](const DocumentSource&) {
-            return false;
-        };
+        // function to return false. This function defaults to unset.
+        typedef std::function<bool(const DocumentSource&)> movePastFunctionType;
+        movePastFunctionType canMovePast = {};
};
/**

View File

@@ -36,6 +36,7 @@
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/search/document_source_internal_search_id_lookup.h"
#include "mongo/db/pipeline/semantic_analysis.h"
#include "mongo/db/query/compiler/dependency_analysis/dependencies.h"
@@ -131,6 +132,7 @@ public:
                    ->isFeatureFlagShardFilteringDistinctScanEnabled()) {
            _moveGroupFollowingSortFromMergerToShards();
        }
        _moveIdLookupFromMergerToShards();
        _moveFinalUnwindFromShardsToMerger();
        _propagateDocLimitToShards();
        _limitFieldsSentFromShardsToMerger();
@@ -186,7 +188,7 @@ private:
    void _finishFindSplitPointAfterDeferral(
        boost::intrusive_ptr<DocumentSource> deferredStage,
        boost::optional<BSONObj> mergeSort,
-        DocumentSource::DistributedPlanLogic::movePastFunctionType moveCheckFunc) {
+        const DocumentSource::DistributedPlanLogic::movePastFunctionType& moveCheckFunc) {
        while (_mergePipeHasNext()) {
            auto [current, distributedPlanLogic] = _popFirstMergeStage();
            if (!moveCheckFunc(*current)) {
@@ -281,7 +283,7 @@ private:
                distributedPlanLogic->canMovePast);
        auto mergingStageList = distributedPlanLogic->mergingStages;
        tassert(6448007,
-                "Only support deferring at most one stage for now.",
+                "Only support deferring at most one stage.",
                mergingStageList.size() <= 1);
        _finishFindSplitPointAfterDeferral(
@@ -377,6 +379,43 @@
        }
    }
    /**
     * Move idLookup to the shards if it is in the merging pipeline. It can be pushed down if it
     * is first in the merging pipeline or preceded only by $limit stages. If any other stage
     * blocks the pushdown, throw an error, since idLookup can't provide correct results on a
     * merger.
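     *
     * For example, a merging pipeline of [$limit, idLookup, $match] is rewritten so that the
     * idLookup runs on the shards, leaving [$limit, $match] as the merging pipeline (see the
     * unit tests added in this commit).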
     */
    void _moveIdLookupFromMergerToShards() {
        bool canPushDownIdLookup = true;
        auto& mergeSources = _splitPipeline.mergePipeline->getSources();
        for (auto it = mergeSources.begin(); it != mergeSources.end(); ++it) {
            if (dynamic_cast<DocumentSourceLimit*>(it->get())) {
                // Technically, swapping idLookup and $limit can change query results. However,
                // $vectorSearch will always put a $limit in the merging pipeline, and we are
                // allowed to (and should, for correctness) swap with it. A non-$vectorSearch
                // pipeline probably shouldn't place a $limit directly before idLookup, but if it
                // does, the results will still be more useful post-swap.
                continue;
            }
            if (dynamic_cast<DocumentSourceInternalSearchIdLookUp*>(it->get())) {
                uassert(
                    11027701,
                    "idLookup is not allowed to run in a merging pipeline but pushdown was blocked",
                    canPushDownIdLookup);

                // We need to remove the stage from the merge pipeline before we can push it down.
                auto idLookupStage = *it;
                mergeSources.erase(it);
                pushdownEntireStage(idLookupStage);
                return;
            }

            // There's a non-$limit stage in the merging pipeline, so we can't perform an idLookup
            // pushdown. Continue the loop so that we can error out if we find an idLookup later.
            canPushDownIdLookup = false;
        }
    }
    /**
     * If the final stage on shards is to unwind an array, move that stage to the merger. This
     * cuts down on network traffic and allows us to take advantage of reduced copying in

View File

@@ -0,0 +1,206 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/pipeline/split_pipeline.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include <deque>
namespace mongo::sharded_agg_helpers {
namespace {
class SplitPipelineTest : public AggregationContextFixture {
public:
void checkPipelineContents(const Pipeline* pipeline, std::deque<StringData> expectedStages) {
ASSERT_EQ(pipeline->size(), expectedStages.size());
for (auto stage : pipeline->getSources()) {
ASSERT_EQ(stage->getSourceName(), expectedStages.front());
expectedStages.pop_front();
}
}
void checkSortSpec(const SplitPipeline& splitPipeline,
boost::optional<BSONObj> expectedSortSpec) {
ASSERT_EQ(splitPipeline.shardCursorsSortSpec.has_value(), expectedSortSpec.has_value());
if (expectedSortSpec) {
ASSERT_BSONOBJ_EQ(*splitPipeline.shardCursorsSortSpec, *expectedSortSpec);
}
}
struct ExpectedSplitPipeline {
std::deque<StringData> shardsStages;
std::deque<StringData> mergeStages;
boost::optional<BSONObj> sortSpec;
};
SplitPipeline testPipelineSplit(const std::vector<BSONObj>& inputPipeline,
const ExpectedSplitPipeline& expectedSplitPipeline) {
auto pipeline = Pipeline::parse(inputPipeline, getExpCtx());
auto actualSplitPipeline = SplitPipeline::split(std::move(pipeline));
checkPipelineContents(actualSplitPipeline.shardsPipeline.get(),
expectedSplitPipeline.shardsStages);
checkPipelineContents(actualSplitPipeline.mergePipeline.get(),
expectedSplitPipeline.mergeStages);
checkSortSpec(actualSplitPipeline, expectedSplitPipeline.sortSpec);
return actualSplitPipeline;
}
};
TEST_F(SplitPipelineTest, SimpleIdLookupPushdown) {
    // When IdLookup is the first stage in the merging pipeline, it should be pushed down.
    testPipelineSplit(
        {
            BSON("$vectorSearch" << BSONObj()),
            BSON("$_internalSearchIdLookup" << BSONObj()),
        },
        ExpectedSplitPipeline{
            .shardsStages =
                {
                    "$vectorSearch",
                    "$_internalSearchIdLookup",
                },
            .mergeStages = {},
            .sortSpec = BSON("$vectorSearchScore" << -1),
        });
}

TEST_F(SplitPipelineTest, PushdownIdLookupOverLimit) {
    // When IdLookup is only preceded by a $limit in the merging pipeline, it should be pushed
    // down.
    testPipelineSplit(
        {
            BSON("$vectorSearch" << BSON("limit" << 10)),
            BSON("$_internalSearchIdLookup" << BSONObj()),
            BSON("$match" << BSON("x" << 5)),
            BSON("$sort" << BSON("y" << 1)),
        },
        ExpectedSplitPipeline{
            .shardsStages =
                {
                    "$vectorSearch",
                    "$_internalSearchIdLookup",
                    "$limit",
                },
            .mergeStages =
                {
                    "$limit",
                    "$match",
                    "$sort",
                },
            .sortSpec = BSON("$vectorSearchScore" << -1),
        });
}
TEST_F(SplitPipelineTest, PushdownIdLookupOverMultipleLimits) {
    // IdLookup should be pushed down even over multiple $limits.
    testPipelineSplit(
        {
            BSON("$limit" << 100),
            BSON("$limit" << 10),
            BSON("$_internalSearchIdLookup" << BSONObj()),
        },
        ExpectedSplitPipeline{
            .shardsStages =
                {
                    "$limit",
                    "$_internalSearchIdLookup",
                    "$limit",
                },
            .mergeStages =
                {
                    "$limit",
                    "$limit",
                },
            .sortSpec = boost::none,
        });
}
TEST_F(SplitPipelineTest, PushdownIdLookupDoesNotBlockLimitFieldsSentToMergerOptimization) {
    // Pushing down IdLookup should not prevent the optimization that limits the fields sent from
    // the shards to the merger: a $project should still be added to the shards pipeline.
    testPipelineSplit(
        {
            BSON("$limit" << 10),
            BSON("$_internalSearchIdLookup" << BSONObj()),
            BSON("$project" << BSON("x" << 1)),
        },
        ExpectedSplitPipeline{
            .shardsStages =
                {
                    "$limit",
                    "$_internalSearchIdLookup",
                    "$limit",
                    "$project",
                },
            .mergeStages =
                {
                    "$limit",
                    "$project",
                },
            .sortSpec = boost::none,
        });
}
TEST_F(SplitPipelineTest, IdLookupInInvalidMergingPipelineLocation) {
    // When IdLookup is preceded by a stage other than $limit in the merging pipeline, the split
    // should error.
    ASSERT_THROWS_CODE(testPipelineSplit(
                           {
                               BSON("$sort" << BSON("x" << 1)),
                               BSON("$count" << "count"),
                               BSON("$_internalSearchIdLookup" << BSONObj()),
                           },
                           ExpectedSplitPipeline{}),
                       AssertionException,
                       11027701);
}

TEST_F(SplitPipelineTest, SplitAtIdLookup) {
    // When IdLookup is the first stage in the pipeline that requires a split, it should end up
    // on the shards.
    testPipelineSplit(
        {
            BSON("$match" << BSON("x" << 1)),
            BSON("$_internalSearchIdLookup" << BSONObj()),
            BSON("$match" << BSON("y" << 1)),
        },
        ExpectedSplitPipeline{
            .shardsStages =
                {
                    "$match",
                    "$_internalSearchIdLookup",
                },
            .mergeStages = {"$match"},
            .sortSpec = boost::none,
        });
}

}  // namespace
}  // namespace mongo::sharded_agg_helpers