SERVER-112135 Enable extensions to collect diagnostic metrics exposed in OpDebug (#43482)

GitOrigin-RevId: d6ce464cad747c8aedb81daf9595d7628abe81a4
This commit is contained in:
Finley Lau 2025-11-06 16:26:06 -06:00 committed by MongoDB Bot
parent 0560e59a0f
commit b71ae2b570
26 changed files with 715 additions and 111 deletions

View File

@ -945,6 +945,7 @@ mongo_cc_library(
"//src/mongo/db/commands:test_commands_enabled", # TODO(SERVER-93876): Remove.
"//src/mongo/db/commands/server_status:server_status_core", # TODO(SERVER-93876): Remove.
"//src/mongo/db/exec/mutable_bson",
"//src/mongo/db/extension/host:extension_operation_metrics_registry",
"//src/mongo/db/local_catalog:collection_options", # TODO(SERVER-93876): Remove.
"//src/mongo/db/local_catalog:local_oplog_info",
"//src/mongo/db/query:common_query_enums_and_helpers",

View File

@ -292,6 +292,7 @@ mongo_cc_library(
}),
deps = [
"//src/mongo/db:profile_settings",
"//src/mongo/db/extension/host:extension_operation_metrics_registry",
"//src/mongo/db/query/algebra",
"//src/mongo/db/sorter:sorter_base",
"//src/mongo/db/storage:container",

View File

@ -14,6 +14,18 @@ exports_files(
]),
)
# Note that this is a standalone library such that it doesn't need to have any server dependencies like the rest of the extension_host target does.
mongo_cc_library(
name = "extension_operation_metrics_registry",
hdrs = [
"operation_metrics_registry.h",
],
deps = [
"//src/mongo:base",
"//src/mongo/db/extension/host_connector/handle:host_handles",
],
)
mongo_cc_library(
name = "extension_host",
srcs = [
@ -28,9 +40,12 @@ mongo_cc_library(
"document_source_extension_optimizable.h",
"extension_stage.h",
"host_portal.h",
"query_execution_context.h",
],
deps = [
":extension_operation_metrics_registry",
"//src/mongo:base",
"//src/mongo/db:commands",
"//src/mongo/db/extension/host/aggregation_stage",
"//src/mongo/db/extension/host_connector:extensions_host_connector",
"//src/mongo/db/extension/host_connector:host_services_adapter",

View File

@ -43,8 +43,8 @@ namespace mongo::extension::host {
*/
class ExecAggStage {
public:
ExecAggStage(std::unique_ptr<exec::agg::Stage> execAggStage)
: _execAggStage(std::move(execAggStage)) {}
ExecAggStage(std::unique_ptr<exec::agg::Stage> execAggStage, const std::string& stageName)
: _execAggStage(std::move(execAggStage)), _stageName(stageName) {}
~ExecAggStage() = default;
@ -55,8 +55,13 @@ public:
return _execAggStage->getNext();
}
std::string_view getName() const {
return _stageName;
}
private:
std::unique_ptr<exec::agg::Stage> _execAggStage;
const std::string _stageName;
};
}; // namespace mongo::extension::host

View File

@ -30,8 +30,9 @@
#include "mongo/db/exec/agg/document_source_to_stage_registry.h"
#include "mongo/db/exec/agg/stage.h"
#include "mongo/db/extension/host/aggregation_stage/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/executable_agg_stage_adapter.h"
#include "mongo/db/extension/host_connector/handle/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/query_execution_context_adapter.h"
#include "mongo/db/extension/sdk/aggregation_stage.h"
#include "mongo/db/extension/shared/get_next_result.h"
#include "mongo/db/pipeline/document_source.h"
@ -54,24 +55,29 @@ protected:
class NoOpHostExecAggStage : public host::ExecAggStage {
public:
explicit NoOpHostExecAggStage(std::unique_ptr<mongo::exec::agg::Stage> execAggStage)
: host::ExecAggStage(std::move(execAggStage)) {}
static constexpr std::string kStageName = "$noOpHost";
explicit NoOpHostExecAggStage(std::unique_ptr<mongo::exec::agg::Stage> execAggStage,
const std::string& stageName)
: host::ExecAggStage(std::move(execAggStage), stageName) {}
static inline std::unique_ptr<host::ExecAggStage> make(
std::unique_ptr<mongo::exec::agg::Stage> execAggStage) {
return std::make_unique<NoOpHostExecAggStage>(std::move(execAggStage));
return std::make_unique<NoOpHostExecAggStage>(std::move(execAggStage), kStageName);
}
};
class NoOpExtensionExecAggStage : public sdk::ExecAggStage {
public:
NoOpExtensionExecAggStage() : sdk::ExecAggStage() {}
static constexpr std::string kStageName = "$noOpExt";
NoOpExtensionExecAggStage(const std::string& stageName) : sdk::ExecAggStage(stageName) {}
ExtensionGetNextResult getNext() override {
ExtensionGetNextResult getNext(const sdk::QueryExecutionContextHandle& expCtx,
const MongoExtensionExecAggStage* execStage) override {
MONGO_UNIMPLEMENTED;
}
static inline std::unique_ptr<sdk::ExecAggStage> make() {
return std::make_unique<NoOpExtensionExecAggStage>();
return std::make_unique<NoOpExtensionExecAggStage>(kStageName);
}
};
@ -100,19 +106,20 @@ TEST(HostExecAggStageTest, GetNextResult) {
ASSERT_EQ(ReturnStatus::kPauseExecution, getNextResult.getStatus());
// Test getNext() via ExecAggStageHandle.
auto nullExpCtx = host_connector::QueryExecutionContextAdapter(nullptr);
auto hostExecAggStage = new host_connector::HostExecAggStageAdapter(std::move(execAggStage));
auto handle = host_connector::ExecAggStageHandle{hostExecAggStage};
auto hostGetNextResult = handle.getNext();
auto hostGetNextResult = handle.getNext(&nullExpCtx);
ASSERT_EQ(extension::GetNextCode::kPauseExecution, hostGetNextResult.code);
ASSERT_EQ(boost::none, hostGetNextResult.res);
hostGetNextResult = handle.getNext();
hostGetNextResult = handle.getNext(&nullExpCtx);
ASSERT_EQ(extension::GetNextCode::kAdvanced, hostGetNextResult.code);
ASSERT_BSONOBJ_EQ(BSON("a" << 1), hostGetNextResult.res.get());
// Note that the match clause is "a": 1 so the documents where "a": 2 will be passed over.
hostGetNextResult = handle.getNext();
hostGetNextResult = handle.getNext(&nullExpCtx);
ASSERT_EQ(extension::GetNextCode::kEOF, hostGetNextResult.code);
ASSERT_EQ(boost::none, hostGetNextResult.res);
@ -149,8 +156,9 @@ TEST(HostExecAggStageTest, GetNextResultEdgeCaseEof) {
// Test getNext() via ExecAggStageHandle.
auto hostExecAggStage = new host_connector::HostExecAggStageAdapter(std::move(execAggStage));
auto handle = host_connector::ExecAggStageHandle{hostExecAggStage};
auto nullExpCtx = host_connector::QueryExecutionContextAdapter(nullptr);
auto hostGetNextResult = handle.getNext();
auto hostGetNextResult = handle.getNext(&nullExpCtx);
ASSERT_EQ(extension::GetNextCode::kEOF, hostGetNextResult.code);
ASSERT_EQ(boost::none, hostGetNextResult.res);
}
@ -179,10 +187,11 @@ DEATH_TEST_F(ExecAggStageTest, InvalidReturnStatusCode, "11019500") {
auto hostExecAggStage = new host_connector::HostExecAggStageAdapter(std::move(execAggStage));
auto handle = host_connector::ExecAggStageHandle{hostExecAggStage};
auto nullExpCtx = host_connector::QueryExecutionContextAdapter(nullptr);
// This getNext() call should hit the tassert because the C API doesn't have a
// kAdvancedControlDocument value for GetNextCode.
handle.getNext();
handle.getNext(&nullExpCtx);
}
TEST(HostExecAggStageTest, IsHostAllocated) {
@ -204,6 +213,30 @@ TEST(HostExecAggStageTest, IsNotHostAllocated) {
ASSERT_FALSE(host_connector::HostExecAggStageAdapter::isHostAllocated(*handle.get()));
}
TEST(HostExecAggStageTest, GetNameFromExtensionStage) {
auto noOpExtensionExecAggStage =
new sdk::ExtensionExecAggStage(NoOpExtensionExecAggStage::make());
auto handle = host_connector::ExecAggStageHandle{noOpExtensionExecAggStage};
ASSERT_EQ(handle.getName(), NoOpExtensionExecAggStage::kStageName);
}
TEST(HostExecAggStageTest, GetNameFromHostStage) {
boost::intrusive_ptr<DocumentSourceMatch> matchDocSourceStage =
DocumentSourceMatch::create(BSON("a" << 1), make_intrusive<ExpressionContextForTest>());
auto mock = DocumentSourceMock::createForTest({}, make_intrusive<ExpressionContextForTest>());
exec::agg::StagePtr matchExecAggStage = exec::agg::buildStage(matchDocSourceStage);
exec::agg::StagePtr mockStage = exec::agg::buildStage(mock);
matchExecAggStage->setSource(mockStage.get());
std::unique_ptr<host::ExecAggStage> execAggStage = NoOpHostExecAggStage::make(
std::unique_ptr<mongo::exec::agg::Stage>(matchExecAggStage.detach()));
auto hostExecAggStage = new host_connector::HostExecAggStageAdapter(std::move(execAggStage));
auto handle = host_connector::ExecAggStageHandle{hostExecAggStage};
ASSERT_EQ(handle.getName(), NoOpHostExecAggStage::kStageName);
}
} // namespace
} // namespace mongo::extension

View File

@ -0,0 +1,93 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/extension/host_connector/handle/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/handle/host_operation_metrics_handle.h"
#include <map>
#include <string>
namespace mongo::extension::host {
/**
* OperationMetricsRegistry manages a collection of operation metrics for extension-executed
* aggregation stages during query execution.
*
* This registry maintains owned HostOperationMetricsHandle instances, one per unique aggregation
* stage name, allowing metrics to be collected incrementally throughout pipeline execution.
* It provides the following capabilities:
*
* - Lazy creation and caching of metrics per stage: metrics are created on first access via
* getOrCreateMetrics() and reused for subsequent accesses such that metrics in the same operation
* are aggregated
* - Serialization of all collected metrics into a BSON object for inclusion in operation logs
* and debug information
*
* The registry is typically owned by an OpDebug instance and populated during pipeline execution
* as extensions execute their stages. After execution completes, the serialized metrics can be
* included in operation diagnostics and slow query logs.
*/
class OperationMetricsRegistry {
public:
bool empty() const {
return _metrics.empty();
}
size_t size() const {
return _metrics.size();
}
BSONObj serialize() const {
BSONObjBuilder builder;
for (const auto& [stageName, extensionMetrics] : _metrics) {
builder.append(stageName, extensionMetrics.serialize());
}
return builder.obj();
}
extension::host_connector::HostOperationMetricsHandle* getOrCreateMetrics(
const std::string& stageName, host_connector::UnownedExecAggStageHandle execStage) {
auto it = _metrics.find(std::string(stageName));
if (it == _metrics.end()) {
auto metricsHandle = execStage.createMetrics();
auto [newIt, inserted] = _metrics.emplace(stageName, std::move(metricsHandle));
it = newIt;
}
return &it->second;
}
private:
std::map<std::string, extension::host_connector::HostOperationMetricsHandle> _metrics;
};
}; // namespace mongo::extension::host

View File

@ -0,0 +1,71 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/curop.h"
#include "mongo/db/extension/host_connector/handle/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/query_execution_context_adapter.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/util/modules.h"
namespace mongo::extension::host {
/**
* QueryExecutionContext provides concrete host implementation of the query execution context
* interface for extensions running within the host process.
*
* The context wraps an ExpressionContext, which holds references to the active OperationContext
* and other query state needed during pipeline execution. It delegates interrupt checks to the
* underlying OperationContext and exposes operation metrics through OpDebug's extension metrics
* registry.
*
* This class is intended for use by the extension host connector framework and should not be
* instantiated directly by extension code.
*/
class QueryExecutionContext : public host_connector::QueryExecutionContextBase {
public:
QueryExecutionContext(const ExpressionContext* ctx) : _ctx(ctx) {}
Status checkForInterrupt() const override {
return _ctx->getOperationContext()->checkForInterruptNoAssert();
}
host_connector::HostOperationMetricsHandle* getMetrics(
const std::string& stageName,
const host_connector::UnownedExecAggStageHandle& execStage) const override {
auto& opDebug = CurOp::get(_ctx->getOperationContext())->debug();
auto& opDebugMetrics = opDebug.extensionMetrics;
return opDebugMetrics.getOrCreateMetrics(stageName, execStage);
}
private:
const ExpressionContext* _ctx;
};
} // namespace mongo::extension::host

View File

@ -13,13 +13,11 @@ exports_files(
mongo_cc_library(
name = "extensions_host_connector",
srcs = [
"executable_agg_stage.cpp",
"host_portal_adapter.cpp",
"query_execution_context_adapter.cpp",
"query_shape_opts_adapter.cpp",
],
hdrs = [
"executable_agg_stage.h",
"extension_handle.h",
"host_portal_adapter.h",
"host_services_adapter.h",

View File

@ -36,7 +36,9 @@
namespace mongo::extension::host_connector {
::MongoExtensionStatus* HostExecAggStageAdapter::_hostGetNext(
::MongoExtensionExecAggStage* execAggStage, ::MongoExtensionGetNextResult* apiResult) noexcept {
::MongoExtensionExecAggStage* execAggStage,
::MongoExtensionQueryExecutionContext* execCtxPtr,
::MongoExtensionGetNextResult* apiResult) noexcept {
return wrapCXXAndConvertExceptionToStatus([&]() {
apiResult->code = ::MongoExtensionGetNextResultCode::kPauseExecution;
apiResult->result = nullptr;
@ -77,4 +79,11 @@ namespace mongo::extension::host_connector {
});
}
::MongoExtensionByteView HostExecAggStageAdapter::_hostGetName(
const ::MongoExtensionExecAggStage* execAggStage) noexcept {
auto sv = static_cast<const HostExecAggStageAdapter*>(execAggStage)->getImpl().getName();
StringData sd{sv.data(), sv.length()};
return stringDataAsByteView(sd);
}
}; // namespace mongo::extension::host_connector

View File

@ -30,6 +30,7 @@
#include "mongo/db/extension/host/aggregation_stage/executable_agg_stage.h"
#include "mongo/db/extension/public/api.h"
#include "mongo/db/extension/shared/extension_status.h"
#include "mongo/util/modules.h"
#include <memory>
@ -86,10 +87,27 @@ private:
* result in the ::MongoExtensionGetNextResult is transferred to the caller.
*/
static ::MongoExtensionStatus* _hostGetNext(::MongoExtensionExecAggStage* execAggStage,
::MongoExtensionQueryExecutionContext* execCtxPtr,
::MongoExtensionGetNextResult* apiResult) noexcept;
static MongoExtensionByteView _hostGetName(
const ::MongoExtensionExecAggStage* execAggStage) noexcept;
static MongoExtensionStatus* _hostCreateMetrics(
const MongoExtensionExecAggStage* execAggStage,
MongoExtensionOperationMetrics** metrics) noexcept {
return wrapCXXAndConvertExceptionToStatus([]() {
tasserted(11213501,
"_hostCreateMetrics should not be called. Ensure that execAggStage is "
"extension-allocated, not host-allocated.");
});
}
static constexpr ::MongoExtensionExecAggStageVTable VTABLE{.destroy = &_hostDestroy,
.get_next = &_hostGetNext};
.get_next = &_hostGetNext,
.get_name = &_hostGetName,
.create_metrics =
&_hostCreateMetrics};
std::unique_ptr<host::ExecAggStage> _execAggStage;
};

View File

@ -11,8 +11,10 @@ exports_files(
mongo_cc_library(
name = "host_handles",
srcs = [
"executable_agg_stage.cpp",
],
hdrs = [
"executable_agg_stage.h",
"host_operation_metrics_handle.h",
],
deps = [

View File

@ -26,17 +26,53 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/extension/host_connector/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/handle/executable_agg_stage.h"
#include "mongo/db/extension/shared/extension_status.h"
namespace mongo::extension::host_connector {
ExtensionGetNextResult ExecAggStageHandle::getNext() {
ExtensionGetNextResult ExecAggStageHandle::getNext(
MongoExtensionQueryExecutionContext* execCtxPtr) {
::MongoExtensionGetNextResult result{};
invokeCAndConvertStatusToException([&]() { return vtable().get_next(get(), &result); });
invokeCAndConvertStatusToException(
[&]() { return vtable().get_next(get(), execCtxPtr, &result); });
return convertCRepresentationToGetNextResult(&result);
}
namespace {
std::string_view _internalGetName(const MongoExtensionExecAggStageVTable& vtable,
const MongoExtensionExecAggStage* stage) {
return byteViewAsStringView(vtable.get_name(stage));
}
HostOperationMetricsHandle _internalCreateMetrics(const MongoExtensionExecAggStageVTable& vtable,
const MongoExtensionExecAggStage* stage) {
MongoExtensionOperationMetrics* metrics = nullptr;
invokeCAndConvertStatusToException([&]() { return vtable.create_metrics(stage, &metrics); });
tassert(11213505, "Result of `create_metrics` was nullptr", metrics != nullptr);
// Take ownership of the created metrics and return the result.
return HostOperationMetricsHandle(metrics);
}
} // namespace
std::string_view ExecAggStageHandle::getName() const {
return _internalGetName(vtable(), get());
}
HostOperationMetricsHandle ExecAggStageHandle::createMetrics() const {
return _internalCreateMetrics(vtable(), get());
}
std::string_view UnownedExecAggStageHandle::getName() const {
return _internalGetName(vtable(), get());
}
HostOperationMetricsHandle UnownedExecAggStageHandle::createMetrics() const {
return _internalCreateMetrics(vtable(), get());
}
} // namespace mongo::extension::host_connector

View File

@ -28,6 +28,7 @@
*/
#pragma once
#include "mongo/db/extension/host_connector/handle/host_operation_metrics_handle.h"
#include "mongo/db/extension/shared/get_next_result.h"
#include "mongo/db/extension/shared/handle/byte_buf_handle.h"
#include "mongo/db/extension/shared/handle/handle.h"
@ -73,11 +74,42 @@ public:
_assertValidVTable();
}
ExtensionGetNextResult getNext();
ExtensionGetNextResult getNext(MongoExtensionQueryExecutionContext* execCtxPtr);
std::string_view getName() const;
HostOperationMetricsHandle createMetrics() const;
protected:
void _assertVTableConstraints(const VTable_t& vtable) const override {
tassert(10956800, "ExecAggStage 'get_next' is null", vtable.get_next != nullptr);
tassert(11213503, "ExecAggStage 'get_name' is null", vtable.get_name != nullptr);
tassert(
11213504, "ExecAggStage 'create_metrics' is null", vtable.create_metrics != nullptr);
}
};
/**
* An unowned wrapper around a MongoExtensionExecAggStage. This is used when passing a
* MongoExtensionExecAggStage through the API boundary, but the callee callsite should not take
* ownership of the agg stage.
*/
class UnownedExecAggStageHandle : public UnownedHandle<const ::MongoExtensionExecAggStage> {
public:
UnownedExecAggStageHandle(const ::MongoExtensionExecAggStage* execAggStage)
: UnownedHandle<const ::MongoExtensionExecAggStage>(execAggStage) {
_assertValidVTable();
}
std::string_view getName() const;
HostOperationMetricsHandle createMetrics() const;
protected:
void _assertVTableConstraints(const VTable_t& vtable) const override {
tassert(11213502, "ExecAggStage 'get_name' is null", vtable.get_name != nullptr);
tassert(
11213506, "ExecAggStage 'create_metrics' is null", vtable.create_metrics != nullptr);
}
};

View File

@ -43,7 +43,7 @@ public:
}
// Call the underlying object's serialize function and return the resulting BSON object.
BSONObj serialize() {
BSONObj serialize() const {
assertValid();
::MongoExtensionByteBuf* buf;

View File

@ -35,8 +35,8 @@ namespace mongo::extension::host_connector {
MongoExtensionStatus* QueryExecutionContextAdapter::_extCheckForInterrupt(
const MongoExtensionQueryExecutionContext* ctx, MongoExtensionStatus* queryStatus) noexcept {
return wrapCXXAndConvertExceptionToStatus([&]() {
const auto& expCtx = static_cast<const QueryExecutionContextAdapter*>(ctx)->getCtxImpl();
Status interrupted = expCtx->getOperationContext()->checkForInterruptNoAssert();
const auto& execCtx = static_cast<const QueryExecutionContextAdapter*>(ctx)->getCtxImpl();
Status interrupted = execCtx.checkForInterrupt();
MongoExtensionByteView reasonByteView{stringViewAsByteView(interrupted.reason())};
// Note that we don't need invokeCAndConvertStatusToException here because
@ -46,4 +46,18 @@ MongoExtensionStatus* QueryExecutionContextAdapter::_extCheckForInterrupt(
});
}
MongoExtensionStatus* QueryExecutionContextAdapter::_extGetMetrics(
const MongoExtensionQueryExecutionContext* ctx,
const MongoExtensionExecAggStage* execAggStage,
MongoExtensionOperationMetrics** metrics) noexcept {
return wrapCXXAndConvertExceptionToStatus([&]() {
const auto& execCtx = static_cast<const QueryExecutionContextAdapter*>(ctx)->getCtxImpl();
auto execStageHandle = UnownedExecAggStageHandle(execAggStage);
const std::string stageName = std::string(execStageHandle.getName());
*metrics = execCtx.getMetrics(stageName, execStageHandle)->get();
});
}
} // namespace mongo::extension::host_connector

View File

@ -28,32 +28,62 @@
*/
#pragma once
#include "mongo/db/extension/host_connector/handle/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/handle/host_operation_metrics_handle.h"
#include "mongo/db/extension/public/api.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/util/modules.h"
namespace mongo::extension::host_connector {
/**
* QueryExecutionContextBase defines the abstract interface for query execution context
* that extensions need to access during pipeline execution.
*
* This interface allows extensions to:
* - Check if the current operation has been interrupted or cancelled
* - Retrieve operation metrics and debugging information for specific aggregation stages
*
* Concrete implementations of this interface provide the bridge between the extension
* framework and the host process's query execution state, encapsulating access to
* OperationContext and operation debugging information.
*
* Implementations of this interface should be wrapped by QueryExecutionContextAdapter
* for exposure to the public extension API.
*/
class QueryExecutionContextBase {
public:
QueryExecutionContextBase() = default;
virtual ~QueryExecutionContextBase() = default;
virtual Status checkForInterrupt() const = 0;
virtual HostOperationMetricsHandle* getMetrics(
const std::string& stageName, const UnownedExecAggStageHandle& execStage) const = 0;
};
/**
* QueryExecutionContextAdapter is an adapter to ::MongoExtensionQueryExecutionContext,
* providing host expression context methods to extensions.
*/
class QueryExecutionContextAdapter final : public ::MongoExtensionQueryExecutionContext {
public:
QueryExecutionContextAdapter(const ExpressionContext* ctx)
: ::MongoExtensionQueryExecutionContext{&VTABLE}, _ctx(ctx) {}
QueryExecutionContextAdapter(std::unique_ptr<QueryExecutionContextBase> ctx)
: ::MongoExtensionQueryExecutionContext{&VTABLE}, _ctx(std::move(ctx)) {}
const ExpressionContext* getCtxImpl() const {
return _ctx;
const QueryExecutionContextBase& getCtxImpl() const {
return *_ctx;
}
private:
static MongoExtensionStatus* _extCheckForInterrupt(
const MongoExtensionQueryExecutionContext* ctx, MongoExtensionStatus* queryStatus) noexcept;
static constexpr ::MongoExtensionQueryExecutionContextVTable VTABLE{&_extCheckForInterrupt};
static MongoExtensionStatus* _extGetMetrics(const MongoExtensionQueryExecutionContext* ctx,
const MongoExtensionExecAggStage* execAggStage,
MongoExtensionOperationMetrics** metrics) noexcept;
const ExpressionContext* _ctx;
static constexpr ::MongoExtensionQueryExecutionContextVTable VTABLE{
.check_for_interrupt = &_extCheckForInterrupt, .get_metrics = &_extGetMetrics};
std::unique_ptr<QueryExecutionContextBase> _ctx;
};
} // namespace mongo::extension::host_connector

View File

@ -156,6 +156,53 @@ typedef struct MongoExtensionStatusVTable {
const int32_t MONGO_EXTENSION_STATUS_RUNTIME_ERROR = -1;
const int32_t MONGO_EXTENSION_STATUS_OK = 0;
/**
* Operation metrics exposed by extensions.
*
* This struct represents performance and execution statistics collected during extension
* operations. Extensions can implement this interface to track and report various arbitrary metrics
* about their execution, such as timing information, resource usage, or operation counts. The host
* will periodically query these metrics for monitoring, diagnostics, and performance analysis
* purposes.
*
* Extensions are responsible for implementing the collection and aggregation of metrics,
* while the host is responsible for periodically retrieving, persisting, and exposing these metrics
* through MongoDB's monitoring interfaces.
*
* Note that metrics are scoped to each operation - ie, query or getMore invocation. The lifetime of
* the metrics is managed by the host and the extension should not persist or aggregate the metrics
* itself across the query's lifetime.
*
* Metrics will be exposed via the serialize() function and prefaced by the extension's stage name.
* For example, if an extension returned {counter: 1} from the serialize() implementation, the
* metrics would be exposed via the host in the format {$stageName: {counter: 1}}.
*/
typedef struct MongoExtensionOperationMetrics {
const struct MongoExtensionOperationMetricsVTable* vtable;
} MongoExtensionOperationMetrics;
typedef struct MongoExtensionOperationMetricsVTable {
/**
* Destroy `metrics` and free any related resources.
*/
void (*destroy)(MongoExtensionOperationMetrics* metrics);
/**
* Serializes the collected metrics into an arbitrary BSON object. Ownership is allocated by the
* extension and transferred to the host.
*/
MongoExtensionStatus* (*serialize)(const MongoExtensionOperationMetrics* metrics,
MongoExtensionByteBuf** output);
/**
* Updates and aggregates existing metrics with current execution metrics. Note that the
* `arguments` byte view can be any format - for example, an opaque pointer, a serialized BSON
* message, a serialized struct, etc.
*/
MongoExtensionStatus* (*update)(MongoExtensionOperationMetrics* metrics,
MongoExtensionByteView arguments);
} MongoExtensionOperationMetricsVTable;
/**
* MongoExtensionQueryExecutionContext exposes helpers for an extension to call certain
* functionality on a wrapped ExpressionContext. It is owned by the host and used by an extension.
@ -164,11 +211,9 @@ typedef struct MongoExtensionQueryExecutionContext {
const struct MongoExtensionQueryExecutionContextVTable* vtable;
} MongoExtensionQueryExecutionContext;
// Forward declare
struct MongoExtensionExecAggStage;
typedef struct MongoExtensionQueryExecutionContextVTable {
/**
* Call checkForInterruptNoAssert() on the wrapped ExpressionContext and populate the
* `queryStatus` with the resulting code/reason.
*/
/**
* Call checkForInterruptNoAssert() on the wrapped ExpressionContext and populate the
* `queryStatus` with the resulting code/reason. Populates queryStatus with
@ -178,6 +223,20 @@ typedef struct MongoExtensionQueryExecutionContextVTable {
*/
MongoExtensionStatus* (*check_for_interrupt)(const MongoExtensionQueryExecutionContext* ctx,
MongoExtensionStatus* queryStatus);
/**
* Check if any existing metrics for this extension exist on the wrapped OperationContext and
* return an unowned pointer inside of `metrics`, to either a new set of metrics or the existing
* set of metrics.
*
* When this method is first called during an operation (e.g. query or getMore), the host will
* initialize a new set of metrics and return them. Otherwise, the existing metrics for the
* current operation will be returned. Note that multiple instances of the same aggregation
* stage in a single pipeline will share operation metrics.
*/
MongoExtensionStatus* (*get_metrics)(const MongoExtensionQueryExecutionContext* ctx,
const MongoExtensionExecAggStage* execAggStage,
MongoExtensionOperationMetrics** metrics);
} MongoExtensionQueryExecutionContextVTable;
/**
@ -224,45 +283,6 @@ typedef struct MongoExtensionHostQueryShapeOptsVTable {
MongoExtensionByteBuf** output);
} MongoExtensionHostQueryShapeOptsVTable;
/**
* Operation metrics exposed by extensions.
*
* This struct represents performance and execution statistics collected during extension
* operations. Extensions can implement this interface to track and report various arbitrary metrics
* about their execution, such as timing information, resource usage, or operation counts. The host
* will periodically query these metrics for monitoring, diagnostics, and performance analysis
* purposes.
*
* Extensions are responsible for implementing the collection and aggregation of metrics,
* while the host is responsible for periodically retrieving, persisting, and exposing these metrics
* through MongoDB's monitoring interfaces.
*/
typedef struct MongoExtensionOperationMetrics {
const struct MongoExtensionOperationMetricsVTable* vtable;
} MongoExtensionOperationMetrics;
typedef struct MongoExtensionOperationMetricsVTable {
/**
* Destroy `metrics` and free any related resources.
*/
void (*destroy)(MongoExtensionOperationMetrics* metrics);
/**
* Serializes the collected metrics into an arbitrary BSON object. Ownership is allocated by the
* extension and transferred to the host.
*/
MongoExtensionStatus* (*serialize)(const MongoExtensionOperationMetrics* metrics,
MongoExtensionByteBuf** output);
/**
* Updates and aggregates existing metrics with current execution metrics. Note that the
* `arguments` byte view can be any format - for example, an opaque pointer, a serialized BSON
* message, a serialized struct, etc.
*/
MongoExtensionStatus* (*update)(MongoExtensionOperationMetrics* metrics,
MongoExtensionByteView arguments);
} MongoExtensionOperationMetricsVTable;
/**
* Possible explain verbosity levels.
*/
@ -534,7 +554,22 @@ typedef struct MongoExtensionExecAggStageVTable {
* a byte buffer. Ownership of the buffer is transferred to the Host.
*/
MongoExtensionStatus* (*get_next)(MongoExtensionExecAggStage* execAggStage,
MongoExtensionQueryExecutionContext* execCtxPtr,
MongoExtensionGetNextResult* getNextResult);
/**
* Returns a MongoExtensionByteView containing the name of the associated aggregation stage.
*/
MongoExtensionByteView (*get_name)(const MongoExtensionExecAggStage* astNode);
/**
* Creates a MongoExtensionOperationMetrics object to collect metrics for this aggregation
* stage, then populates `metrics` with the location. Ownership of the metrics object is
* transferred to the caller.
*/
MongoExtensionStatus* (*create_metrics)(const MongoExtensionExecAggStage* execAggStage,
MongoExtensionOperationMetrics** metrics);
} MongoExtensionExecAggStageVTable;
/**

View File

@ -30,6 +30,8 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/extension/public/api.h"
#include "mongo/db/extension/sdk/assert_util.h"
#include "mongo/db/extension/sdk/operation_metrics_adapter.h"
#include "mongo/db/extension/sdk/query_execution_context_handle.h"
#include "mongo/db/extension/shared/byte_buf.h"
#include "mongo/db/extension/shared/extension_status.h"
#include "mongo/db/extension/shared/get_next_result.h"
@ -475,10 +477,25 @@ private:
*/
class ExecAggStage {
public:
ExecAggStage() = default;
virtual ~ExecAggStage() = default;
virtual ExtensionGetNextResult getNext() = 0;
virtual ExtensionGetNextResult getNext(const QueryExecutionContextHandle& execCtx,
const MongoExtensionExecAggStage* execStage) = 0;
std::string_view getName() const {
return _name;
}
// Extensions are not required to provide metrics if they do not need to.
virtual std::unique_ptr<OperationMetricsBase> createMetrics() const {
return nullptr;
}
protected:
ExecAggStage(std::string_view name) : _name(name) {}
private:
const std::string _name;
};
/**
@ -564,21 +581,46 @@ private:
}
static ::MongoExtensionStatus* _extGetNext(::MongoExtensionExecAggStage* execAggStage,
::MongoExtensionQueryExecutionContext* execCtxPtr,
::MongoExtensionGetNextResult* apiResult) noexcept {
return wrapCXXAndConvertExceptionToStatus([&]() {
apiResult->code = ::MongoExtensionGetNextResultCode::kPauseExecution;
apiResult->result = nullptr;
QueryExecutionContextHandle execCtx(execCtxPtr);
auto& impl = static_cast<ExtensionExecAggStage*>(execAggStage)->getImpl();
// Allocate a buffer on the heap. Ownership is transferred to the caller.
ExtensionGetNextResult extensionResult = impl.getNext();
ExtensionGetNextResult extensionResult = impl.getNext(execCtx, execAggStage);
convertExtensionGetNextResultToCRepresentation(apiResult, extensionResult);
});
};
static constexpr ::MongoExtensionExecAggStageVTable VTABLE = {.destroy = &_extDestroy,
.get_next = &_extGetNext};
static ::MongoExtensionByteView _extGetName(
const ::MongoExtensionExecAggStage* execAggStage) noexcept {
const auto& impl = static_cast<const ExtensionExecAggStage*>(execAggStage)->getImpl();
return stringViewAsByteView(impl.getName());
}
static ::MongoExtensionStatus* _extCreateMetrics(
const ::MongoExtensionExecAggStage* execAggStage,
MongoExtensionOperationMetrics** metrics) noexcept {
return wrapCXXAndConvertExceptionToStatus([&]() {
const auto& impl = static_cast<const ExtensionExecAggStage*>(execAggStage)->getImpl();
auto result = impl.createMetrics();
auto adapter = new OperationMetricsAdapter(std::move(result));
*metrics = adapter;
});
}
static constexpr ::MongoExtensionExecAggStageVTable VTABLE = {
.destroy = &_extDestroy,
.get_next = &_extGetNext,
.get_name = &_extGetName,
.create_metrics = &_extCreateMetrics,
};
std::unique_ptr<ExecAggStage> _execAggStage;
};
} // namespace mongo::extension::sdk

View File

@ -40,4 +40,15 @@ ExtensionGenericStatus QueryExecutionContextHandle::checkForInterrupt() const {
return queryStatus;
}
ExtensionOperationMetricsHandle QueryExecutionContextHandle::getMetrics(
const MongoExtensionExecAggStage* execStage) const {
assertValid();
MongoExtensionOperationMetrics* metrics = nullptr;
invokeCAndConvertStatusToException(
[&]() { return vtable().get_metrics(get(), execStage, &metrics); });
return ExtensionOperationMetricsHandle(metrics);
}
} // namespace mongo::extension::sdk

View File

@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/extension/public/api.h"
#include "mongo/db/extension/sdk/assert_util.h"
#include "mongo/db/extension/sdk/extension_operation_metrics_handle.h"
#include "mongo/db/extension/shared/extension_status.h"
#include "mongo/db/extension/shared/handle/handle.h"
#include "mongo/util/modules.h"
@ -52,11 +53,16 @@ public:
ExtensionGenericStatus checkForInterrupt() const;
ExtensionOperationMetricsHandle getMetrics(const MongoExtensionExecAggStage* execStage) const;
private:
void _assertVTableConstraints(const VTable_t& vtable) const override {
sdk_tassert(11098300,
"QueryExecutionContext' 'check_for_interrupt' is null",
vtable.check_for_interrupt != nullptr);
sdk_tassert(11213507,
"QueryExecutionContext' 'get_metrics' is null",
vtable.get_metrics != nullptr);
};
};

View File

@ -31,6 +31,7 @@ mongo_cc_unit_test(
deps = [
":shared_test_stages",
"//src/mongo:base",
"//src/mongo/db/extension/host:extension_host",
"//src/mongo/db/extension/host_connector:extensions_host_connector",
"//src/mongo/db/extension/host_connector:host_services_adapter",
"//src/mongo/db/extension/host_connector/handle:host_handles",

View File

@ -28,8 +28,9 @@
*/
#include "mongo/bson/bsonobj.h"
#include "mongo/db/extension/host_connector/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/handle/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/host_services_adapter.h"
#include "mongo/db/extension/host_connector/query_execution_context_adapter.h"
#include "mongo/db/extension/public/api.h"
#include "mongo/db/extension/sdk/aggregation_stage.h"
#include "mongo/db/extension/sdk/tests/shared_test_stages.h"
@ -57,7 +58,10 @@ public:
// functions, e.g. to run assertions.
extension::sdk::HostServicesHandle::setHostServices(
extension::host_connector::HostServicesAdapter::get());
_execCtx = std::make_unique<host_connector::QueryExecutionContextAdapter>(nullptr);
}
std::unique_ptr<host_connector::QueryExecutionContextAdapter> _execCtx;
};
class ParseNodeVTableDeathTest : public unittest::Test {
@ -90,58 +94,83 @@ public:
class NoOpExtensionExecAggStage : public extension::sdk::ExecAggStage {
public:
extension::ExtensionGetNextResult getNext() override {
NoOpExtensionExecAggStage(std::string stageName) : extension::sdk::ExecAggStage(stageName) {}
extension::ExtensionGetNextResult getNext(
const QueryExecutionContextHandle& expCtx,
const MongoExtensionExecAggStage* execAggStage) override {
MONGO_UNIMPLEMENTED;
}
static inline std::unique_ptr<extension::sdk::ExecAggStage> make() {
return std::make_unique<NoOpExtensionExecAggStage>();
return std::make_unique<NoOpExtensionExecAggStage>("$noOp");
}
};
class InvalidExtensionExecAggStageAdvancedState : public extension::sdk::ExecAggStage {
public:
extension::ExtensionGetNextResult getNext() override {
InvalidExtensionExecAggStageAdvancedState(std::string stageName)
: extension::sdk::ExecAggStage(stageName) {}
extension::ExtensionGetNextResult getNext(
const QueryExecutionContextHandle& expCtx,
const MongoExtensionExecAggStage* execAggStage) override {
return {.code = extension::GetNextCode::kAdvanced, .res = boost::none};
}
static inline std::unique_ptr<extension::sdk::ExecAggStage> make() {
return std::make_unique<InvalidExtensionExecAggStageAdvancedState>();
return std::make_unique<InvalidExtensionExecAggStageAdvancedState>("$noOp");
}
};
class InvalidExtensionExecAggStagePauseExecutionState : public extension::sdk::ExecAggStage {
public:
extension::ExtensionGetNextResult getNext() override {
InvalidExtensionExecAggStagePauseExecutionState(std::string stageName)
: extension::sdk::ExecAggStage(stageName) {}
extension::ExtensionGetNextResult getNext(
const QueryExecutionContextHandle& expCtx,
const MongoExtensionExecAggStage* execAggStage) override {
return {.code = extension::GetNextCode::kPauseExecution,
.res = boost::make_optional(BSON("$dog" << "I should not exist"))};
}
static inline std::unique_ptr<extension::sdk::ExecAggStage> make() {
return std::make_unique<InvalidExtensionExecAggStagePauseExecutionState>();
return std::make_unique<InvalidExtensionExecAggStagePauseExecutionState>("$noOp");
}
};
class InvalidExtensionExecAggStageEofState : public extension::sdk::ExecAggStage {
public:
extension::ExtensionGetNextResult getNext() override {
InvalidExtensionExecAggStageEofState(std::string stageName)
: extension::sdk::ExecAggStage(stageName) {}
extension::ExtensionGetNextResult getNext(
const QueryExecutionContextHandle& expCtx,
const MongoExtensionExecAggStage* execAggStage) override {
return {.code = extension::GetNextCode::kEOF,
.res = boost::make_optional(BSON("$dog" << "I should not exist"))};
}
static inline std::unique_ptr<extension::sdk::ExecAggStage> make() {
return std::make_unique<InvalidExtensionExecAggStageEofState>();
return std::make_unique<InvalidExtensionExecAggStageEofState>("$noOp");
}
};
class InvalidExtensionExecAggStageGetNextCode : public extension::sdk::ExecAggStage {
public:
extension::ExtensionGetNextResult getNext() override {
InvalidExtensionExecAggStageGetNextCode(std::string stageName)
: extension::sdk::ExecAggStage(stageName) {}
extension::ExtensionGetNextResult getNext(
const QueryExecutionContextHandle& expCtx,
const MongoExtensionExecAggStage* execAggStage) override {
return {.code = static_cast<const GetNextCode>(10), .res = boost::none};
}
static inline std::unique_ptr<extension::sdk::ExecAggStage> make() {
return std::make_unique<InvalidExtensionExecAggStageGetNextCode>();
return std::make_unique<InvalidExtensionExecAggStageGetNextCode>("$noOp");
}
};
@ -289,26 +318,29 @@ DEATH_TEST_F(ExecAggStageVTableDeathTest, InvalidExecAggStageVTableFailsGetNext,
DEATH_TEST_F(AggStageDeathTest, InvalidExtensionGetNextResultAdvanced, "10956801") {
auto invalidExtensionExecAggStageAdvancedState = new extension::sdk::ExtensionExecAggStage(
InvalidExtensionExecAggStageAdvancedState::make());
auto handle =
extension::host_connector::ExecAggStageHandle{invalidExtensionExecAggStageAdvancedState};
[[maybe_unused]] auto getNext = handle.getNext();
[[maybe_unused]] auto getNext = handle.getNext(_execCtx.get());
};
DEATH_TEST_F(AggStageDeathTest, InvalidExtensionGetNextResultPauseExecution, "10956802") {
auto invalidExtensionExecAggStagePauseExecutionState =
new extension::sdk::ExtensionExecAggStage(
InvalidExtensionExecAggStagePauseExecutionState::make());
auto handle = extension::host_connector::ExecAggStageHandle{
invalidExtensionExecAggStagePauseExecutionState};
[[maybe_unused]] auto getNext = handle.getNext();
[[maybe_unused]] auto getNext = handle.getNext(_execCtx.get());
};
DEATH_TEST_F(AggStageDeathTest, InvalidExtensionGetNextResultEOF, "10956805") {
auto invalidExtensionExecAggStageEofState =
new extension::sdk::ExtensionExecAggStage(InvalidExtensionExecAggStageEofState::make());
auto handle =
extension::host_connector::ExecAggStageHandle{invalidExtensionExecAggStageEofState};
[[maybe_unused]] auto getNext = handle.getNext();
[[maybe_unused]] auto getNext = handle.getNext(_execCtx.get());
};
DEATH_TEST_F(AggStageDeathTest, InvalidMongoExtensionGetNextResultCode, "10956803") {
@ -321,12 +353,12 @@ DEATH_TEST_F(AggStageDeathTest, InvalidMongoExtensionGetNextResultCode, "1095680
DEATH_TEST_F(AggStageDeathTest, InvalidGetNextCode, "10956804") {
auto invalidExtensionExecAggStageGetNextCode =
new extension::sdk::ExtensionExecAggStage(InvalidExtensionExecAggStageGetNextCode::make());
auto handle =
extension::host_connector::ExecAggStageHandle{invalidExtensionExecAggStageGetNextCode};
[[maybe_unused]] auto getNext = handle.getNext();
[[maybe_unused]] auto getNext = handle.getNext(_execCtx.get());
};
} // namespace
} // namespace mongo::extension::sdk

View File

@ -33,8 +33,10 @@
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/extension/host_connector/executable_agg_stage.h"
#include "mongo/db/extension/host/query_execution_context.h"
#include "mongo/db/extension/host_connector/handle/executable_agg_stage.h"
#include "mongo/db/extension/host_connector/host_services_adapter.h"
#include "mongo/db/extension/host_connector/query_execution_context_adapter.h"
#include "mongo/db/extension/host_connector/query_shape_opts_adapter.h"
#include "mongo/db/extension/public/api.h"
#include "mongo/db/extension/sdk/query_shape_opts_handle.h"
@ -43,6 +45,7 @@
#include "mongo/db/extension/shared/handle/aggregation_stage/ast_node.h"
#include "mongo/db/extension/shared/handle/aggregation_stage/parse_node.h"
#include "mongo/db/extension/shared/handle/aggregation_stage/stage_descriptor.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@ -697,7 +700,11 @@ TEST_F(AggStageTest, SerializingLiteralQueryShapeSucceedsWithRepresentativeValue
class ValidExtensionExecAggStage : public extension::sdk::ExecAggStage {
public:
extension::ExtensionGetNextResult getNext() override {
ValidExtensionExecAggStage(std::string stageName) : extension::sdk::ExecAggStage(stageName) {}
extension::ExtensionGetNextResult getNext(
const QueryExecutionContextHandle& expCtx,
const MongoExtensionExecAggStage* execAggStage) override {
if (_results.empty()) {
return extension::ExtensionGetNextResult::eof();
}
@ -716,7 +723,7 @@ public:
}
static inline std::unique_ptr<extension::sdk::ExecAggStage> make() {
return std::make_unique<ValidExtensionExecAggStage>();
return std::make_unique<ValidExtensionExecAggStage>("$noOp");
}
private:
@ -729,19 +736,21 @@ TEST(AggregationStageTest, ValidExecAggStageVTableGetNextSucceeds) {
new extension::sdk::ExtensionExecAggStage(ValidExtensionExecAggStage::make());
auto handle = extension::host_connector::ExecAggStageHandle{validExecAggStage};
auto getNext = handle.getNext();
auto nullExecCtx = host_connector::QueryExecutionContextAdapter(nullptr);
auto getNext = handle.getNext(&nullExecCtx);
ASSERT_EQUALS(extension::GetNextCode::kAdvanced, getNext.code);
ASSERT_BSONOBJ_EQ(BSON("$meow" << "adithi"), getNext.res.get());
getNext = handle.getNext();
getNext = handle.getNext(&nullExecCtx);
ASSERT_EQUALS(extension::GetNextCode::kPauseExecution, getNext.code);
ASSERT_EQ(boost::none, getNext.res);
getNext = handle.getNext();
getNext = handle.getNext(&nullExecCtx);
ASSERT_EQUALS(extension::GetNextCode::kAdvanced, getNext.code);
ASSERT_BSONOBJ_EQ(BSON("$meow" << "cedric"), getNext.res.get());
getNext = handle.getNext();
getNext = handle.getNext(&nullExecCtx);
ASSERT_EQUALS(extension::GetNextCode::kEOF, getNext.code);
ASSERT_EQ(boost::none, getNext.res);
};
@ -763,14 +772,94 @@ TEST_F(AggStageTest, ValidateStructStateAfterConvertingStructToGetNextResult) {
});
ASSERT_EQ(static_cast<::MongoExtensionGetNextResultCode>(10), result.code);
ASSERT_EQ(nullptr, result.result);
}
result = {.code = ::MongoExtensionGetNextResultCode::kAdvanced,
.result = new VecByteBuf(BSON("$adithi" << "{cats: 0}"))};
auto converted = extension::host_connector::convertCRepresentationToGetNextResult(&result);
ASSERT_EQ(GetNextCode::kAdvanced, converted.code);
ASSERT_BSONOBJ_EQ(BSON("$adithi" << "{cats: 0}"), converted.res.get());
class GetMetricsExtensionOperationMetrics : public OperationMetricsBase {
public:
BSONObj serialize() const override {
return BSON("counter" << _counter);
}
void update(MongoExtensionByteView) override {
_counter++;
}
private:
int _counter = 0;
};
class GetMetricsExtensionExecAggStage
: public extension::sdk::ExecAggStage,
std::enable_shared_from_this<GetMetricsExtensionExecAggStage> {
public:
GetMetricsExtensionExecAggStage(std::string stageName)
: extension::sdk::ExecAggStage(stageName) {}
extension::ExtensionGetNextResult getNext(
const extension::sdk::QueryExecutionContextHandle& execCtx,
const MongoExtensionExecAggStage* execAggStage) override {
auto metrics = execCtx.getMetrics(execAggStage);
metrics.update(MongoExtensionByteView{nullptr, 0});
auto metricsBson = metrics.serialize();
auto counterVal = metricsBson["counter"].Int();
if (counterVal == 1) {
return extension::ExtensionGetNextResult::advanced(BSON("hi" << "finley"));
} else if (counterVal == 2) {
return extension::ExtensionGetNextResult::eof();
}
tasserted(11213508, "counterVal can only be 1 or 2 at this point");
}
std::unique_ptr<OperationMetricsBase> createMetrics() const override {
return std::make_unique<GetMetricsExtensionOperationMetrics>();
}
static inline std::unique_ptr<extension::sdk::ExecAggStage> make() {
return std::make_unique<GetMetricsExtensionExecAggStage>("$getMetrics");
}
};
TEST(AggregationStageTest, GetMetricsExtensionExecAggStageSucceeds) {
QueryTestServiceContext testCtx;
auto opCtx = testCtx.makeOperationContext();
auto getMetricsExecAggStage =
new extension::sdk::ExtensionExecAggStage(GetMetricsExtensionExecAggStage::make());
auto handle = extension::host_connector::ExecAggStageHandle{getMetricsExecAggStage};
// Create a test expression context that can be wrapped by QueryExecutionContextAdapter.
auto expCtx = make_intrusive<ExpressionContextForTest>(
opCtx.get(),
NamespaceString::createNamespaceString_forTest("test"_sd, "namespace"_sd),
SerializationContext());
std::unique_ptr<host::QueryExecutionContext> wrappedCtx =
std::make_unique<host::QueryExecutionContext>(expCtx.get());
host_connector::QueryExecutionContextAdapter adapter(std::move(wrappedCtx));
// Call getNext which triggers the getMetrics call logic.
auto getNext = handle.getNext(&adapter);
ASSERT_EQUALS(extension::GetNextCode::kAdvanced,
getNext.code); // should return Advanced, since the metrics counter should be 1.
// Call getNext again, which should build on the existing metrics from the last call.
getNext = handle.getNext(&adapter);
ASSERT_EQUALS(extension::GetNextCode::kEOF,
getNext.code); // should return EOF, since the metrics counter should be 2.
// Now switch out the OpCtx and make sure that the metrics also get reset.
QueryTestServiceContext newTestCtx;
auto newOpCtx = newTestCtx.makeOperationContext();
expCtx->setOperationContext(newOpCtx.get());
// Call getNext which triggers the getMetrics call logic.
getNext = handle.getNext(&adapter);
ASSERT_EQUALS(extension::GetNextCode::kAdvanced,
getNext.code); // should return Advanced, since the metrics counter should be 1.
}
} // namespace
} // namespace mongo::extension::sdk

View File

@ -27,6 +27,8 @@
* it in the license file.
*/
#include "mongo/db/extension/host/query_execution_context.h"
#include "mongo/db/extension/host_connector/query_execution_context_adapter.h"
#include "mongo/db/extension/sdk/query_execution_context_handle.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
@ -46,6 +48,7 @@ protected:
_expCtx =
make_intrusive<ExpressionContextForTest>(_opCtx.get(), _nss, SerializationContext());
}
QueryTestServiceContext _testCtx;
ServiceContext::UniqueOperationContext _opCtx;
NamespaceString _nss;
@ -53,7 +56,9 @@ protected:
};
TEST_F(QueryExecutionContextTestFixture, CheckForInterruptOk) {
host_connector::QueryExecutionContextAdapter adapter(_expCtx.get());
std::unique_ptr<host::QueryExecutionContext> wrappedCtx =
std::make_unique<host::QueryExecutionContext>(_expCtx.get());
host_connector::QueryExecutionContextAdapter adapter(std::move(wrappedCtx));
sdk::QueryExecutionContextHandle handle(&adapter);
ASSERT_EQ(handle.checkForInterrupt(), ExtensionGenericStatus());
@ -62,7 +67,9 @@ TEST_F(QueryExecutionContextTestFixture, CheckForInterruptOk) {
TEST_F(QueryExecutionContextTestFixture, CheckForInterruptDefaultKillCode) {
_opCtx->markKilled();
host_connector::QueryExecutionContextAdapter adapter(_expCtx.get());
std::unique_ptr<host::QueryExecutionContext> wrappedCtx =
std::make_unique<host::QueryExecutionContext>(_expCtx.get());
host_connector::QueryExecutionContextAdapter adapter(std::move(wrappedCtx));
sdk::QueryExecutionContextHandle handle(&adapter);
auto status = handle.checkForInterrupt();
@ -74,7 +81,9 @@ TEST_F(QueryExecutionContextTestFixture, CheckForInterruptCustomKillCode) {
ErrorCodes::Error customKillCode = ErrorCodes::Error(11098301);
_opCtx->markKilled(customKillCode);
host_connector::QueryExecutionContextAdapter adapter(_expCtx.get());
std::unique_ptr<host::QueryExecutionContext> wrappedCtx =
std::make_unique<host::QueryExecutionContext>(_expCtx.get());
host_connector::QueryExecutionContextAdapter adapter(std::move(wrappedCtx));
sdk::QueryExecutionContextHandle handle(&adapter);
ASSERT_EQ(handle.checkForInterrupt(),

View File

@ -298,6 +298,11 @@ void OpDebug::report(OperationContext* opCtx,
if (mongotCursorId) {
pAttrs->add("mongot", makeMongotDebugStatsObject());
}
if (!extensionMetrics.empty()) {
pAttrs->add("extensionMetrics", extensionMetrics.serialize());
}
OPDEBUG_TOATTR_HELP_BOOL(exhaust);
OPDEBUG_TOATTR_HELP_OPTIONAL("keysExamined", additiveMetrics.keysExamined);
@ -572,6 +577,11 @@ void OpDebug::append(OperationContext* opCtx,
if (mongotCursorId) {
b.append("mongot", makeMongotDebugStatsObject());
}
if (!extensionMetrics.empty()) {
b.append("extensionMetrics", extensionMetrics.serialize());
}
OPDEBUG_APPEND_BOOL(b, exhaust);
OPDEBUG_APPEND_OPTIONAL(b, "keysExamined", additiveMetrics.keysExamined);
@ -889,6 +899,11 @@ std::function<BSONObj(ProfileFilter::Args)> OpDebug::appendStaged(OperationConte
b.append(field, args.op.makeMongotDebugStatsObject());
}
});
addIfNeeded("extensionMetrics", [](auto field, auto args, auto& b) {
if (!args.op.extensionMetrics.empty()) {
b.append(field, args.op.extensionMetrics.serialize());
}
});
addIfNeeded("exhaust", [](auto field, auto args, auto& b) {
OPDEBUG_APPEND_BOOL2(b, field, args.op.exhaust);
});

View File

@ -31,6 +31,7 @@
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/extension/host/operation_metrics_registry.h"
#include "mongo/db/flow_control_ticketholder.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
@ -560,6 +561,11 @@ public:
std::map<NamespaceString, std::pair<std::vector<NamespaceString>, std::vector<BSONObj>>>
resolvedViews;
// Stores metrics handles for extensions to properly manage their lifetimes. The contents of
// these stats are opaque to the MongoDB host - this object allows extensions to implement their
// own custom aggregation and serialization logic.
extension::host::OperationMetricsRegistry extensionMetrics;
private:
// The hash of query_shape::QueryShapeHash.
boost::optional<query_shape::QueryShapeHash> _queryShapeHash;