SERVER-112118 Track and log how long it takes oplog sampling to stop after its been interrupted (#42899)

Co-authored-by: Gregory Wlodarek <gregory.wlodarek@mongodb.com>
GitOrigin-RevId: 8d7caaf81e9d4b4fb8a1047fdfbee6fb0ed9ebe1
This commit is contained in:
clarissecheah 2025-10-21 14:09:54 -04:00 committed by MongoDB Bot
parent 2e5d23ff56
commit b7daf9f217
2 changed files with 107 additions and 5 deletions

View File

@ -0,0 +1,81 @@
/**
* Checks that the oplog cap maintainer thread doesn't block stepdown.
*
* @tags: [requires_replication, requires_persistence]
*/
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {waitForState} from "jstests/replsets/rslib.js";
function samplinginProgress(primary) {
const status = primary.getDB("local").serverStatus();
assert.commandWorked(status);
return (!status.oplogTruncation.hasOwnProperty("processingMethod") ||
status.oplogTruncation.processingMethod == "in progress");
}
const rst = new ReplSetTest({
nodes: 2,
nodeOptions: {
setParameter: {
"oplogSamplingAsyncEnabled": true,
useSlowCollectionTruncateMarkerScanning: true,
},
},
});
rst.startSet();
rst.initiate();
let coll = rst.getPrimary().getDB("test").getCollection("stepdown_test");
for (let i = 0; i < 250; i++) {
assert.commandWorked(coll.insert({a: i}));
}
// Stop and restart the replica set to re-trigger initial sampling.
rst.stopSet(null, true);
clearRawMongoProgramOutput();
rst.startSet(null, true);
jsTest.log.info("Replica set restarted.");
let primary = rst.getPrimary();
let secondary = rst.getSecondary();
// Verify we're still sampling.
assert(samplinginProgress(primary));
assert(samplinginProgress(secondary));
// Assert we have started the cap maintainer thread once.
assert(checkLog.checkContainsWithCountJson(primary, 5295000, {}, /*expectedCount=*/ 1));
// Wait for initial sampling to start
checkLog.containsJson(primary, 11212203);
jsTest.log.info("Attempting to step down primary");
assert.commandWorked(primary.adminCommand({replSetStepDown: 0, force: true}));
waitForState(primary, ReplSetTest.State.SECONDARY);
let interruptCount = assert.commandWorked(primary.getDB("admin").runCommand({serverStatus: 1}))
.oplogTruncation.interruptCount;
assert(interruptCount == 0);
// Assert at this point we haven't completed initial sampling yet.
assert(checkLog.checkContainsWithCountJson(primary, 22382, {}, /*expectedCount=*/ 0));
// Verify we're still sampling.
assert(samplinginProgress(primary));
assert(samplinginProgress(secondary));
interruptCount = assert.commandWorked(primary.getDB("admin").runCommand({serverStatus: 1}))
.oplogTruncation.interruptCount;
assert(interruptCount == 0);
jsTest.log.info("Test complete. Stopping replica set.");
rst.stopSet();
// Make sure shutdown interrupted the OplogCapMaintainerThread.
let subStr = "11212201.*OplogCapMaintainerThread.*InterruptedAtShutdown";
assert(rawMongoProgramOutput(subStr).search(subStr) !== -1);
// Make sure we have interruption duration metrics at shutdown
subStr = "11211800.*[sS]hutdown.*durationMs";
assert(rawMongoProgramOutput(subStr).search(subStr) !== -1);

View File

@ -286,9 +286,22 @@ void OplogCapMaintainerThread::run() {
} }
ON_BLOCK_EXIT([&] { ON_BLOCK_EXIT([&] {
stdx::lock_guard<stdx::mutex> lk(_opCtxMutex); auto ts = _uniqueCtx->get()->getServiceContext()->getTickSource();
admissionPriority.reset(); auto killTime = _uniqueCtx->get()->getKillTime();
_uniqueCtx.reset(); {
stdx::lock_guard<stdx::mutex> lk(_stateMutex);
LOGV2(11211800,
"Time spent between an operation interrupting the cap maintainer thread and the "
"thread successfully shutting down.",
"reason"_attr = _shutdownReason,
"durationMs"_attr = (ts->ticksTo<Milliseconds>(ts->getTicks() - killTime)));
}
{
stdx::lock_guard<stdx::mutex> lk(_opCtxMutex);
admissionPriority.reset();
_uniqueCtx.reset();
}
}); });
if (gOplogSamplingAsyncEnabled) { if (gOplogSamplingAsyncEnabled) {
@ -338,8 +351,12 @@ void OplogCapMaintainerThread::run() {
LOGV2_DEBUG(10621109, 1, "OplogCapMaintainerThread is active"); LOGV2_DEBUG(10621109, 1, "OplogCapMaintainerThread is active");
} while (true); } while (true);
} catch (ExceptionFor<ErrorCategory::Interruption>& ex) { } catch (ExceptionFor<ErrorCategory::Interruption>& ex) {
LOGV2(11212201, "OplogCapMaintainerThread interrupted", "reason"_attr = ex.reason()); LOGV2(11212201, "OplogCapMaintainerThread interrupted", "status"_attr = ex.toStatus());
interruptCount.fetchAndAdd(1); interruptCount.fetchAndAdd(1);
stdx::lock_guard<stdx::mutex> lk(_stateMutex);
_shutdownReason = ex.toStatus();
return; return;
} }
} }
@ -367,8 +384,12 @@ void OplogCapMaintainerThread::run() {
_uniqueCtx->get()->sleepFor( _uniqueCtx->get()->sleepFor(
Seconds(1)); // Back off in case there were problems deleting. Seconds(1)); // Back off in case there were problems deleting.
} catch (ExceptionFor<ErrorCategory::Interruption>& ex) { } catch (ExceptionFor<ErrorCategory::Interruption>& ex) {
LOGV2(11212204, "OplogCapMaintainerThread interrupted", "reason"_attr = ex.reason()); LOGV2(11212204, "OplogCapMaintainerThread interrupted", "status"_attr = ex.toStatus());
interruptCount.fetchAndAdd(1); interruptCount.fetchAndAdd(1);
stdx::lock_guard<stdx::mutex> lk(_stateMutex);
_shutdownReason = ex.toStatus();
return; return;
} catch (...) { } catch (...) {
const auto& err = mongo::exceptionToStatus(); const auto& err = mongo::exceptionToStatus();