SERVER-112570 Fix priority stats with the new ticketing system implementation (#43722)

GitOrigin-RevId: b3fc8aa433b01c40b1dea6fcf4458d9c5eabea43
This commit is contained in:
Marcos Grillo 2025-11-13 17:55:21 +01:00 committed by MongoDB Bot
parent fe154bba22
commit f327be6071
4 changed files with 136 additions and 18 deletions

View File

@ -0,0 +1,95 @@
/**
* Checks the aggregate of tickets statistic happens if the prioritization is enabled.
*/
import {ReplSetTest} from "jstests/libs/replsettest.js";
function verifyTicketAggregationStats(assertFunc, obj1, obj2) {
// Step 1: ensure the statistic schema is correct.
assert.eq(true, obj1.hasOwnProperty("available"));
assert.eq(true, obj1.hasOwnProperty("out"));
assert.eq(true, obj1.hasOwnProperty("totalTickets"));
assert.eq(true, obj2.hasOwnProperty("available"));
assert.eq(true, obj2.hasOwnProperty("out"));
assert.eq(true, obj2.hasOwnProperty("totalTickets"));
// Step 2: compare the statistics. The caller tries to assert compFunc(obj1, obj2).
// Not comparing out because it depends on whether there is an op running or not.
assertFunc(obj1.available, obj2.available);
assertFunc(obj1.totalTickets, obj2.totalTickets);
}
const kNumReadTickets = 5;
const kNumWriteTickets = 5;
const rst = new ReplSetTest({
nodes: 1,
nodeOptions: {
setParameter: {
executionControlConcurrentReadTransactions: kNumReadTickets,
executionControlConcurrentReadLowPriorityTransactions: kNumReadTickets,
executionControlConcurrentWriteTransactions: kNumWriteTickets,
executionControlConcurrentWriteLowPriorityTransactions: kNumWriteTickets,
},
},
});
rst.startSet();
rst.initiate();
const primary = rst.getPrimary();
const db = primary.getDB(jsTestName());
assert.commandWorked(
rst.getPrimary().adminCommand({
setParameter: 1,
executionControlConcurrencyAdjustmentAlgorithm: "throughputProbing",
}),
);
jsTestLog("Verify that there is no totalization of tickets in throughputProbing");
const executionStats = db.serverStatus().queues.execution;
assert.eq(executionStats.prioritizationEnabled, false);
verifyTicketAggregationStats(assert.eq, executionStats.read, executionStats.read.normalPriority);
verifyTicketAggregationStats(assert.eq, executionStats.write, executionStats.write.normalPriority);
jsTestLog("Verify that there is totalization of tickets in fixedConcurrentTransactionsWithPrioritization");
assert.commandWorked(
rst.getPrimary().adminCommand({
setParameter: 1,
executionControlConcurrencyAdjustmentAlgorithm: "fixedConcurrentTransactionsWithPrioritization",
}),
);
const executionStatsWithPrioritization = db.serverStatus().queues.execution;
assert.eq(executionStatsWithPrioritization.prioritizationEnabled, true);
verifyTicketAggregationStats(
assert.gt,
executionStatsWithPrioritization.read,
executionStatsWithPrioritization.read.normalPriority,
);
verifyTicketAggregationStats(
assert.gt,
executionStatsWithPrioritization.write,
executionStatsWithPrioritization.write.normalPriority,
);
jsTestLog("Verify that there is no totalization of tickets in fixedConcurrentTransactions");
assert.commandWorked(
rst.getPrimary().adminCommand({
setParameter: 1,
executionControlConcurrencyAdjustmentAlgorithm: "fixedConcurrentTransactions",
}),
);
const executionStatsFixedConcurrency = db.serverStatus().queues.execution;
assert.eq(executionStatsFixedConcurrency.prioritizationEnabled, false);
verifyTicketAggregationStats(
assert.eq,
executionStatsFixedConcurrency.read,
executionStatsFixedConcurrency.read.normalPriority,
);
verifyTicketAggregationStats(
assert.eq,
executionStatsFixedConcurrency.write,
executionStatsFixedConcurrency.write.normalPriority,
);
// TODO SERVER-113367: Once we can enable the heuristic with throughput probing, we need to add cases of the heuristic and algorithms combinations.
rst.stopSet();

View File

@ -588,8 +588,8 @@ describe("Execution control concurrency adjustment algorithm", function () {
const status = assert.commandWorked(mongod.adminCommand({serverStatus: 1}));
const stats = status.queues.execution;
assert.eq(stats.read.normalPriority.totalTickets, customReadTickets);
assert.eq(stats.write.normalPriority.totalTickets, customWriteTickets);
assert.eq(stats.read.totalTickets, customReadTickets);
assert.eq(stats.write.totalTickets, customWriteTickets);
});
it(`should align tickets when transitioning from '${kThroughputProbing}' to '${kFixedWithPrio}'`, function () {

View File

@ -19,14 +19,28 @@ const rst = new ReplSetTest({
// Make yielding more common.
internalQueryExecYieldPeriodMS: 1,
internalQueryExecYieldIterations: 1,
// Disable heuristic deprioritization to ensure readers are queued in the normal pool.
executionControlHeuristicDeprioritizationEnabled: false,
},
},
});
rst.startSet();
rst.initiate();
// Disable deprioritization to ensure readers are queued in the normal pool. Also, ensure the
// algorithm is fixed concurrency to have a predictable number of tickets. They could be overriden
// at startup by some variants, so, it is set here explicitly.
assert.commandWorked(
rst.getPrimary().adminCommand({
setParameter: 1,
executionControlHeuristicDeprioritizationEnabled: false,
}),
);
assert.commandWorked(
rst.getPrimary().adminCommand({
setParameter: 1,
executionControlConcurrencyAdjustmentAlgorithm: "fixedConcurrentTransactions",
}),
);
const dbName = jsTestName();
const collName = "testcoll";
const primary = rst.getPrimary();
@ -82,7 +96,7 @@ jsTestLog("Wait for no available read tickets");
assert.soon(() => {
let stats = db.runCommand({serverStatus: 1});
jsTestLog(stats.queues.execution);
return stats.queues.execution.read.normalPriority.available == 0;
return stats.queues.execution.read.available == 0;
}, "Expected to have no available read tickets.");
// Force thread to sleep for 1ms to guarantee readers accrue wait time in queue.

View File

@ -392,12 +392,15 @@ void TicketingSystem::appendStats(BSONObjBuilder& b) const {
int32_t writeOut = 0, writeAvailable = 0, writeTotalTickets = 0;
_state.loadRelaxed().appendStats(b);
b.append("totalDeprioritizations", _opsDeprioritized.loadRelaxed());
b.append("prioritizationEnabled", usesPrioritization());
for (size_t i = 0; i < _holders.size(); ++i) {
const auto priority = static_cast<AdmissionContext::Priority>(i);
if (priority == AdmissionContext::Priority::kExempt) {
// Do not report statistics for kExempt as they are included in the normal priority pool
// Do not report statistics for kExempt as they are included in the normal priority
// pool. Also, low priority statistics should only be reported when prioritization is
// enabled.
continue;
}
@ -407,37 +410,43 @@ void TicketingSystem::appendStats(BSONObjBuilder& b) const {
? kNormalPriorityName
: kLowPriorityName;
if (rw.read) {
readOut += rw.read->used();
readAvailable += rw.read->available();
readTotalTickets += rw.read->outof();
if (!readStats.is_initialized()) {
readStats.emplace();
}
BSONObjBuilder bb(readStats->subobjStart(fieldName));
rw.read->appendTicketStats(bb);
rw.read->appendHolderStats(bb);
bb.done();
if (priority == AdmissionContext::Priority::kNormal) {
BSONObjBuilder bb(readStats->subobjStart(kExemptPriorityName));
BSONObjBuilder exemptBuilder(readStats->subobjStart(kExemptPriorityName));
rw.read->appendExemptStats(readStats.value());
bb.done();
exemptBuilder.done();
}
auto obj = bb.done();
// Totalization of tickets for the aggregate only if prioritization is enabled.
if (priority == AdmissionContext::Priority::kNormal || usesPrioritization()) {
readOut += obj.getIntField("out");
readAvailable += obj.getIntField("available");
readTotalTickets += obj.getIntField("totalTickets");
}
}
if (rw.write) {
writeOut += rw.write->used();
writeAvailable += rw.write->available();
writeTotalTickets += rw.write->outof();
if (!writeStats.is_initialized()) {
writeStats.emplace();
}
BSONObjBuilder bb(writeStats->subobjStart(fieldName));
rw.write->appendTicketStats(bb);
rw.write->appendHolderStats(bb);
bb.done();
if (priority == AdmissionContext::Priority::kNormal) {
BSONObjBuilder bb(writeStats->subobjStart(kExemptPriorityName));
BSONObjBuilder exemptBuilder(writeStats->subobjStart(kExemptPriorityName));
rw.write->appendExemptStats(writeStats.value());
bb.done();
exemptBuilder.done();
}
auto obj = bb.done();
// Totalization of tickets for the aggregate only if prioritization is enabled.
if (priority == AdmissionContext::Priority::kNormal || usesPrioritization()) {
writeOut += obj.getIntField("out");
writeAvailable += obj.getIntField("available");
writeTotalTickets += obj.getIntField("totalTickets");
}
}
}