SERVER-104007 Track delinquent checkForInterrupt (#41510)

GitOrigin-RevId: e7d6a5925d0939a105aee7e5914023c6fe713287
This commit is contained in:
Zixuan 2025-10-07 15:14:39 -04:00 committed by MongoDB Bot
parent da0fc3c1bc
commit def54dbbaf
18 changed files with 104 additions and 1 deletions

View File

@ -151,6 +151,7 @@ function testDelinquencyOnShard(routerDb, shardDb) {
curOp.inprog.length === 1,
"Expected to find exactly one active find() command with the comment " + findComment);
assertDelinquentStats(curOp.inprog[0].delinquencyInfo, 2, curOp.inprog[0]);
assert.gte(curOp.inprog[0].numInterruptChecks, 2, curOp.inprog[0]);
}
// After the find() command, we check that the serverStatus has the delinquent stats
@ -176,6 +177,7 @@ function testDelinquencyOnShard(routerDb, shardDb) {
const parsedLine = JSON.parse(line);
const delinquencyInfo = parsedLine.attr.delinquencyInfo;
assertDelinquentStats(delinquencyInfo, count, line);
assert.gte(parsedLine.attr.numInterruptChecks, count, parsedLine.attr);
};
const globalLog = assert.commandWorked(shardDb.adminCommand({getLog: "global"}));
@ -199,6 +201,9 @@ function testDelinquencyOnShard(routerDb, shardDb) {
assert.gte(queryStats[0].metrics.maxAcquisitionDelinquencyMillis.max,
waitPerIterationMs,
tojson(queryStats));
// For the first batch, numInterruptChecks >= 4, elapsed time ~= 600ms.
// For the second batch, numInterruptChecks >= 3, elapsed time ~= 400ms.
assert.gte(queryStats[0].metrics.numInterruptChecksPerSec.sum, 7, tojson(queryStats));
}
{

View File

@ -51,6 +51,7 @@ function assertMetricsEqual(cursor, {
assertMetricEqual(metrics, "usedDisk", usedDisk);
assertMetricEqual(metrics, "fromMultiPlanner", fromMultiPlanner);
assertMetricEqual(metrics, "fromPlanCache", fromPlanCache);
assertMetricEqual(metrics, "numInterruptChecks", undefined);
// The following metrics exist but are 0 by default.
assertMetricEqual(metrics, "delinquentAcquisitions", 0);
assertMetricEqual(metrics, "totalAcquisitionDelinquencyMillis", 0);

View File

@ -421,6 +421,10 @@ void CurOp::setEndOfOpMetrics(long long nreturned) {
admCtx.getMaxAcquisitionDelinquencyMillis())};
}
if (!parent()) {
metrics.numInterruptChecks = opCtx()->numInterruptChecks();
}
try {
// If we need them, try to fetch the storage stats. We use an unlimited timeout here,
// but the lock acquisition could still be interrupted, which we catch and log.
@ -982,6 +986,9 @@ void CurOp::reportState(BSONObjBuilder* builder,
OpDebug::appendDelinquentInfo(opCtx, sub);
}
if (!parent()) {
builder->append("numInterruptChecks", opCtx->numInterruptChecks());
}
populateCurrentOpQueueStats(opCtx, _tickSource, builder);
}

View File

@ -106,6 +106,8 @@ TEST(CurOpTest, AddingAdditiveMetricsObjectsTogetherShouldAddFieldsTogether) {
additiveMetricsToAdd.totalAcquisitionDelinquencyMillis = Milliseconds{200};
currentAdditiveMetrics.maxAcquisitionDelinquencyMillis = Milliseconds{300};
additiveMetricsToAdd.maxAcquisitionDelinquencyMillis = Milliseconds{100};
currentAdditiveMetrics.numInterruptChecks = 1;
additiveMetricsToAdd.numInterruptChecks = 2;
// Save the current AdditiveMetrics object before adding.
OpDebug::AdditiveMetrics additiveMetricsBeforeAdd;
@ -151,6 +153,9 @@ TEST(CurOpTest, AddingAdditiveMetricsObjectsTogetherShouldAddFieldsTogether) {
ASSERT_EQ(*currentAdditiveMetrics.maxAcquisitionDelinquencyMillis,
std::max(*additiveMetricsBeforeAdd.maxAcquisitionDelinquencyMillis,
*additiveMetricsToAdd.maxAcquisitionDelinquencyMillis));
ASSERT_EQ(*currentAdditiveMetrics.numInterruptChecks,
*additiveMetricsBeforeAdd.numInterruptChecks +
*additiveMetricsToAdd.numInterruptChecks);
}
TEST(CurOpTest, AddingUninitializedAdditiveMetricsFieldsShouldBeTreatedAsZero) {
@ -264,6 +269,7 @@ TEST(CurOpTest, AdditiveMetricsShouldAggregateCursorMetrics) {
additiveMetrics.delinquentAcquisitions = 2;
additiveMetrics.totalAcquisitionDelinquencyMillis = Milliseconds(400);
additiveMetrics.maxAcquisitionDelinquencyMillis = Milliseconds(300);
additiveMetrics.numInterruptChecks = 2;
CursorMetrics cursorMetrics(3 /* keysExamined */,
4 /* docsExamined */,
@ -278,6 +284,7 @@ TEST(CurOpTest, AdditiveMetricsShouldAggregateCursorMetrics) {
cursorMetrics.setDelinquentAcquisitions(3);
cursorMetrics.setTotalAcquisitionDelinquencyMillis(400);
cursorMetrics.setMaxAcquisitionDelinquencyMillis(200);
cursorMetrics.setNumInterruptChecks(3);
additiveMetrics.aggregateCursorMetrics(cursorMetrics);
@ -292,6 +299,7 @@ TEST(CurOpTest, AdditiveMetricsShouldAggregateCursorMetrics) {
ASSERT_EQ(*additiveMetrics.delinquentAcquisitions, 5);
ASSERT_EQ(*additiveMetrics.totalAcquisitionDelinquencyMillis, Milliseconds(800));
ASSERT_EQ(*additiveMetrics.maxAcquisitionDelinquencyMillis, Milliseconds(300));
ASSERT_EQ(*additiveMetrics.numInterruptChecks, 5);
}
TEST(CurOpTest, AdditiveMetricsShouldAggregateNegativeCpuNanos) {
@ -333,11 +341,13 @@ TEST(CurOpTest, AdditiveMetricsAggregateCursorMetricsTreatsNoneAsZero) {
false /* fromPlanCache */,
10 /* cpuNanos */);
cursorMetrics.setNumInterruptChecks(3);
additiveMetrics.aggregateCursorMetrics(cursorMetrics);
ASSERT_EQ(*additiveMetrics.keysExamined, 1);
ASSERT_EQ(*additiveMetrics.docsExamined, 2);
ASSERT_EQ(*additiveMetrics.bytesRead, 3);
ASSERT_EQ(*additiveMetrics.numInterruptChecks, 3);
}
TEST(CurOpTest, AdditiveMetricsShouldAggregateDataBearingNodeMetrics) {
@ -352,6 +362,7 @@ TEST(CurOpTest, AdditiveMetricsShouldAggregateDataBearingNodeMetrics) {
additiveMetrics.delinquentAcquisitions = 2;
additiveMetrics.totalAcquisitionDelinquencyMillis = Milliseconds(400);
additiveMetrics.maxAcquisitionDelinquencyMillis = Milliseconds(200);
additiveMetrics.numInterruptChecks = 2;
query_stats::DataBearingNodeMetrics remoteMetrics;
remoteMetrics.keysExamined = 3;
@ -363,6 +374,7 @@ TEST(CurOpTest, AdditiveMetricsShouldAggregateDataBearingNodeMetrics) {
remoteMetrics.delinquentAcquisitions = 1;
remoteMetrics.totalAcquisitionDelinquencyMillis = Milliseconds(300);
remoteMetrics.maxAcquisitionDelinquencyMillis = Milliseconds(300);
remoteMetrics.numInterruptChecks = 1;
additiveMetrics.aggregateDataBearingNodeMetrics(remoteMetrics);
@ -375,6 +387,7 @@ TEST(CurOpTest, AdditiveMetricsShouldAggregateDataBearingNodeMetrics) {
ASSERT_EQ(*additiveMetrics.delinquentAcquisitions, 3);
ASSERT_EQ(*additiveMetrics.totalAcquisitionDelinquencyMillis, Milliseconds(700));
ASSERT_EQ(*additiveMetrics.maxAcquisitionDelinquencyMillis, Milliseconds(300));
ASSERT_EQ(*additiveMetrics.numInterruptChecks, 3);
}
TEST(CurOpTest, AdditiveMetricsAggregateDataBearingNodeMetricsTreatsNoneAsZero) {
@ -583,6 +596,8 @@ TEST(CurOpTest, ReportStateIncludesDelinquentStatsIfNonZero) {
curOp->reportState(&bob, SerializationContext{});
BSONObj state = bob.obj();
ASSERT_FALSE(state.hasField("delinquencyInfo"));
// The numInterruptChecks field should always be shown, even when zero.
ASSERT_TRUE(state.hasField("numInterruptChecks"));
}
// If the delinquent stats are not zero, they *are* included in the state.
@ -597,6 +612,17 @@ TEST(CurOpTest, ReportStateIncludesDelinquentStatsIfNonZero) {
ASSERT_EQ(state["delinquencyInfo"]["totalAcquisitionDelinquencyMillis"].Long(), 30);
ASSERT_EQ(state["delinquencyInfo"]["maxAcquisitionDelinquencyMillis"].Long(), 20);
}
// After an explicit checkForInterrupt() call, numInterruptChecks is reported as 1.
{
opCtx->checkForInterrupt();
BSONObjBuilder bob;
curOp->reportState(&bob, SerializationContext{});
BSONObj state = bob.obj();
ASSERT_TRUE(state.hasField("numInterruptChecks")) << state.toString();
ASSERT_EQ(state["numInterruptChecks"].Number(), 1);
}
}
TEST(CurOpTest, ShouldNotReportFailpointMsgIfNotSet) {

View File

@ -457,6 +457,10 @@ void OpDebug::report(OperationContext* opCtx,
pAttrs->add("delinquencyInfo", sub.obj());
}
if (!curop.parent()) {
pAttrs->add("numInterruptChecks", opCtx->numInterruptChecks());
}
// Extract admission and execution control queueing stats from AdmissionContext stored on opCtx
TicketHolderQueueStats queueingStats(opCtx);
pAttrs->add("queues", queueingStats.toBson());
@ -1249,6 +1253,8 @@ CursorMetrics OpDebug::getCursorMetrics() const {
metrics.setMaxAcquisitionDelinquencyMillis(
additiveMetrics.maxAcquisitionDelinquencyMillis.value_or(Milliseconds(0)).count());
metrics.setNumInterruptChecks(additiveMetrics.numInterruptChecks.value_or(0));
metrics.setHasSortStage(additiveMetrics.hasSortStage);
metrics.setUsedDisk(additiveMetrics.usedDisk);
metrics.setFromMultiPlanner(additiveMetrics.fromMultiPlanner);
@ -1355,6 +1361,8 @@ void OpDebug::AdditiveMetrics::add(const AdditiveMetrics& otherMetrics) {
otherMetrics.maxAcquisitionDelinquencyMillis->count())};
}
numInterruptChecks = addOptionals(numInterruptChecks, otherMetrics.numInterruptChecks);
hasSortStage = hasSortStage || otherMetrics.hasSortStage;
usedDisk = usedDisk || otherMetrics.usedDisk;
fromMultiPlanner = fromMultiPlanner || otherMetrics.fromMultiPlanner;
@ -1384,6 +1392,8 @@ void OpDebug::AdditiveMetrics::aggregateDataBearingNodeMetrics(
Milliseconds{std::max(maxAcquisitionDelinquencyMillis.value_or(Milliseconds(0)).count(),
metrics.maxAcquisitionDelinquencyMillis.count())};
numInterruptChecks = numInterruptChecks.value_or(0) + metrics.numInterruptChecks;
hasSortStage = hasSortStage || metrics.hasSortStage;
usedDisk = usedDisk || metrics.usedDisk;
fromMultiPlanner = fromMultiPlanner || metrics.fromMultiPlanner;
@ -1414,6 +1424,7 @@ void OpDebug::AdditiveMetrics::aggregateCursorMetrics(const CursorMetrics& metri
static_cast<uint64_t>(metrics.getDelinquentAcquisitions()),
Milliseconds(metrics.getTotalAcquisitionDelinquencyMillis()),
Milliseconds(metrics.getMaxAcquisitionDelinquencyMillis()),
static_cast<uint64_t>(metrics.getNumInterruptChecks()),
metrics.getHasSortStage(),
metrics.getUsedDisk(),
metrics.getFromMultiPlanner(),

View File

@ -210,6 +210,9 @@ public:
// set) to make the "sticky towards false" logic work.
boost::optional<bool> fromPlanCache;
// Total number of checkForInterrupt() calls by an operation.
boost::optional<uint64_t> numInterruptChecks;
// If query stats are being collected for this operation, stores the estimated cpu time
// across the cluster. In a mongod, this is the local cpu time and in mongos this track the
// total cpu time across the cluster.

View File

@ -214,6 +214,8 @@ bool opShouldFail(Client* client, const BSONObj& failPointInfo) {
} // namespace
Status OperationContext::checkForInterruptNoAssert() noexcept {
_numInterruptChecks.fetchAndAddRelaxed(1);
if (getClient()->getKilled() && !_isExecutingShutdown) {
return Status(ErrorCodes::ClientMarkedKilled, "client has been killed");
}

View File

@ -742,6 +742,13 @@ public:
return _killOpsExempt;
}
/**
 * Returns the number of checkForInterrupt()/checkForInterruptNoAssert() calls recorded on
 * this OperationContext so far. Reads the counter with relaxed memory ordering, so the
 * value is a monotonic snapshot suitable for metrics/diagnostics, not for synchronization.
 */
int64_t numInterruptChecks() const {
return _numInterruptChecks.loadRelaxed();
}
/**
* Set to prevent killOps from killing this opCtx even when an LSID is set.
* You may only call this method prior to setting an LSID on this opCtx.
@ -950,6 +957,9 @@ private:
// When the operation was marked as killed.
AtomicWord<TickSource::Tick> _killTime{0};
// Tracks total number of interrupt checks.
Atomic<int64_t> _numInterruptChecks{0};
// Used to cancel all tokens obtained via getCancellationToken() when this OperationContext is
// killed.
CancellationSource _cancelSource;

View File

@ -105,6 +105,11 @@ structs:
type: long
stability: unstable
default: 0
numInterruptChecks:
description: "Number of times checkForInterrupt is called by a query operation."
type: long
stability: unstable
default: 0
ResponseCursorBase:
description: "Common fields of initial and subsequent cursor responses."

View File

@ -63,7 +63,8 @@ static const BSONObj basicMetricsObj = fromjson(R"({
cpuNanos: {"$numberLong": "18"},
delinquentAcquisitions: {"$numberLong": "0"},
totalAcquisitionDelinquencyMillis: {"$numberLong": "0"},
maxAcquisitionDelinquencyMillis: {"$numberLong": "0"}
maxAcquisitionDelinquencyMillis: {"$numberLong": "0"},
numInterruptChecks: {"$numberLong": "0"}
})");
static const std::string defaultNssStr = "db.coll";
@ -317,6 +318,7 @@ TEST(CursorResponseTest, parseFromBSONCursorMetrics) {
ASSERT_EQ(metrics.getDelinquentAcquisitions(), 0);
ASSERT_EQ(metrics.getTotalAcquisitionDelinquencyMillis(), 0);
ASSERT_EQ(metrics.getMaxAcquisitionDelinquencyMillis(), 0);
ASSERT_EQ(metrics.getNumInterruptChecks(), 0);
}
TEST(CursorResponseTest, parseFromBSONCursorMetricsWrongType) {
@ -966,6 +968,7 @@ TEST_F(CursorResponseBuilderTest, buildResponseWithAllKnownFields) {
ASSERT_EQ(parsedMetrics->getDelinquentAcquisitions(), 0);
ASSERT_EQ(parsedMetrics->getTotalAcquisitionDelinquencyMillis(), 0);
ASSERT_EQ(parsedMetrics->getMaxAcquisitionDelinquencyMillis(), 0);
ASSERT_EQ(parsedMetrics->getNumInterruptChecks(), 0);
ASSERT_TRUE(response.getPartialResultsReturned());
ASSERT_TRUE(response.getInvalidated());

View File

@ -245,6 +245,8 @@ following way:
acquisition overdue by a query operation, including getMores.
- `metrics.maxAcquisitionDelinquencyMillis`: Maximum time in milliseconds that an execution ticket
acquisition overdue by a query operation, including getMores.
- `metrics.numInterruptChecksPerSec`: Number of times checkForInterrupt is called per second by a
query operation, including getMores.
#### Permissions

View File

@ -53,6 +53,7 @@ struct DataBearingNodeMetrics {
uint64_t delinquentAcquisitions{0};
Milliseconds totalAcquisitionDelinquencyMillis{0};
Milliseconds maxAcquisitionDelinquencyMillis{0};
uint64_t numInterruptChecks = 0;
bool hasSortStage : 1 = false;
bool usedDisk : 1 = false;
@ -75,6 +76,7 @@ struct DataBearingNodeMetrics {
maxAcquisitionDelinquencyMillis =
Milliseconds{std::max(maxAcquisitionDelinquencyMillis.count(),
other.maxAcquisitionDelinquencyMillis.count())};
numInterruptChecks += other.numInterruptChecks;
hasSortStage = hasSortStage || other.hasSortStage;
usedDisk = usedDisk || other.usedDisk;
fromMultiPlanner = fromMultiPlanner || other.fromMultiPlanner;
@ -103,6 +105,7 @@ struct DataBearingNodeMetrics {
Milliseconds(metrics.getTotalAcquisitionDelinquencyMillis());
maxAcquisitionDelinquencyMillis = Milliseconds(std::max(
maxAcquisitionDelinquencyMillis.count(), metrics.getMaxAcquisitionDelinquencyMillis()));
numInterruptChecks += metrics.getNumInterruptChecks();
hasSortStage = hasSortStage || metrics.getHasSortStage();
usedDisk = usedDisk || metrics.getUsedDisk();
fromMultiPlanner = fromMultiPlanner || metrics.getFromMultiPlanner();

View File

@ -273,6 +273,14 @@ void updateStatistics(const QueryStatsStore::Partition& proofOfLock,
snapshot.totalAcquisitionDelinquencyMillis);
toUpdate.maxAcquisitionDelinquencyMillis.aggregate(snapshot.maxAcquisitionDelinquencyMillis);
// Store the number of interrupt checks per second as a rate, this can give us a better sense of
// how often interrupts are being checked relative to the total execution time across multiple
// query runs.
auto secondCount = durationCount<Seconds>(Milliseconds{snapshot.workingTimeMillis});
auto numInterruptChecksPerSec =
secondCount == 0 ? 0 : snapshot.numInterruptChecks / (static_cast<double>(secondCount));
toUpdate.numInterruptChecksPerSec.aggregate(numInterruptChecksPerSec);
toUpdate.hasSortStage.aggregate(snapshot.hasSortStage);
toUpdate.usedDisk.aggregate(snapshot.usedDisk);
toUpdate.fromMultiPlanner.aggregate(snapshot.fromMultiPlanner);
@ -447,6 +455,7 @@ QueryStatsSnapshot captureMetrics(const OperationContext* opCtx,
metrics.delinquentAcquisitions.value_or(0),
metrics.totalAcquisitionDelinquencyMillis.value_or(Milliseconds(0)).count(),
metrics.maxAcquisitionDelinquencyMillis.value_or(Milliseconds(0)).count(),
metrics.numInterruptChecks.value_or(0),
metrics.hasSortStage,
metrics.usedDisk,
metrics.fromMultiPlanner,

View File

@ -230,6 +230,8 @@ struct QueryStatsSnapshot {
int64_t totalAcquisitionDelinquencyMillis;
int64_t maxAcquisitionDelinquencyMillis;
uint64_t numInterruptChecks;
bool hasSortStage;
bool usedDisk;
bool fromMultiPlanner;

View File

@ -55,6 +55,8 @@ BSONObj QueryStatsEntry::toBSON() const {
totalAcquisitionDelinquencyMillis.appendTo(builder, "totalAcquisitionDelinquencyMillis");
maxAcquisitionDelinquencyMillis.appendTo(builder, "maxAcquisitionDelinquencyMillis");
numInterruptChecksPerSec.appendTo(builder, "numInterruptChecksPerSec");
hasSortStage.appendTo(builder, "hasSortStage");
usedDisk.appendTo(builder, "usedDisk");
fromMultiPlanner.appendTo(builder, "fromMultiPlanner");

View File

@ -125,6 +125,11 @@ struct QueryStatsEntry {
AggregatedMetric<int64_t> totalAcquisitionDelinquencyMillis;
AggregatedMetric<int64_t> maxAcquisitionDelinquencyMillis;
/**
* Aggregates the checkForInterrupt stats including getMore requests.
*/
AggregatedMetric<uint64_t> numInterruptChecksPerSec;
/**
* Counts the frequency of the boolean value hasSortStage.
*/

View File

@ -1577,6 +1577,7 @@ TEST_F(QueryStatsStoreTest, BasicDiskUsage) {
.append("delinquentAcquisitions", emptyIntMetric)
.append("totalAcquisitionDelinquencyMillis", emptyIntMetric)
.append("maxAcquisitionDelinquencyMillis", emptyIntMetric)
.append("numInterruptChecksPerSec", emptyIntMetric)
.append("hasSortStage", boolMetricBson(0, 0))
.append("usedDisk", boolMetricBson(0, 0))
.append("fromMultiPlanner", boolMetricBson(0, 0))
@ -1615,6 +1616,7 @@ TEST_F(QueryStatsStoreTest, BasicDiskUsage) {
.append("delinquentAcquisitions", emptyIntMetric)
.append("totalAcquisitionDelinquencyMillis", emptyIntMetric)
.append("maxAcquisitionDelinquencyMillis", emptyIntMetric)
.append("numInterruptChecksPerSec", emptyIntMetric)
.append("hasSortStage", boolMetricBson(0, 1))
.append("usedDisk", boolMetricBson(1, 0))
.append("fromMultiPlanner", boolMetricBson(0, 0))

View File

@ -3259,6 +3259,7 @@ TEST_F(AsyncResultsMergerTest, RemoteMetricsAggregatedLocally) {
metrics.setDelinquentAcquisitions(3);
metrics.setTotalAcquisitionDelinquencyMillis(100);
metrics.setMaxAcquisitionDelinquencyMillis(80);
metrics.setNumInterruptChecks(3);
scheduleResponse(id, {fromjson("{_id: 1}")}, std::move(metrics));
}
@ -3284,6 +3285,7 @@ TEST_F(AsyncResultsMergerTest, RemoteMetricsAggregatedLocally) {
ASSERT_EQ(remoteMetrics.delinquentAcquisitions, 3);
ASSERT_EQ(remoteMetrics.totalAcquisitionDelinquencyMillis, Milliseconds(100));
ASSERT_EQ(remoteMetrics.maxAcquisitionDelinquencyMillis, Milliseconds(80));
ASSERT_EQ(remoteMetrics.numInterruptChecks, 3);
}
// Schedule a second response.
@ -3302,6 +3304,7 @@ TEST_F(AsyncResultsMergerTest, RemoteMetricsAggregatedLocally) {
metrics.setDelinquentAcquisitions(2);
metrics.setTotalAcquisitionDelinquencyMillis(150);
metrics.setMaxAcquisitionDelinquencyMillis(120);
metrics.setNumInterruptChecks(2);
scheduleResponse(CursorId(0), {fromjson("{_id: 2}")}, std::move(metrics));
}
@ -3325,6 +3328,7 @@ TEST_F(AsyncResultsMergerTest, RemoteMetricsAggregatedLocally) {
ASSERT_EQ(remoteMetrics.delinquentAcquisitions, 5);
ASSERT_EQ(remoteMetrics.totalAcquisitionDelinquencyMillis, Milliseconds(250));
ASSERT_EQ(remoteMetrics.maxAcquisitionDelinquencyMillis, Milliseconds(120));
ASSERT_EQ(remoteMetrics.numInterruptChecks, 5);
}
{
@ -3341,6 +3345,7 @@ TEST_F(AsyncResultsMergerTest, RemoteMetricsAggregatedLocally) {
ASSERT_EQ(remoteMetrics.delinquentAcquisitions, 0);
ASSERT_EQ(remoteMetrics.totalAcquisitionDelinquencyMillis, Milliseconds(0));
ASSERT_EQ(remoteMetrics.maxAcquisitionDelinquencyMillis, Milliseconds(0));
ASSERT_EQ(remoteMetrics.numInterruptChecks, 0);
}
// Read the EOF