mirror of https://github.com/mongodb/mongo
SERVER-111644 Implement `DocumentSourceExtensionOptimizable` (#42336)
GitOrigin-RevId: 97a18ffb35ee632ed4f867b0f12a3e9c6e0c167a
This commit is contained in:
parent
cb4cd787cd
commit
e3f47ef82d
|
|
@ -4,6 +4,7 @@ package(default_visibility = ["//visibility:public"])
|
|||
|
||||
exports_files([
|
||||
"document_source_extension_test.cpp",
|
||||
"document_source_extension_optimizable_test.cpp",
|
||||
])
|
||||
|
||||
exports_files(
|
||||
|
|
@ -21,6 +22,7 @@ mongo_cc_library(
|
|||
],
|
||||
hdrs = [
|
||||
"document_source_extension.h",
|
||||
"document_source_extension_optimizable.h",
|
||||
"extension_stage.h",
|
||||
"host_portal.h",
|
||||
],
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
#include "mongo/base/init.h" // IWYU pragma: keep
|
||||
#include "mongo/db/extension/host/aggregation_stage/parse_node.h"
|
||||
#include "mongo/db/extension/host/document_source_extension_optimizable.h"
|
||||
#include "mongo/db/extension/host_connector/handle/aggregation_stage/stage_descriptor.h"
|
||||
|
||||
namespace mongo::extension::host {
|
||||
|
|
@ -101,6 +102,7 @@ void DocumentSourceExtension::registerStage(host_connector::AggStageDescriptorHa
|
|||
};
|
||||
}();
|
||||
|
||||
// TODO SERVER-112178: Add case for DocumentSourceExtensionExpandable.
|
||||
switch (descriptor.getType()) {
|
||||
case MongoExtensionAggStageType::kNoOp:
|
||||
registerStage(nameAsString, id, descriptor);
|
||||
|
|
@ -127,7 +129,7 @@ void DocumentSourceExtension::registerStage(const std::string& name,
|
|||
[id, descriptor](BSONElement specElem,
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx)
|
||||
-> boost::intrusive_ptr<DocumentSource> {
|
||||
return boost::intrusive_ptr(new DocumentSourceExtension(
|
||||
return boost::intrusive_ptr(new DocumentSourceExtensionOptimizable(
|
||||
specElem.fieldNameStringData(), expCtx, id, specElem.wrap(), descriptor));
|
||||
});
|
||||
}
|
||||
|
|
@ -138,7 +140,7 @@ void DocumentSourceExtension::unregisterParser_forTest(const std::string& name)
|
|||
|
||||
DocumentSourceExtension::DocumentSourceExtension(
|
||||
StringData name,
|
||||
boost::intrusive_ptr<ExpressionContext> exprCtx,
|
||||
const boost::intrusive_ptr<ExpressionContext>& exprCtx,
|
||||
Id id,
|
||||
BSONObj rawStage,
|
||||
host_connector::AggStageDescriptorHandle staticDescriptor)
|
||||
|
|
@ -157,12 +159,6 @@ DocumentSource::Id DocumentSourceExtension::getId() const {
|
|||
return id;
|
||||
}
|
||||
|
||||
Value DocumentSourceExtension::serialize(const SerializationOptions& opts) const {
|
||||
// TODO We need to call into the plugin here when we want to serialize for query shape, or
|
||||
// if optimizations change the shape of the stage definition.
|
||||
return Value(_raw_stage);
|
||||
}
|
||||
|
||||
boost::optional<DocumentSource::DistributedPlanLogic>
|
||||
DocumentSourceExtension::distributedPlanLogic() {
|
||||
return boost::none;
|
||||
|
|
@ -192,4 +188,6 @@ DocumentSourceExtension::ExtensionBase DocumentSourceExtension::extensionBase()
|
|||
return ExtensionBase{_stageName, getExpCtx(), _id, _raw_stage, _staticDescriptor};
|
||||
}
|
||||
|
||||
DocumentSourceExtension::~DocumentSourceExtension() = default;
|
||||
|
||||
} // namespace mongo::extension::host
|
||||
|
|
|
|||
|
|
@ -167,11 +167,14 @@ public:
|
|||
|
||||
StageConstraints constraints(PipelineSplitState pipeState) const override;
|
||||
|
||||
Value serialize(const SerializationOptions& opts) const override;
|
||||
Value serialize(const SerializationOptions& opts) const override = 0;
|
||||
|
||||
// This method is invoked by extensions to register descriptor.
|
||||
static void registerStage(host_connector::AggStageDescriptorHandle descriptor);
|
||||
|
||||
// Declare DocumentSourceExtension to be pure virtual.
|
||||
~DocumentSourceExtension() override = 0;
|
||||
|
||||
private:
|
||||
static void registerStage(const std::string& name,
|
||||
DocumentSource::Id id,
|
||||
|
|
@ -186,8 +189,9 @@ private:
|
|||
friend class mongo::extension::DocumentSourceExtensionTest;
|
||||
static void unregisterParser_forTest(const std::string& name);
|
||||
|
||||
protected:
|
||||
DocumentSourceExtension(StringData name,
|
||||
boost::intrusive_ptr<ExpressionContext> exprCtx,
|
||||
const boost::intrusive_ptr<ExpressionContext>& exprCtx,
|
||||
Id id,
|
||||
BSONObj rawStage,
|
||||
mongo::extension::host_connector::AggStageDescriptorHandle descriptor);
|
||||
|
|
@ -205,12 +209,6 @@ private:
|
|||
|
||||
ExtensionBase extensionBase() const;
|
||||
|
||||
// Do not support copy or move.
|
||||
DocumentSourceExtension(const DocumentSourceExtension&) = delete;
|
||||
DocumentSourceExtension(DocumentSourceExtension&&) = delete;
|
||||
DocumentSourceExtension& operator=(const DocumentSourceExtension&) = delete;
|
||||
DocumentSourceExtension& operator=(DocumentSourceExtension&&) = delete;
|
||||
|
||||
/**
|
||||
* NB : Here we keep a copy of the stage name to service getSourceName().
|
||||
* It is tempting to rely on the name which is provided by the _staticDescriptor, however, that
|
||||
|
|
@ -222,7 +220,16 @@ private:
|
|||
const Id _id;
|
||||
BSONObj _raw_stage;
|
||||
const mongo::extension::host_connector::AggStageDescriptorHandle _staticDescriptor;
|
||||
mongo::extension::host_connector::AggStageParseNodeHandle _parseNode;
|
||||
const mongo::extension::host_connector::AggStageParseNodeHandle _parseNode;
|
||||
|
||||
private:
|
||||
// Do not support copy or move.
|
||||
DocumentSourceExtension(const DocumentSourceExtension&) = delete;
|
||||
DocumentSourceExtension(DocumentSourceExtension&&) = delete;
|
||||
DocumentSourceExtension& operator=(const DocumentSourceExtension&) = delete;
|
||||
DocumentSourceExtension& operator=(DocumentSourceExtension&&) = delete;
|
||||
};
|
||||
|
||||
} // namespace host
|
||||
|
||||
} // namespace mongo::extension
|
||||
|
|
|
|||
|
|
@ -0,0 +1,78 @@
|
|||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include "mongo/db/extension/host/document_source_extension.h"
|
||||
#include "mongo/db/extension/host_connector/handle/aggregation_stage/ast_node.h"
|
||||
#include "mongo/db/extension/host_connector/handle/aggregation_stage/logical.h"
|
||||
#include "mongo/db/extension/host_connector/handle/aggregation_stage/parse_node.h"
|
||||
#include "mongo/db/extension/host_connector/handle/aggregation_stage/stage_descriptor.h"
|
||||
|
||||
namespace mongo::extension::host {
|
||||
|
||||
class DocumentSourceExtensionOptimizable : public DocumentSourceExtension {
|
||||
public:
|
||||
// Direct construction of a source or transform extension.
|
||||
DocumentSourceExtensionOptimizable(StringData name,
|
||||
const boost::intrusive_ptr<ExpressionContext>& expCtx,
|
||||
Id id,
|
||||
BSONObj rawStage,
|
||||
host_connector::AggStageDescriptorHandle staticDescriptor)
|
||||
: DocumentSourceExtension(name, expCtx, id, rawStage, staticDescriptor),
|
||||
_logicalStage(validateAndCreateLogicalStage()) {}
|
||||
|
||||
Value serialize(const SerializationOptions& opts) const override {
|
||||
// TODO We need to call into the plugin here when we want to serialize for query shape, or
|
||||
// if optimizations change the shape of the stage definition.
|
||||
return Value(_raw_stage);
|
||||
}
|
||||
|
||||
private:
|
||||
const host_connector::LogicalAggStageHandle _logicalStage;
|
||||
|
||||
host_connector::LogicalAggStageHandle validateAndCreateLogicalStage() {
|
||||
std::vector<host_connector::VariantNodeHandle> expandedNodes = _parseNode.expand();
|
||||
|
||||
tassert(11164400,
|
||||
str::stream() << "Source or transform stage " << _stageName
|
||||
<< " must expand into exactly one node.",
|
||||
expandedNodes.size() == 1);
|
||||
|
||||
if (const auto* astNodeHandlePtr =
|
||||
std::get_if<host_connector::AggStageAstNodeHandle>(&expandedNodes[0])) {
|
||||
return astNodeHandlePtr->bind();
|
||||
} else {
|
||||
tasserted(11164401,
|
||||
str::stream() << "Source or transform extension" << _stageName
|
||||
<< " must expand into an AST node");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace mongo::extension::host
|
||||
|
|
@ -0,0 +1,158 @@
|
|||
/**
|
||||
* Copyright (C) 2025-present MongoDB, Inc.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the Server Side Public License, version 1,
|
||||
* as published by MongoDB, Inc.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* Server Side Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the Server Side Public License
|
||||
* along with this program. If not, see
|
||||
* <http://www.mongodb.com/licensing/server-side-public-license>.
|
||||
*
|
||||
* As a special exception, the copyright holders give permission to link the
|
||||
* code of portions of this program with the OpenSSL library under certain
|
||||
* conditions as described in each individual source file and distribute
|
||||
* linked combinations including the program with the OpenSSL library. You
|
||||
* must comply with the Server Side Public License in all respects for
|
||||
* all of the code used other than as permitted herein. If you modify file(s)
|
||||
* with this exception, you may extend this exception to your version of the
|
||||
* file(s), but you are not obligated to do so. If you do not wish to do so,
|
||||
* delete this exception statement from your version. If you delete this
|
||||
* exception statement from all source files in the program, then also delete
|
||||
* it in the license file.
|
||||
*/
|
||||
|
||||
#include "mongo/db/extension/host/document_source_extension_optimizable.h"
|
||||
|
||||
#include "mongo/db/extension/sdk/aggregation_stage.h"
|
||||
#include "mongo/db/extension/sdk/tests/shared_test_stages.h"
|
||||
#include "mongo/db/pipeline/aggregation_context_fixture.h"
|
||||
#include "mongo/unittest/death_test.h"
|
||||
#include "mongo/unittest/unittest.h"
|
||||
|
||||
namespace mongo::extension {
|
||||
|
||||
static constexpr std::string_view kFirstStageNotAstName = "$firstStageNotAst";
|
||||
static constexpr std::string_view kExpandSizeTooBigName = "$expandSizeTooBig";
|
||||
class FirstStageNotAstParseNode : public sdk::AggStageParseNode {
|
||||
public:
|
||||
static inline const std::string kStageName = std::string(kFirstStageNotAstName);
|
||||
FirstStageNotAstParseNode() : sdk::AggStageParseNode(kStageName) {}
|
||||
|
||||
static constexpr size_t kExpansionSize = 1;
|
||||
size_t getExpandedSize() const override {
|
||||
return kExpansionSize;
|
||||
}
|
||||
std::vector<sdk::VariantNode> expand() const override {
|
||||
std::vector<sdk::VariantNode> expanded;
|
||||
expanded.reserve(kExpansionSize);
|
||||
expanded.emplace_back(new sdk::ExtensionAggStageParseNode(
|
||||
std::make_unique<sdk::shared_test_stages::NoOpAggStageParseNode>()));
|
||||
return expanded;
|
||||
}
|
||||
BSONObj getQueryShape(const ::MongoExtensionHostQueryShapeOpts* ctx) const override {
|
||||
return BSONObj();
|
||||
}
|
||||
};
|
||||
class FirstStageNotAstStageDescriptor : public sdk::AggStageDescriptor {
|
||||
public:
|
||||
static inline const std::string kStageName = std::string(kFirstStageNotAstName);
|
||||
FirstStageNotAstStageDescriptor()
|
||||
: sdk::AggStageDescriptor(kStageName, MongoExtensionAggStageType::kNoOp) {}
|
||||
|
||||
std::unique_ptr<sdk::AggStageParseNode> parse(BSONObj stageBson) const override {
|
||||
return std::make_unique<FirstStageNotAstParseNode>();
|
||||
}
|
||||
static inline std::unique_ptr<sdk::AggStageDescriptor> make() {
|
||||
return std::make_unique<FirstStageNotAstStageDescriptor>();
|
||||
}
|
||||
};
|
||||
class ExpandSizeTooBigParseNode : public sdk::AggStageParseNode {
|
||||
public:
|
||||
static inline const std::string kStageName = std::string(kExpandSizeTooBigName);
|
||||
ExpandSizeTooBigParseNode() : extension::sdk::AggStageParseNode(kStageName) {}
|
||||
|
||||
static constexpr size_t kExpansionSize = 2;
|
||||
size_t getExpandedSize() const override {
|
||||
return kExpansionSize;
|
||||
}
|
||||
std::vector<sdk::VariantNode> expand() const override {
|
||||
std::vector<sdk::VariantNode> expanded;
|
||||
expanded.reserve(kExpansionSize);
|
||||
expanded.emplace_back(new extension::sdk::ExtensionAggStageAstNode(
|
||||
sdk::shared_test_stages::NoOpAggStageAstNode::make()));
|
||||
expanded.emplace_back(new extension::sdk::ExtensionAggStageAstNode(
|
||||
sdk::shared_test_stages::NoOpAggStageAstNode::make()));
|
||||
return expanded;
|
||||
}
|
||||
BSONObj getQueryShape(const ::MongoExtensionHostQueryShapeOpts* ctx) const override {
|
||||
return BSONObj();
|
||||
}
|
||||
};
|
||||
class ExpandSizeTooBigStageDescriptor : public sdk::AggStageDescriptor {
|
||||
public:
|
||||
static inline const std::string kStageName = std::string(kExpandSizeTooBigName);
|
||||
ExpandSizeTooBigStageDescriptor()
|
||||
: sdk::AggStageDescriptor(kStageName, MongoExtensionAggStageType::kNoOp) {}
|
||||
|
||||
std::unique_ptr<sdk::AggStageParseNode> parse(BSONObj stageBson) const override {
|
||||
return std::make_unique<ExpandSizeTooBigParseNode>();
|
||||
}
|
||||
static inline std::unique_ptr<sdk::AggStageDescriptor> make() {
|
||||
return std::make_unique<ExpandSizeTooBigStageDescriptor>();
|
||||
}
|
||||
};
|
||||
|
||||
class DocumentSourceExtensionOptimizableTest : public AggregationContextFixture {
|
||||
public:
|
||||
DocumentSourceExtensionOptimizableTest() : DocumentSourceExtensionOptimizableTest(_nss) {}
|
||||
explicit DocumentSourceExtensionOptimizableTest(NamespaceString nsString)
|
||||
: AggregationContextFixture(std::move(nsString)) {};
|
||||
|
||||
protected:
|
||||
static inline NamespaceString _nss = NamespaceString::createNamespaceString_forTest(
|
||||
boost::none, "document_source_extension_optimizable_test");
|
||||
|
||||
sdk::ExtensionAggStageDescriptor _expandSizeTooBigStageDescriptor{
|
||||
ExpandSizeTooBigStageDescriptor::make()};
|
||||
sdk::ExtensionAggStageDescriptor _firstStageNotAstStageDescriptor{
|
||||
FirstStageNotAstStageDescriptor::make()};
|
||||
sdk::ExtensionAggStageDescriptor _noOpAggregationStageDescriptor{
|
||||
sdk::shared_test_stages::NoOpAggStageDescriptor::make()};
|
||||
};
|
||||
|
||||
DEATH_TEST_F(DocumentSourceExtensionOptimizableTest, expandedSizeNotOneFails, "11164400") {
|
||||
[[maybe_unused]] auto extensionOptimizable = host::DocumentSourceExtensionOptimizable(
|
||||
ExpandSizeTooBigStageDescriptor::kStageName,
|
||||
getExpCtx(),
|
||||
DocumentSource::allocateId(ExpandSizeTooBigStageDescriptor::kStageName),
|
||||
BSON(ExpandSizeTooBigStageDescriptor::kStageName << BSON("foo" << true)),
|
||||
host_connector::AggStageDescriptorHandle(&_expandSizeTooBigStageDescriptor));
|
||||
}
|
||||
|
||||
DEATH_TEST_F(DocumentSourceExtensionOptimizableTest, expandToParseNodeFails, "11164401") {
|
||||
[[maybe_unused]] auto extensionOptimizable = host::DocumentSourceExtensionOptimizable(
|
||||
FirstStageNotAstStageDescriptor::kStageName,
|
||||
getExpCtx(),
|
||||
DocumentSource::allocateId(FirstStageNotAstStageDescriptor::kStageName),
|
||||
BSON(FirstStageNotAstStageDescriptor::kStageName << BSON("foo" << true)),
|
||||
host_connector::AggStageDescriptorHandle(&_firstStageNotAstStageDescriptor));
|
||||
}
|
||||
|
||||
TEST_F(DocumentSourceExtensionOptimizableTest, noOpConstructionSucceeds) {
|
||||
auto optimizable = host::DocumentSourceExtensionOptimizable(
|
||||
sdk::shared_test_stages::NoOpAggStageDescriptor::kStageName,
|
||||
getExpCtx(),
|
||||
DocumentSource::allocateId(sdk::shared_test_stages::NoOpAggStageDescriptor::kStageName),
|
||||
BSON(sdk::shared_test_stages::NoOpAggStageDescriptor::kStageName << BSON("foo" << true)),
|
||||
host_connector::AggStageDescriptorHandle(&_noOpAggregationStageDescriptor));
|
||||
|
||||
ASSERT_EQ(std::string(optimizable.getSourceName()),
|
||||
sdk::shared_test_stages::NoOpAggStageDescriptor::kStageName);
|
||||
}
|
||||
} // namespace mongo::extension
|
||||
|
|
@ -1597,6 +1597,7 @@ mongo_cc_unit_test(
|
|||
],
|
||||
}) + [
|
||||
"//src/mongo/db/exec/agg:document_source_to_stage_registry_test.cpp",
|
||||
"//src/mongo/db/extension/host:document_source_extension_optimizable_test.cpp",
|
||||
"//src/mongo/db/extension/host:document_source_extension_test.cpp",
|
||||
"//src/mongo/db/pipeline:document_source_coll_stats_test.cpp",
|
||||
"//src/mongo/db/pipeline:document_source_internal_all_collection_stats_test.cpp",
|
||||
|
|
|
|||
Loading…
Reference in New Issue