SERVER-17942 Log long-running operations during execution (#42619)

Co-authored-by: David Goffredo <david.goffredo@mongodb.com>
GitOrigin-RevId: a2b4c8032cba1955a0bd4fcfe2125bcb0e11f7de
Ivan Fefer 2025-10-21 20:27:07 +02:00 committed by MongoDB Bot
parent 9c8afc8810
commit 8e8e56ab52
9 changed files with 290 additions and 86 deletions

View File

@ -16,7 +16,9 @@ function verifyLoggedNamespace({pipeline, comment}) {
checkLog.containsWithCount(
conn,
RegExp(
`"appName"\:"MongoDB Shell",` +
`"Slow query"` +
".*" +
`"appName"\:"MongoDB Shell",` +
".*" + // leave some space for other keys
`"command"\:{"aggregate"\:"${regexLiteral(source.getName())}",` +
`"comment"\:"${regexLiteral(comment)}"`,

View File

@ -0,0 +1,105 @@
/**
 * Confirms that long-running operations are logged once while they are still in progress.
*/
import {findMatchingLogLine} from "jstests/libs/log.js";
function findSlowInProgressQueryLogLine(db, comment) {
const globalLog = assert.commandWorked(db.adminCommand({getLog: "global"}));
return findMatchingLogLine(globalLog.log, {id: 1794200, comment: comment});
}
function assertSlowInProgressQueryLogged(db, comment, expectedPlanSummary) {
const logLine = findSlowInProgressQueryLogLine(db, comment);
assert.neq(null, logLine, "Did not find slow in-progress query log line for " + comment);
const log = JSON.parse(logLine);
assert.eq(log.attr.planSummary, expectedPlanSummary, "Unexpected plan summary in log line: " + logLine);
}
const kDocCount = 200;
// Ensure that we yield often enough to log the "slow" in-progress query.
const conn = MongoRunner.runMongod({setParameter: {internalQueryExecYieldIterations: kDocCount / 10}});
assert.neq(null, conn, "mongod was unable to start up");
const db = conn.getDB("log_slow_in_progress_queries");
const coll = db.test;
assert.commandWorked(db.setLogLevel(1, "command.slowInProgress"));
assert.commandWorked(db.setProfilingLevel(2, {slowms: -1}));
assert.commandWorked(db.dropDatabase());
const docs = [];
for (let i = 0; i < kDocCount; ++i) {
docs.push({a: i});
}
assert.commandWorked(coll.insertMany(docs));
assert.commandWorked(coll.createIndex({a: 1}));
assert.eq(kDocCount, coll.find({}).comment("Collection Scan").itcount());
assertSlowInProgressQueryLogged(db, "Collection Scan", "COLLSCAN");
assert.eq(
kDocCount,
coll
.find({a: {$gte: 0}})
.comment("Index Scan")
.itcount(),
);
assertSlowInProgressQueryLogged(db, "Index Scan", "IXSCAN { a: 1 }");
assert.eq(kDocCount, coll.aggregate([{$match: {a: {$gte: 0}}}], {comment: "Agg Index Scan"}).itcount());
assertSlowInProgressQueryLogged(db, "Agg Index Scan", "IXSCAN { a: 1 }");
assert.eq(
kDocCount,
db.aggregate([{$documents: docs}, {$match: {a: {$gte: 0}}}], {comment: "Agg Documents"}).itcount(),
);
assertSlowInProgressQueryLogged(
db,
"Agg Documents",
undefined /* planSummary is undefined for $documents aggregation */,
);
assert.commandWorked(
db.runCommand({
update: "test",
updates: [{q: {a: {$gte: 0}}, u: {$inc: {u: 1}}, multi: true}],
comment: "Update Index Scan",
}),
);
assertSlowInProgressQueryLogged(db, "Update Index Scan", "IXSCAN { a: 1 }");
assert.commandWorked(
db.runCommand({
delete: "test",
deletes: [{q: {a: {$gte: 0}}, limit: 0}],
comment: "Delete Index Scan",
}),
);
assertSlowInProgressQueryLogged(db, "Delete Index Scan", "IXSCAN { a: 1 }");
assert.commandWorked(coll.insertMany(docs));
assert.commandWorked(db.setProfilingLevel(2, {slowms: -1, filter: {"command.find": {$exists: true}}}));
assert.eq(
kDocCount,
coll
.find({a: {$gte: 0}})
.comment("Find Index Scan With Profile Filter")
.itcount(),
);
assertSlowInProgressQueryLogged(db, "Find Index Scan With Profile Filter", "IXSCAN { a: 1 }");
assert.eq(
kDocCount,
coll.aggregate([{$match: {a: {$gte: 0}}}], {comment: "Agg Index Scan With Profiler Filter"}).itcount(),
);
const unexpectedLogLine = findSlowInProgressQueryLogLine(db, "Agg Index Scan With Profiler Filter");
assert.eq(null, unexpectedLogLine, "Unexpectedly found slow in-progress query log line: " + unexpectedLogLine);
MongoRunner.stopMongod(conn);

View File

@ -33,7 +33,6 @@
#include "mongo/base/error_codes.h"
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/builder_fwd.h"
#include "mongo/config.h" // IWYU pragma: keep
@ -434,17 +433,15 @@ void CurOp::setGenericOpRequestDetails(
_nss = std::move(nss);
}
void CurOp::_fetchStorageStatsIfNecessary(Date_t deadline) {
void CurOp::_fetchStorageStatsIfNecessary(Date_t deadline, bool isFinal) {
auto opCtx = this->opCtx();
// Do not fetch operation statistics again if we have already got them (for
// instance, as a part of stashing the transaction). Take a lock before calling into
// the storage engine to prevent racing against a shutdown. Any operation that used
// a storage engine would have at-least held a global lock at one point, hence we
// limit our lock acquisition to such operations. We can get here and our lock
// acquisition be timed out or interrupted, in which case we'll throw. Callers should
// After storage stats are fetched with the isFinal flag set to true, they won't be updated
// again for this operation. Take a lock before calling into the storage engine to prevent
// racing against a shutdown. Any operation that used a storage engine would have held at
// least a global lock at one point, hence we limit our lock acquisition to such operations.
// Our lock acquisition here may be timed out or interrupted, in which case we'll throw.
// Callers should handle that case, e.g., by logging a message.
if (_debug.storageStats == nullptr &&
shard_role_details::getLocker(opCtx)->wasGlobalLockTaken() &&
if (_allowStorageStatsUpdate && shard_role_details::getLocker(opCtx)->wasGlobalLockTaken() &&
opCtx->getServiceContext()->getStorageEngine()) {
ScopedAdmissionPriority<ExecutionAdmissionContext> admissionControl(
opCtx, AdmissionContext::Priority::kExempt);
@ -453,8 +450,14 @@ void CurOp::_fetchStorageStatsIfNecessary(Date_t deadline) {
deadline,
Lock::InterruptBehavior::kThrow,
Lock::GlobalLockOptions{.skipRSTLLock = true});
_debug.storageStats =
auto storageStats =
shard_role_details::getRecoveryUnit(opCtx)->computeOperationStatisticsSinceLastCall();
if (_debug.storageStats == nullptr) {
_debug.storageStats = std::move(storageStats);
} else if (storageStats != nullptr) {
*_debug.storageStats += *storageStats;
}
_allowStorageStatsUpdate = !isFinal;
}
}
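
This hunk turns _fetchStorageStatsIfNecessary from a fetch-once helper into an accumulator: each call folds the storage engine's since-last-call statistics into _debug.storageStats, and the isFinal flag latches _allowStorageStatsUpdate so the final snapshot is not overwritten by a later fetch. A minimal sketch of the move-or-accumulate idiom, with a hypothetical Stats type standing in for the engine's statistics object:

#include <memory>
#include <utility>

struct Stats {
    long long reads = 0;
    Stats& operator+=(const Stats& other) {
        reads += other.reads;
        return *this;
    }
};

// The first sample takes ownership; later samples are folded into the total.
void accumulate(std::unique_ptr<Stats>& total, std::unique_ptr<Stats> delta) {
    if (total == nullptr) {
        total = std::move(delta);
    } else if (delta != nullptr) {
        *total += *delta;
    }
}

int main() {
    std::unique_ptr<Stats> total;
    accumulate(total, std::make_unique<Stats>(Stats{5}));
    accumulate(total, std::make_unique<Stats>(Stats{7}));
    return total->reads == 12 ? 0 : 1;
}
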
@ -517,7 +520,7 @@ void CurOp::setEndOfOpMetrics(long long nreturned) {
// If we choose a fixed priority other than kExempt (e.g., kNormal), it may
// be lower than the operation's current priority, which would cause an exception to be
// thrown.
_fetchStorageStatsIfNecessary(Date_t::max());
_fetchStorageStatsIfNecessary(Date_t::max(), true);
} catch (DBException& ex) {
LOGV2(8457400,
"Failed to gather storage statistics for query stats",
@ -713,13 +716,81 @@ bool CurOp::shouldCurOpStackOmitDiagnosticInformation(CurOp* curop) {
return false;
}
void CurOp::_updateExecutionTimers() {
_debug.additiveMetrics.executionTime = elapsedTimeExcludingPauses();
auto workingMillis = duration_cast<Milliseconds>(*_debug.additiveMetrics.executionTime) -
(_sumBlockedTimeTotal() - _blockedTimeAtStart);
// Clamp negative values to zero to tolerate precision errors from the FastClockSource used
// by the flow control ticketholder.
_debug.workingTimeMillis = std::max(Milliseconds(0), workingMillis);
}
CurOp::ShouldProfileQuery CurOp::_shouldProfileAtLevel1AndLogSlowQuery(
const logv2::LogOptions& logOptions, std::shared_ptr<const ProfileFilter> filter) {
if (filter) {
// Calculate this operation's CPU time before deciding whether logging/profiling is
// necessary, but only if the filter depends on it.
if (filter->dependsOn("cpuNanos")) {
calculateCpuTime();
}
const bool passesFilter = filter->matches(opCtx(), _debug, *this);
return ShouldProfileQuery{passesFilter, passesFilter};
} else {
// Log the operation if it is eligible according to the current slowMS and sampleRate
// settings.
const auto [shouldLogSlowOp, shouldSample] =
shouldLogSlowOpWithSampling(opCtx(),
logOptions.component(),
_debug.workingTimeMillis,
Milliseconds(serverGlobalParams.slowMS.load()));
return ShouldProfileQuery{.shouldProfileAtLevel1 = shouldLogSlowOp && shouldSample,
.shouldLogSlowQuery = shouldLogSlowOp};
}
}
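
Factoring this decision out of completeAndLogOperation lets the new in-progress logging path reuse it. Returning a small named struct instead of the previous std::tie pair of booleans makes it much harder to swap the two flags at a call site. A sketch of the pattern with illustrative names (not the server's API):

struct Decision {
    bool shouldProfileAtLevel1;
    bool shouldLogSlowQuery;
};

Decision decide(bool passesFilter, bool sampled) {
    // Designated initializers keep each flag tied to its name.
    return Decision{.shouldProfileAtLevel1 = passesFilter && sampled,
                    .shouldLogSlowQuery = passesFilter};
}

int main() {
    // Structured bindings at the call site, as in completeAndLogOperation below.
    const auto [shouldProfile, shouldLog] = decide(true, true);
    return shouldProfile && shouldLog ? 0 : 1;
}
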
logv2::DynamicAttributes CurOp::_reportDebugAndStats(const logv2::LogOptions& logOptions,
bool isFinalStorageStatsUpdate) {
auto* opCtx = this->opCtx();
auto locker = shard_role_details::getLocker(opCtx);
SingleThreadedLockStats lockStats(locker->stats());
try {
// Slow query logs are critical for observability and should not wait for ticket
// acquisition. Slow queries can happen for various reasons; however, if queries
// are slower due to ticket exhaustion, queueing in order to log can compound
// the issue. Hence we pass the kExempt priority to _fetchStorageStatsIfNecessary.
_fetchStorageStatsIfNecessary(Date_t::now() + Milliseconds(500), isFinalStorageStatsUpdate);
} catch (const DBException& ex) {
LOGV2_OPTIONS(20526,
logOptions,
"Failed to gather storage statistics for slow operation",
"opId"_attr = opCtx->getOpID(),
"error"_attr = redact(ex));
}
// Gets the time spent blocked on prepare conflicts.
auto prepareConflictDurationMicros = StorageExecutionContext::get(opCtx)
->getPrepareConflictTracker()
.getThisOpPrepareConflictDuration();
_debug.prepareConflictDurationMillis =
duration_cast<Milliseconds>(prepareConflictDurationMicros);
const auto& storageMetrics = getOperationStorageMetrics();
logv2::DynamicAttributes attr;
_debug.report(opCtx, &lockStats, storageMetrics, getPrepareReadConflicts(), &attr);
return attr;
}
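
_reportDebugAndStats consolidates attribute-gathering that previously lived inline in completeAndLogOperation, so the final and in-progress log paths share it. Note the bounded deadline: the log path waits at most 500ms for storage stats, and a timeout degrades to a warning rather than failing the operation. A sketch of that degrade-on-timeout shape (hypothetical names):

#include <chrono>
#include <iostream>
#include <stdexcept>

struct StatsUnavailable : std::runtime_error {
    using std::runtime_error::runtime_error;
};

void fetchStatsBefore(std::chrono::steady_clock::time_point deadline) {
    if (std::chrono::steady_clock::now() > deadline)
        throw StatsUnavailable("deadline exceeded");
    // ... fetch from the storage engine ...
}

void reportStats() {
    try {
        // Wait at most 500ms, mirroring Date_t::now() + Milliseconds(500) above.
        fetchStatsBefore(std::chrono::steady_clock::now() + std::chrono::milliseconds(500));
    } catch (const StatsUnavailable& ex) {
        // Degrade gracefully: warn and carry on; the slow-query line is still emitted.
        std::cerr << "failed to gather storage statistics: " << ex.what() << '\n';
    }
    // ... build and emit the log attributes ...
}

int main() {
    reportStats();
}
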
bool CurOp::completeAndLogOperation(const logv2::LogOptions& logOptions,
std::shared_ptr<const ProfileFilter> filter,
boost::optional<size_t> responseLength,
boost::optional<long long> slowMsOverride,
bool forceLog) {
auto opCtx = this->opCtx();
const long long slowMs = slowMsOverride.value_or(serverGlobalParams.slowMS.load());
// Record the size of the response returned to the client, if applicable.
if (responseLength) {
@ -728,9 +799,8 @@ bool CurOp::completeAndLogOperation(const logv2::LogOptions& logOptions,
// Obtain the total execution time of this operation.
done();
_debug.additiveMetrics.executionTime = elapsedTimeExcludingPauses();
const auto executionTimeMillis =
durationCount<Milliseconds>(*_debug.additiveMetrics.executionTime);
_updateExecutionTimers();
if (!opCtx->inMultiDocumentTransaction()) {
// If we're not in a txn, we record information about delinquent ticket acquisitions to the
@ -753,38 +823,11 @@ bool CurOp::completeAndLogOperation(const logv2::LogOptions& logOptions,
}
if (_debug.isReplOplogGetMore) {
oplogGetMoreStats.recordMillis(executionTimeMillis);
}
auto workingMillis =
Milliseconds(executionTimeMillis) - (_sumBlockedTimeTotal() - _blockedTimeAtStart);
// Round up to zero if necessary to allow precision errors from FastClockSource used by flow
// control ticketholder.
_debug.workingTimeMillis = (workingMillis < Milliseconds(0) ? Milliseconds(0) : workingMillis);
bool shouldLogSlowOp, shouldProfileAtLevel1;
if (filter) {
// Calculate this operation's CPU time before deciding whether logging/profiling is
// necessary only if it is needed for filtering.
if (filter->dependsOn("cpuNanos")) {
calculateCpuTime();
}
bool passesFilter = filter->matches(opCtx, _debug, *this);
shouldLogSlowOp = passesFilter;
shouldProfileAtLevel1 = passesFilter;
} else {
// Log the operation if it is eligible according to the current slowMS and sampleRate
// settings.
bool shouldSample;
std::tie(shouldLogSlowOp, shouldSample) = shouldLogSlowOpWithSampling(
opCtx, logOptions.component(), _debug.workingTimeMillis, Milliseconds(slowMs));
shouldProfileAtLevel1 = shouldLogSlowOp && shouldSample;
oplogGetMoreStats.recordMillis(
durationCount<Milliseconds>(*_debug.additiveMetrics.executionTime));
}
const auto [shouldProfileAtLevel1, shouldLogSlowOp] =
_shouldProfileAtLevel1AndLogSlowQuery(logOptions, std::move(filter));
// Defer calculating the CPU time until we know that we actually are going to write it to
// the logs or profiler. The CPU time may have been determined earlier if it was a
@ -794,35 +837,7 @@ bool CurOp::completeAndLogOperation(const logv2::LogOptions& logOptions,
}
if (forceLog || shouldLogSlowOp) {
auto locker = shard_role_details::getLocker(opCtx);
SingleThreadedLockStats lockStats(locker->stats());
try {
// Slow query logs are critical for observability and should not wait for ticket
// acquisition. Slow queries can happen for various reasons; however, if queries
// are slower due to ticket exhaustion, queueing in order to log can compound
// the issue. Hence we pass the kExempt priority to _fetchStorageStatsIfNecessary.
_fetchStorageStatsIfNecessary(Date_t::now() + Milliseconds(500));
} catch (const DBException& ex) {
LOGV2_OPTIONS(20526,
logOptions,
"Failed to gather storage statistics for slow operation",
"opId"_attr = opCtx->getOpID(),
"error"_attr = redact(ex));
}
// Gets the time spent blocked on prepare conflicts.
auto prepareConflictDurationMicros = StorageExecutionContext::get(opCtx)
->getPrepareConflictTracker()
.getThisOpPrepareConflictDuration();
_debug.prepareConflictDurationMillis =
duration_cast<Milliseconds>(prepareConflictDurationMicros);
const auto& storageMetrics = getOperationStorageMetrics();
logv2::DynamicAttributes attr;
_debug.report(opCtx, &lockStats, storageMetrics, getPrepareReadConflicts(), &attr);
logv2::DynamicAttributes attr = _reportDebugAndStats(logOptions, true);
LOGV2_OPTIONS(51803, logOptions, "Slow query", attr);
_checkForFailpointsAfterCommandLogged();
@ -836,6 +851,40 @@ bool CurOp::completeAndLogOperation(const logv2::LogOptions& logOptions,
return shouldProfileAtLevel1;
}
void CurOp::logLongRunningOperationIfNeeded() {
static constexpr int kSlowInProgressLogDebugLevel = 1;
static const logv2::LogOptions kLogOptions{logv2::LogComponent::kCommandSlowInProg};
if (!_eligibleForLongRunningQueryLogging ||
!logv2::shouldLog(kLogOptions.component(),
logv2::LogSeverity::Debug(kSlowInProgressLogDebugLevel))) {
return;
}
if (shouldCurOpStackOmitDiagnosticInformation(this)) {
// Clear the flag so these checks are not repeated.
_eligibleForLongRunningQueryLogging = false;
return;
}
_updateExecutionTimers();
std::shared_ptr<const ProfileFilter> filter =
DatabaseProfileSettings::get(opCtx()->getServiceContext())
.getDatabaseProfileSettings(getNSS().dbName())
.filter;
const bool shouldLogSlowOp =
_shouldProfileAtLevel1AndLogSlowQuery(kLogOptions, std::move(filter)).shouldLogSlowQuery;
if (!shouldLogSlowOp) {
return;
}
calculateCpuTime();
logv2::DynamicAttributes attr = _reportDebugAndStats(kLogOptions, false);
LOGV2_DEBUG_OPTIONS(
1794200, kSlowInProgressLogDebugLevel, kLogOptions, "Slow in-progress query", attr);
_eligibleForLongRunningQueryLogging = false;
}
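
logLongRunningOperationIfNeeded is called from hot paths (yield points and interrupt checks), so it is ordered cheapest-check-first: the one-shot flag and the component's verbosity gate come before the profile-filter evaluation, and CPU time is computed only once logging is certain. The flag is cleared after a successful log (or when diagnostics are omitted), but left set when the operation simply isn't slow yet, so a later yield can still log it. A condensed sketch of that latch (hypothetical names):

class OneShotSlowLogger {
public:
    void maybeLog(bool verbosityEnabled, bool slowEnough) {
        if (!_armed || !verbosityEnabled)
            return;  // cheap gates first
        if (!slowEnough)
            return;  // stay armed; retry at the next yield
        // ... gather attributes and emit the "Slow in-progress query" line ...
        _armed = false;  // latch: log this operation at most once
    }

private:
    bool _armed = true;
};

int main() {
    OneShotSlowLogger logger;
    logger.maybeLog(true, false);  // not slow yet: stays armed
    logger.maybeLog(true, true);   // logs once
    logger.maybeLog(true, true);   // latched: nothing more is logged
}
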
std::string CurOp::getNS() const {
return NamespaceStringUtil::serialize(_nss, SerializationContext::stateDefault());
}

View File

@ -191,6 +191,8 @@ public:
boost::optional<long long> slowMsOverride = boost::none,
bool forceLog = false);
void logLongRunningOperationIfNeeded();
bool haveOpDescription() const {
return !_opDescription.isEmpty();
}
@ -774,6 +776,25 @@ private:
*/
Milliseconds _sumBlockedTimeTotal();
/**
* Updates the values of _debug.additiveMetrics.executionTime and _debug.workingTimeMillis.
*/
void _updateExecutionTimers();
/**
* Returns two flags: whether the query should be profiled at profile level 1 and whether it
* should be logged as a "slow query".
*/
struct ShouldProfileQuery {
bool shouldProfileAtLevel1;
bool shouldLogSlowQuery;
};
ShouldProfileQuery _shouldProfileAtLevel1AndLogSlowQuery(
const logv2::LogOptions& logOptions, std::shared_ptr<const ProfileFilter> filter);
logv2::DynamicAttributes _reportDebugAndStats(const logv2::LogOptions& logOptions,
bool isFinalStorageStatsUpdate);
/**
* Handles failpoints that check whether a command has completed or not.
* Used for testing purposes instead of the getLog command.
@ -781,10 +802,11 @@ private:
void _checkForFailpointsAfterCommandLogged();
/**
* Fetches storage stats and stores them in the OpDebug if they're not already present.
* Can throw if interrupted while waiting for the global lock.
* Fetches storage stats and accumulates them into the OpDebug. If the isFinal parameter is
* set to true, storage stats won't be updated again for this operation. Can throw if
* interrupted while waiting for the global lock.
*/
void _fetchStorageStatsIfNecessary(Date_t deadline);
void _fetchStorageStatsIfNecessary(Date_t deadline, bool isFinal);
static const OperationContext::Decoration<CurOpStack> _curopStack;
@ -888,5 +910,14 @@ private:
// memory from shards.
AtomicWord<int64_t> _inUseTrackedMemoryBytes{0};
AtomicWord<int64_t> _peakTrackedMemoryBytes{0};
// Long-running queries are logged in progress at most once to avoid excessive logging.
bool _eligibleForLongRunningQueryLogging{true};
// Updating storage stats requires a lock, so to avoid doing it multiple times for a single
// query, it is guarded by this flag. In cases where we know that we are fetching stats for an
// in-progress operation, we can keep it true and allow storage stats to be fetched multiple
// times.
bool _allowStorageStatsUpdate{true};
};
} // namespace mongo

View File

@ -31,6 +31,7 @@
#include "mongo/base/error_codes.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/curop.h"
#include "mongo/db/feature_compatibility_version_documentation.h"
#include "mongo/db/feature_flag.h"
#include "mongo/db/operation_context.h"
@ -123,6 +124,16 @@ ExpressionContext::CollatorStash::~CollatorStash() {
_expCtx->setCollator(std::move(_originalCollator));
}
void ExpressionContext::InterruptChecker::checkForInterruptSlow() {
_tick = kInterruptCheckPeriod;
OperationContext* opCtx = _expressionContext->getOperationContext();
invariant(opCtx);
opCtx->checkForInterrupt();
CurOp::get(opCtx)->logLongRunningOperationIfNeeded();
}
std::unique_ptr<ExpressionContext::CollatorStash> ExpressionContext::temporarilyChangeCollator(
std::unique_ptr<CollatorInterface> newCollator) {
// This constructor of CollatorStash is private, so we can't use make_unique().

View File

@ -1194,12 +1194,7 @@ protected:
private:
// Performs the heavy work of checking whether an interrupt has occurred. For performance
// reasons, this should only be called every now and then.
MONGO_COMPILER_NOINLINE void checkForInterruptSlow() {
_tick = kInterruptCheckPeriod;
invariant(_expressionContext->getOperationContext());
_expressionContext->getOperationContext()->checkForInterrupt();
}
void checkForInterruptSlow();
static constexpr int kInterruptCheckPeriod = 128;
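
Moving checkForInterruptSlow out of line lets expression_context.cpp call into CurOp without this header depending on curop.h, while the cheap tick check stays inlined. A self-contained sketch of the pattern, under the assumption that the inline caller decrements a tick counter and takes the slow path when it reaches zero:

class InterruptChecker {
public:
    void checkForInterrupt() {
        if (--_tick <= 0)
            checkForInterruptSlow();  // rare, expensive path
    }

private:
    void checkForInterruptSlow() {
        _tick = kInterruptCheckPeriod;  // re-arm the counter
        // ... real interrupt check and in-progress slow-query logging ...
    }

    static constexpr int kInterruptCheckPeriod = 128;
    int _tick = kInterruptCheckPeriod;
};

int main() {
    InterruptChecker checker;
    for (int i = 0; i < 1000; ++i)
        checker.checkForInterrupt();  // slow path taken roughly every 128 calls
}
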

View File

@ -57,6 +57,8 @@ YieldPolicyCallbacksImpl::YieldPolicyCallbacksImpl(NamespaceString nssForFailpoi
void YieldPolicyCallbacksImpl::duringYield(OperationContext* opCtx) const {
CurOp::get(opCtx)->yielded();
_tryLogLongRunningQueries(opCtx);
// If we yielded because we encountered the need to refresh the sharding CatalogCache, refresh
// it here while the locks are yielded.
auto& catalogCacheRefreshRequired =
@ -99,6 +101,8 @@ void YieldPolicyCallbacksImpl::duringYield(OperationContext* opCtx) const {
}
void YieldPolicyCallbacksImpl::preCheckInterruptOnly(OperationContext* opCtx) const {
_tryLogLongRunningQueries(opCtx);
// If the 'setInterruptOnlyPlansCheckForInterruptHang' fail point is enabled, set the
// 'failPointMsg' field of this operation's CurOp to signal that we've hit this point.
if (MONGO_unlikely(setInterruptOnlyPlansCheckForInterruptHang.shouldFail())) {
@ -109,4 +113,8 @@ void YieldPolicyCallbacksImpl::preCheckInterruptOnly(OperationContext* opCtx) co
}
}
void YieldPolicyCallbacksImpl::_tryLogLongRunningQueries(OperationContext* opCtx) const {
CurOp::get(opCtx)->logLongRunningOperationIfNeeded();
}
} // namespace mongo
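
Yield points are where a running plan already releases locks and checks for interrupt, which makes them a natural, low-overhead hook for in-progress logging; the callbacks above cover both yielding plans (duringYield) and interrupt-only plans (preCheckInterruptOnly). A stripped-down sketch of the hook shape (illustrative, not the server's interface):

#include <functional>

struct YieldCallbacks {
    std::function<void()> duringYield;
};

void runPlan(const YieldCallbacks& cb, int totalWork, int yieldEvery) {
    for (int i = 1; i <= totalWork; ++i) {
        // ... execute one unit of the plan ...
        if (i % yieldEvery == 0 && cb.duringYield)
            cb.duringYield();  // release locks, check interrupt, maybe log
    }
}

int main() {
    int yields = 0;
    YieldCallbacks cb{[&] { ++yields; }};
    runPlan(cb, 200, 20);  // cf. the test's internalQueryExecYieldIterations setting
    return yields == 10 ? 0 : 1;
}
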

View File

@ -51,6 +51,8 @@ public:
void preCheckInterruptOnly(OperationContext*) const override;
private:
void _tryLogLongRunningQueries(OperationContext*) const;
NamespaceString _nss;
};

View File

@ -54,6 +54,7 @@ namespace mongo::logv2 {
X(kAccessControl, , "accessControl" , "ACCESS" , kDefault) \
X(kAssert, , "assert" , "ASSERT" , kDefault) \
X(kCommand, , "command" , "COMMAND" , kDefault) \
X(kCommandSlowInProg, , "slowInProgress" , "SLOWPROG" , kCommand) \
X(kControl, , "control" , "CONTROL" , kDefault) \
X(kExecutor, , "executor" , "EXECUTOR" , kDefault) \
X(kGeo, , "geo" , "GEO" , kDefault) \
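
The new kCommandSlowInProg component is parented under kCommand, which is why the test enables it with db.setLogLevel(1, "command.slowInProgress"). The table itself is an X-macro: a single list expanded into the enum, the name table, and the other per-component arrays. A minimal illustration of the technique (this two-column toy list is not the server's five-column one):

#include <iostream>

#define COMPONENTS(X)          \
    X(kCommand, "command")     \
    X(kSlowInProg, "slowInProgress")

enum class Component {
#define X_ENUM(id, name) id,
    COMPONENTS(X_ENUM)
#undef X_ENUM
};

inline const char* componentName(Component c) {
    switch (c) {
#define X_NAME(id, name) \
    case Component::id:  \
        return name;
        COMPONENTS(X_NAME)
#undef X_NAME
    }
    return "unknown";
}

int main() {
    std::cout << componentName(Component::kSlowInProg) << '\n';  // prints slowInProgress
}
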