SERVER-112599 Introduce server status metrics to track user commands targeting system buckets collections (#42814)

Co-authored-by: Meryama Nadim <meryama.nadim@mongodb.com>
Co-authored-by: Joan Bruguera Micó (at MongoDB) <joan.bruguera-mico@mongodb.com>
GitOrigin-RevId: ee48084854ac6958f1c6cd742e03c9f765bdf286
This commit is contained in:
Tommaso Tocci 2025-12-11 10:40:34 +01:00 committed by MongoDB Bot
parent b40b3fbc8d
commit f1126b9339
14 changed files with 406 additions and 8 deletions

2
.github/CODEOWNERS vendored
View File

@ -288,6 +288,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
/buildscripts/resmokelib/testing/hooks/**/rotate_execution_control_params.py @10gen/server-workload-resilience @svc-auto-approve-bot
/buildscripts/resmokelib/testing/hooks/**/fuzz_runtime_parameters.py @10gen/server-programmability @svc-auto-approve-bot
/buildscripts/resmokelib/testing/hooks/**/validate.py @10gen/devprod-correctness @10gen/server-validate @svc-auto-approve-bot
/buildscripts/resmokelib/testing/hooks/**/check_system_buckets_metrics.py @10gen/server-catalog-and-routing @svc-auto-approve-bot
# The following patterns are parsed from ./buildscripts/resmokelib/testing/testcases/OWNERS.yml
/buildscripts/resmokelib/testing/testcases/**/query_tester_server_test.py @10gen/query-optimization @svc-auto-approve-bot
@ -2864,6 +2865,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
/src/mongo/db/stats/**/server_write_concern* @10gen/server-rw-concerns @svc-auto-approve-bot
/src/mongo/db/stats/**/api_version_metrics* @10gen/query-optimization @svc-auto-approve-bot
/src/mongo/db/stats/**/change_stream* @10gen/query-execution-change-streams @svc-auto-approve-bot
/src/mongo/db/stats/**/system_buckets_metrics* @10gen/server-catalog-and-routing @svc-auto-approve-bot
# The following patterns are parsed from ./src/mongo/db/storage/OWNERS.yml
/src/mongo/db/storage/**/* @10gen/server-storage-engine-integration @svc-auto-approve-bot

View File

@ -44,6 +44,8 @@ executor:
TestData:
isTimeseriesTestSuite: true
hooks:
# ValidateCollections may issue direct system.buckets commands, so run CheckSystemBucketsMetrics first.
- class: CheckSystemBucketsMetrics
- class: ValidateCollections
- class: CleanEveryN
n: 20

View File

@ -70,3 +70,6 @@ filters:
approvers:
- 10gen/server-validate
- 10gen/devprod-correctness
- "check_system_buckets_metrics.py":
approvers:
- 10gen/server-catalog-and-routing

View File

@ -0,0 +1,97 @@
"""Test hook that verifies no commands directly targeted system.buckets collections."""
import pymongo
from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing.fixtures.external import ExternalFixture
from buildscripts.resmokelib.testing.fixtures.interface import build_client
from buildscripts.resmokelib.testing.fixtures.shardedcluster import _MongoSFixture
from buildscripts.resmokelib.testing.fixtures.standalone import MongoDFixture
from buildscripts.resmokelib.testing.hooks import interface
class CheckSystemBucketsMetrics(interface.Hook):
"""Check that no commands directly targeted system.buckets collections.
This hook reads the numCommandsTargetingSystemBuckets metric from serverStatus
after each test and fails if the counter has increased.
"""
IS_BACKGROUND = False
def __init__(self, hook_logger, fixture, shell_options=None):
"""Initialize CheckSystemBucketsMetrics."""
description = "Check system.buckets metrics"
interface.Hook.__init__(self, hook_logger, fixture, description)
self.shell_options = shell_options
self._baseline_metrics = {}
def before_test(self, test, test_report):
"""Capture baseline metrics before test execution."""
for cluster in self.fixture.get_independent_clusters():
for node in cluster._all_mongo_d_s_t():
if not isinstance(node, (MongoDFixture, ExternalFixture, _MongoSFixture)):
continue
metric_value = self._read_metric_or_fail(node, "baseline")
self._baseline_metrics[node] = metric_value
def _read_metric_or_fail(self, node, phase):
"""Read the numCommandsTargetingSystemBuckets metric for a node."""
node_url = node.get_driver_connection_url()
try:
auth_options = (
self.shell_options
if self.shell_options and "authenticationMechanism" in self.shell_options
else None
)
node_client = build_client(node, auth_options, pymongo.ReadPreference.PRIMARY_PREFERRED)
try:
serverStatus = node_client.admin.command("serverStatus")
return serverStatus["metrics"]["numCommandsTargetingSystemBuckets"]
finally:
node_client.close()
except Exception as err:
message = f"Failed to read {phase} numCommandsTargetingSystemBuckets metric for node {node_url}: {err}"
self.logger.error(message)
raise errors.TestFailure(message) from err
def after_test(self, test, test_report):
"""Check metrics after each test."""
hook_test_case = CheckSystemBucketsMetricsTestCase.create_after_test(
test.logger, test, self
)
hook_test_case.configure(self.fixture)
hook_test_case.run_dynamic_test(test_report)
class CheckSystemBucketsMetricsTestCase(interface.DynamicTestCase):
"""CheckSystemBucketsMetricsTestCase class."""
def run_test(self):
"""Execute test hook to verify no system.buckets commands were executed."""
violations = []
for cluster in self._hook.fixture.get_independent_clusters():
for node in cluster._all_mongo_d_s_t():
if not isinstance(node, (MongoDFixture, ExternalFixture, _MongoSFixture)):
continue
node_url = node.get_driver_connection_url()
current_count = self._hook._read_metric_or_fail(node, "post-test")
baseline = self._hook._baseline_metrics.get(node, 0)
if current_count > baseline:
violation_msg = (
f"Test '{self._base_test_name}' directly targeted system.buckets collection(s). "
f"Node: {node_url}, Baseline: {baseline}, Current: {current_count}, "
f"Difference: {current_count - baseline}. "
)
violations.append(violation_msg)
self.logger.error(violation_msg)
# Update baseline to ensure test isolation
self._hook._baseline_metrics[node] = current_count
if violations:
raise errors.TestFailure(
"Test directly targeted system.buckets collections:\n" + "\n".join(violations)
)

View File

@ -0,0 +1,128 @@
/**
* Validates metrics.numCommandsTargetingSystemBuckets when issuing commands that
* target timeseries collections vs directly targeting system.buckets collections.
*
* @tags: [
* requires_timeseries,
* ]
*/
import {getTimeseriesBucketsColl} from "jstests/core/timeseries/libs/viewless_timeseries_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
const timeField = "my_time";
const metaField = "my_meta";
const timeseriesRawDoc = {
"control": {
"version": 2,
"min": {
"_id": ObjectId("68f24fb53dc19da48a654135"),
"my_time": ISODate("2025-03-08T16:40:00Z"),
},
"max": {
"_id": ObjectId("68f24fb53dc19da48a654135"),
"my_time": ISODate("2025-03-08T16:40:39.239Z"),
},
"count": 1,
},
"meta": {
"sensorId": 3,
},
"data": {
"_id": BinData(7, "BwBo8k+1PcGdpIplQTUA"),
"my_time": BinData(7, "CQBHVKF2lQEAAAA="),
},
};
function getNumSystemBucketsCommands(conn) {
const ss = assert.commandWorked(conn.getDB("admin").runCommand({serverStatus: 1}));
assert.hasFields(ss, ["metrics"], "serverStatus missing 'metrics'");
assert.hasFields(
ss.metrics,
["numCommandsTargetingSystemBuckets"],
"serverStatus.metrics missing 'numCommandsTargetingSystemBuckets'",
);
return ss.metrics.numCommandsTargetingSystemBuckets;
}
function assertSystemBucketsMetrics(primaryConn, otherConns, expectedPrimaryCount, ctxMsg) {
const actualPrimaryCount = getNumSystemBucketsCommands(primaryConn);
assert.eq(
expectedPrimaryCount,
actualPrimaryCount,
`Unexpected count on (${primaryConn.host}) ${ctxMsg}. ` +
`Expected ${expectedPrimaryCount}, got ${actualPrimaryCount}`,
);
for (const conn of otherConns) {
const count = getNumSystemBucketsCommands(conn);
assert.eq(0, count, `Expected 0 on connection (${conn.host}) ${ctxMsg}, got ${count}`);
}
}
function testDirectBucketTargeting({name, primaryConn, otherConns}) {
jsTest.log.info(`Testing direct bucket targeting metrics on topology: ${name}`);
const dbName = `${jsTestName()}_${name}`;
const testDB = primaryConn.getDB(dbName);
// 1) Regular timeseries usage should NOT count as direct system.buckets targeting.
let coll = testDB.getCollection("ts");
assert.commandWorked(testDB.createCollection(coll.getName(), {timeseries: {timeField, metaField}}));
assert.commandWorked(coll.insertOne({[timeField]: ISODate("2030-08-08T04:11:10Z")}));
assert(coll.drop());
assertSystemBucketsMetrics(primaryConn, otherConns, 0, `[after regular timeseries ops] (${name})`);
// 2) Directly targeting system.buckets should be counted.
coll = testDB.getCollection(getTimeseriesBucketsColl("coll"));
assert.commandWorked(testDB.createCollection(coll.getName(), {timeseries: {timeField, metaField}}));
assert.commandWorked(coll.insertOne(timeseriesRawDoc));
assert(coll.drop());
const EXPECTED_DIRECT_BUCKET_COMMANDS = 3; // createCollection + insert + drop
assertSystemBucketsMetrics(
primaryConn,
otherConns,
EXPECTED_DIRECT_BUCKET_COMMANDS,
`[after direct system.buckets ops] (${name})`,
);
}
// Standalone mongod.
{
const mongod = MongoRunner.runMongod();
try {
testDirectBucketTargeting({name: "standalone", primaryConn: mongod, otherConns: []});
} finally {
MongoRunner.stopMongod(mongod);
}
}
// Replica set.
{
const rst = new ReplSetTest({name: jsTestName() + "_rs", nodes: 1});
rst.startSet();
rst.initiate();
try {
testDirectBucketTargeting({name: "replset", primaryConn: rst.getPrimary(), otherConns: []});
} finally {
rst.stopSet();
}
}
// Sharded cluster with an additional config shard.
{
const st = new ShardingTest({shards: 2, rs: {nodes: 1}, mongos: 1, configShard: true});
try {
testDirectBucketTargeting({
name: "sharded",
primaryConn: st.s,
otherConns: [st.rs0.getPrimary(), st.rs1.getPrimary()],
});
} finally {
st.stop();
}
}

View File

@ -2610,6 +2610,7 @@ mongo_cc_library(
"//src/mongo/db/repl:topology_coordinator",
"//src/mongo/db/repl:wait_for_majority_service",
"//src/mongo/db/rss:replicated_storage_service",
"//src/mongo/db/stats:system_buckets_metrics",
"//src/mongo/db/s:query_analysis_writer",
"//src/mongo/db/global_catalog/ddl:sessions_collection_config_server",
"//src/mongo/db/s:sharding_commands_d",

View File

@ -186,6 +186,7 @@
#include "mongo/db/sharding_environment/sharding_ready.h"
#include "mongo/db/startup_recovery.h"
#include "mongo/db/startup_warnings_mongod.h"
#include "mongo/db/stats/system_buckets_metrics.h"
#include "mongo/db/storage/backup_cursor_hooks.h"
#include "mongo/db/storage/control/storage_control.h"
#include "mongo/db/storage/disk_space_monitor.h"
@ -404,24 +405,28 @@ void initializeCommandHooks(ServiceContext* serviceContext) {
class MongodCommandInvocationHooks final : public CommandInvocationHooks {
public:
void onBeforeRun(OperationContext* opCtx, CommandInvocation* invocation) override {
_nextHook.onBeforeRun(opCtx, invocation);
_transportHook.onBeforeRun(opCtx, invocation);
_systemBucketsHook.onBeforeRun(opCtx, invocation);
}
void onBeforeAsyncRun(std::shared_ptr<RequestExecutionContext> rec,
CommandInvocation* invocation) override {
_nextHook.onBeforeAsyncRun(rec, invocation);
_transportHook.onBeforeAsyncRun(rec, invocation);
_systemBucketsHook.onBeforeAsyncRun(rec, invocation);
}
void onAfterRun(OperationContext* opCtx,
CommandInvocation* invocation,
rpc::ReplyBuilderInterface* response) override {
_nextHook.onAfterRun(opCtx, invocation, response);
_transportHook.onAfterRun(opCtx, invocation, response);
_systemBucketsHook.onAfterRun(opCtx, invocation, response);
_onAfterRunImpl(opCtx);
}
void onAfterAsyncRun(std::shared_ptr<RequestExecutionContext> rec,
CommandInvocation* invocation) override {
_nextHook.onAfterAsyncRun(rec, invocation);
_transportHook.onAfterAsyncRun(rec, invocation);
_systemBucketsHook.onAfterAsyncRun(rec, invocation);
_onAfterRunImpl(rec->getOpCtx());
}
@ -431,7 +436,8 @@ void initializeCommandHooks(ServiceContext* serviceContext) {
MirrorMaestro::onReceiveMirroredRead(opCtx);
}
transport::IngressHandshakeMetricsCommandHooks _nextHook{};
transport::IngressHandshakeMetricsCommandHooks _transportHook{};
SystemBucketsMetricsCommandHooks _systemBucketsHook{};
};
CommandInvocationHooks::set(serviceContext, std::make_unique<MongodCommandInvocationHooks>());

View File

@ -87,6 +87,19 @@ mongo_cc_library(
],
)
mongo_cc_library(
name = "system_buckets_metrics",
srcs = [
"system_buckets_metrics.cpp",
],
hdrs = [
"system_buckets_metrics.h",
],
deps = [
"//src/mongo/db:commands",
],
)
mongo_cc_library(
name = "transaction_stats",
srcs = [

View File

@ -31,3 +31,6 @@ filters:
- "change_stream*":
approvers:
- 10gen/query-execution-change-streams
- "system_buckets_metrics*":
approvers:
- 10gen/server-catalog-and-routing

View File

@ -0,0 +1,67 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/stats/system_buckets_metrics.h"
#include "mongo/db/commands/server_status/server_status_metric.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
namespace mongo {
SystemBucketsMetricsCommandHooks::SystemBucketsMetricsCommandHooks() {
_commandsExecuted = &*MetricBuilder<Counter64>("numCommandsTargetingSystemBuckets");
}
void SystemBucketsMetricsCommandHooks::onBeforeRun(OperationContext* opCtx,
CommandInvocation* invocation) {
if (
// This command have been initiated by another command (e.g. DBDirectClient)
isProcessInternalClient(*(opCtx->getClient())) ||
// This command comes from another node within the same cluster
opCtx->getClient()->isInternalClient() ||
// This command does not target a system buckets collection
!invocation->ns().isTimeseriesBucketsCollection()) {
return;
}
LOGV2_DEBUG(11259900,
_logSuppressor().toInt(),
"Received command targeting directly a system buckets namespace",
"command"_attr = invocation->definition()->getName(),
"namespace"_attr = invocation->ns().toStringForErrorMsg(),
"client"_attr = opCtx->getClient()->clientAddress(true),
"connId"_attr = opCtx->getClient()->getConnectionId());
_commandsExecuted->increment();
}
} // namespace mongo

View File

@ -0,0 +1,53 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/commands.h"
#include "mongo/logv2/log_severity_suppressor.h"
namespace mongo {
/**
* A CommandInvocation hooks to track commands directly targeting system.buckets collections.
*/
class SystemBucketsMetricsCommandHooks final : public CommandInvocationHooks {
public:
SystemBucketsMetricsCommandHooks();
void onBeforeRun(OperationContext* opCtx, CommandInvocation* invocation) override;
void onAfterRun(OperationContext* opCtx,
CommandInvocation* invocation,
rpc::ReplyBuilderInterface* response) override {}
Counter64* _commandsExecuted;
logv2::SeveritySuppressor _logSuppressor{
Hours{1}, logv2::LogSeverity::Info(), logv2::LogSeverity::Debug(2)};
};
} // namespace mongo

View File

@ -626,6 +626,7 @@ mongo_cc_library(
"//src/mongo/db/session:logical_session_cache_impl",
"//src/mongo/db/session:session_catalog",
"//src/mongo/db:startup_warnings_common",
"//src/mongo/db/stats:system_buckets_metrics",
"//src/mongo/db/topology/cluster_parameters:cluster_server_parameter_refresher",
"//src/mongo/otel/traces:tracing_initialization",
"//src/mongo/transport:ingress_handshake_metrics",

View File

@ -93,6 +93,7 @@
#include "mongo/db/sharding_environment/sharding_initialization.h"
#include "mongo/db/sharding_environment/version_mongos.h"
#include "mongo/db/startup_warnings_common.h"
#include "mongo/db/stats/system_buckets_metrics.h"
#include "mongo/db/topology/cluster_parameters/cluster_server_parameter_refresher.h"
#include "mongo/db/topology/mongos_topology_coordinator.h"
#include "mongo/db/topology/shard_registry.h"
@ -541,6 +542,28 @@ void cleanupTask(const ShutdownTaskArgs& shutdownArgs) {
#endif
}
void initializeCommandHooks(ServiceContext* service) {
class MongosCommandInvocationHooks final : public CommandInvocationHooks {
public:
void onBeforeRun(OperationContext* opCtx, CommandInvocation* invocation) override {
_transportHook.onBeforeRun(opCtx, invocation);
_systemBucketsHook.onBeforeRun(opCtx, invocation);
}
void onAfterRun(OperationContext* opCtx,
CommandInvocation* invocation,
rpc::ReplyBuilderInterface* response) override {
_transportHook.onAfterRun(opCtx, invocation, response);
_systemBucketsHook.onAfterRun(opCtx, invocation, response);
}
transport::IngressHandshakeMetricsCommandHooks _transportHook{};
SystemBucketsMetricsCommandHooks _systemBucketsHook{};
};
CommandInvocationHooks::set(service, std::make_unique<MongosCommandInvocationHooks>());
}
Status initializeSharding(
OperationContext* opCtx,
std::shared_ptr<ReplicaSetChangeNotifier::Listener>* replicaSetChangeListener,
@ -834,8 +857,7 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
"error"_attr = redact(ex));
}
CommandInvocationHooks::set(serviceContext,
std::make_unique<transport::IngressHandshakeMetricsCommandHooks>());
initializeCommandHooks(serviceContext);
// Must happen before FTDC, because Periodic Metadata Collustion calls getClusterParameter
ClusterServerParameterRefresher::start(serviceContext, opCtx);

View File

@ -124,7 +124,7 @@ private:
* An implementation of CommandInvocationHooks that ensures IngressHandshakeMetrics::onCommand will
* be called for each command.
*/
class IngressHandshakeMetricsCommandHooks : public CommandInvocationHooks {
class IngressHandshakeMetricsCommandHooks final : public CommandInvocationHooks {
public:
void onBeforeRun(OperationContext* opCtx, CommandInvocation* invocation) override;