SERVER-94787 Do not abandon snapshot to wait for oplog visibility during oplog scan (#39321)

GitOrigin-RevId: 42bcb53e007c1f99791d1ba535e671dfb454d2a5
This commit is contained in:
Mihai Andrei 2025-09-02 13:30:06 -04:00 committed by MongoDB Bot
parent f77e78090f
commit 4d97cb3620
25 changed files with 399 additions and 64 deletions

2
.github/CODEOWNERS vendored
View File

@ -10,4 +10,4 @@ README.md
jstests/ jstests/
/src/mongo/dbtests/ /src/mongo/dbtests/
/src/mongo/**/*_test.cpp /src/mongo/**/*_test.cpp
/src/mongo/**/*_bm.cpp /src/mongo/**/*_bm.cpp

View File

@ -0,0 +1,13 @@
load("@aspect_rules_js//js:defs.bzl", "js_library")
# Bundles every JavaScript file in this directory into a single js_library
# target so the test infrastructure can depend on them as one unit.
js_library(
    name = "all_javascript_files",
    # Glob picks up all sibling .js files; new test files are included automatically.
    srcs = glob([
        "*.js",
    ]),
    # These JavaScript tests are not supported on ppc64le or s390x platforms.
    target_compatible_with = select({
        "//bazel/config:ppc_or_s390x": ["@platforms//:incompatible"],
        "//conditions:default": [],
    }),
    visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,59 @@
/**
 * Concurrently runs 'replSetResizeOplog' with inserts and oplog scans and verifies that our oplog
 * scans wait for oplog visibility correctly.
 *
 * @tags: [requires_replication]
 */
export const $config = (function() {
    var states = (function() {
        // Resizes the oplog to a random size between 20MB and 69MB so that concurrent
        // scans race against oplog truncation.
        function resizeOplog(db, collName) {
            const oplogSizeBytes = (20 + Math.floor(50 * Math.random())) * 1024 * 1024;
            jsTestLog("Setting " + tojson(oplogSizeBytes));
            assert.commandWorked(db.adminCommand({replSetResizeOplog: 1, size: oplogSizeBytes}));
        }

        // Inserts a random number of documents to generate oplog entries. Always inserts at
        // least one document, because insertMany() rejects an empty array.
        function insertDocs(db, collName) {
            const numDocs = 1 + Math.floor(10 * Math.random());
            let docs = [];
            for (let i = 0; i < numDocs; i++) {
                docs.push({a: i});
            }
            assert.commandWorked(db[collName].insertMany(docs));
        }

        // Scans the first few oplog entries. A concurrent resize can truncate the capped
        // collection out from under the scan, surfacing as CappedPositionLost; that error is
        // expected here and tolerated.
        function scanOplog(db, collName) {
            try {
                assert.gte(db.getSiblingDB("local")["oplog.rs"].find().limit(20).itcount(), 0);
            } catch (e) {
                if (e.code == ErrorCodes.CappedPositionLost) {
                    return;
                } else {
                    throw e;
                }
            }
        }

        return {
            resizeOplog: resizeOplog,
            insertDocs: insertDocs,
            scanOplog: scanOplog,
        };
    })();

    // Bias transitions toward scanning so the visibility-wait path is exercised most often.
    var transitions = {
        resizeOplog: {resizeOplog: 0.1, insertDocs: 0.2, scanOplog: 0.7},
        insertDocs: {resizeOplog: 0.1, insertDocs: 0.2, scanOplog: 0.7},
        scanOplog: {resizeOplog: 0.1, insertDocs: 0.2, scanOplog: 0.7},
    };

    return {
        threadCount: 4,
        iterations: 100,
        startState: 'insertDocs',
        data: {},
        states: states,
        transitions: transitions,
    };
})();

View File

@ -815,6 +815,7 @@ mongo_cc_library(
"//src/mongo/db/catalog:local_oplog_info", "//src/mongo/db/catalog:local_oplog_info",
"//src/mongo/db/stats:top", "//src/mongo/db/stats:top",
"//src/mongo/db/storage:capped_snapshots", "//src/mongo/db/storage:capped_snapshots",
"//src/mongo/db/storage:record_store_base",
"//src/mongo/db/storage:snapshot_helper", "//src/mongo/db/storage:snapshot_helper",
"//src/mongo/db/storage:storage_options", "//src/mongo/db/storage:storage_options",
"//src/mongo/util/concurrency:spin_lock", "//src/mongo/util/concurrency:spin_lock",

View File

@ -1078,6 +1078,7 @@ env.Library(
'shard_role_api', 'shard_role_api',
], ],
LIBDEPS_PRIVATE=[ LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/storage/record_store_base',
'$BUILD_DIR/mongo/util/concurrency/spin_lock', '$BUILD_DIR/mongo/util/concurrency/spin_lock',
'catalog/collection_uuid_mismatch_info', 'catalog/collection_uuid_mismatch_info',
'catalog/local_oplog_info', 'catalog/local_oplog_info',

View File

@ -615,7 +615,8 @@ LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMod
ReadSourceScope::ReadSourceScope(OperationContext* opCtx, ReadSourceScope::ReadSourceScope(OperationContext* opCtx,
RecoveryUnit::ReadSource readSource, RecoveryUnit::ReadSource readSource,
boost::optional<Timestamp> provided) boost::optional<Timestamp> provided,
bool waitForOplog)
: _opCtx(opCtx), : _opCtx(opCtx),
_originalReadSource(shard_role_details::getRecoveryUnit(opCtx)->getTimestampReadSource()) { _originalReadSource(shard_role_details::getRecoveryUnit(opCtx)->getTimestampReadSource()) {
// Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read
@ -628,6 +629,16 @@ ReadSourceScope::ReadSourceScope(OperationContext* opCtx,
} }
shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot(); shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot();
// Wait for oplog visibility if the caller requested it.
if (waitForOplog) {
LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx);
tassert(9478700, "Should have oplog available at this point", oplogInfo);
tassert(9478705,
"Should have oplog record store available at this point",
oplogInfo->getRecordStore());
oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
}
shard_role_details::getRecoveryUnit(_opCtx)->setTimestampReadSource(readSource, provided); shard_role_details::getRecoveryUnit(_opCtx)->setTimestampReadSource(readSource, provided);
} }

View File

@ -476,7 +476,8 @@ class ReadSourceScope {
public: public:
ReadSourceScope(OperationContext* opCtx, ReadSourceScope(OperationContext* opCtx,
RecoveryUnit::ReadSource readSource, RecoveryUnit::ReadSource readSource,
boost::optional<Timestamp> provided = boost::none); boost::optional<Timestamp> provided = boost::none,
bool waitForOplog = false);
~ReadSourceScope(); ~ReadSourceScope();
private: private:

View File

@ -164,6 +164,11 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx,
"Expected forward collection scan with 'resumeScanPoint'", "Expected forward collection scan with 'resumeScanPoint'",
params.direction == CollectionScanParams::FORWARD); params.direction == CollectionScanParams::FORWARD);
} }
// Set up 'OplogWaitConfig' if we are scanning the oplog.
if (collPtr && collPtr->ns().isOplog()) {
_oplogWaitConfig = OplogWaitConfig();
}
} }
namespace { namespace {
@ -269,23 +274,21 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) {
[&] { [&] {
if (needToMakeCursor) { if (needToMakeCursor) {
const bool forward = _params.direction == CollectionScanParams::FORWARD; const bool forward = _params.direction == CollectionScanParams::FORWARD;
if (forward && _params.shouldWaitForOplogVisibility) { if (forward && _params.shouldWaitForOplogVisibility) {
// Forward, non-tailable scans from the oplog need to wait until all oplog tassert(9478714, "Must have oplog wait config configured", _oplogWaitConfig);
// entries before the read begins to be visible. This isn't needed for reverse if (_oplogWaitConfig->shouldWaitForOplogVisibility()) {
// scans because we only hide oplog entries from forward scans, and it isn't tassert(9478701,
// necessary for tailing cursors because they ignore EOF and will eventually see "We should only request yield for a tailable oplog scan",
// all writes. Forward, non-tailable scans are the only case where a meaningful !_params.tailable && collPtr->ns().isOplog());
// EOF will be seen that might not include writes that finished before the read
// started. This also must be done before we create the cursor as that is when
// we establish the endpoint for the cursor. Also call abandonSnapshot to make
// sure that we are using a fresh storage engine snapshot while waiting.
// Otherwise, we will end up reading from the snapshot where the oplog entries
// are not yet visible even after the wait.
invariant(!_params.tailable && collPtr->ns().isOplog());
shard_role_details::getRecoveryUnit(opCtx())->abandonSnapshot(); // Perform wait during yield. Note that we mark this as having waited before
collPtr->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx()); // actually waiting so that we can distinguish waiting for oplog visiblity
// from a WriteConflictException when handling this yield.
_oplogWaitConfig->setWaitedForOplogVisibility();
LOGV2_DEBUG(
9478711, 2, "Oplog scan triggering yield to wait for visibility");
return NEED_YIELD;
}
} }
try { try {

View File

@ -44,6 +44,7 @@
#include "mongo/db/matcher/expression.h" #include "mongo/db/matcher/expression.h"
#include "mongo/db/matcher/expression_leaf.h" #include "mongo/db/matcher/expression_leaf.h"
#include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/query/oplog_wait_config.h"
#include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/stage_types.h" #include "mongo/db/query/stage_types.h"
#include "mongo/db/record_id.h" #include "mongo/db/record_id.h"
@ -96,6 +97,18 @@ public:
return _params.direction; return _params.direction;
} }
const CollectionScanParams& params() const {
return _params;
}
bool initializedCursor() const {
return _cursor != nullptr;
}
OplogWaitConfig* getOplogWaitConfig() {
return _oplogWaitConfig ? &(*_oplogWaitConfig) : nullptr;
}
protected: protected:
void doSaveStateRequiresCollection() final; void doSaveStateRequiresCollection() final;
@ -149,6 +162,10 @@ private:
CollectionScanStats _specificStats; CollectionScanStats _specificStats;
bool _useSeek = false; bool _useSeek = false;
// Coordinates waiting for oplog visibility. Must be initialized if we are doing an oplog scan,
// boost::none otherwise.
boost::optional<OplogWaitConfig> _oplogWaitConfig;
}; };
} // namespace mongo } // namespace mongo

View File

@ -69,9 +69,12 @@ public:
return true; return true;
} }
Status yieldOrInterrupt(OperationContext*, Status yieldOrInterrupt(
std::function<void()> whileYieldingFn, OperationContext*,
RestoreContext::RestoreType restoreType) override { const std::function<void()>& whileYieldingFn,
RestoreContext::RestoreType restoreType,
const std::function<void()>& afterSnapshotAbandonFn = nullptr) override {
return {ErrorCodes::ExceededTimeLimit, "Using AlwaysTimeOutYieldPolicy"}; return {ErrorCodes::ExceededTimeLimit, "Using AlwaysTimeOutYieldPolicy"};
} }
}; };
@ -89,9 +92,11 @@ public:
return true; return true;
} }
Status yieldOrInterrupt(OperationContext*, Status yieldOrInterrupt(
std::function<void()> whileYieldingFn, OperationContext*,
RestoreContext::RestoreType restoreType) override { const std::function<void()>& whileYieldingFn,
RestoreContext::RestoreType restoreType,
const std::function<void()>& afterSnapshotAbandonFn = nullptr) override {
return {ErrorCodes::QueryPlanKilled, "Using AlwaysPlanKilledYieldPolicy"}; return {ErrorCodes::QueryPlanKilled, "Using AlwaysPlanKilledYieldPolicy"};
} }
}; };
@ -109,9 +114,11 @@ public:
return false; return false;
} }
Status yieldOrInterrupt(OperationContext*, Status yieldOrInterrupt(
std::function<void()> whileYieldingFn, OperationContext*,
RestoreContext::RestoreType restoreType) override { const std::function<void()>& whileYieldingFn,
RestoreContext::RestoreType restoreType,
const std::function<void()>& afterSnapshotAbandonFn = nullptr) override {
MONGO_UNREACHABLE; MONGO_UNREACHABLE;
} }
}; };

View File

@ -0,0 +1,71 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
namespace mongo {
/**
 * Tracks whether we are allowed to wait for oplog visibility as well as whether we have waited for
 * visibility.
 */
class OplogWaitConfig {
public:
    OplogWaitConfig() = default;

    // Permits waiting for oplog visibility. Until this is called,
    // shouldWaitForOplogVisibility() always returns false.
    void enableWaitingForOplogVisibility() {
        _shouldWaitForVisibility = true;
    }

    // Records that the single visibility wait has been performed. Waiting must have been
    // enabled beforehand.
    void setWaitedForOplogVisibility() {
        tassert(
            9478712, "Cannot wait for oplog visibility if it is disabled", _shouldWaitForVisibility);
        _waitedForOplogVisibility = true;
    }

    // True when waiting is enabled and has not yet happened.
    bool shouldWaitForOplogVisibility() const {
        return _shouldWaitForVisibility && !_waitedForOplogVisibility;
    }

    // True once the wait has been performed. (The misspelling in this public method name is
    // preserved because external callers depend on it.)
    bool waitedForOplogVisiblity() const {
        if (_waitedForOplogVisibility) {
            tassert(9478715,
                    "Cannot wait for oplog visibility if it is disabled",
                    _shouldWaitForVisibility);
        }
        return _waitedForOplogVisibility;
    }

private:
    // Tracks whether we should wait for oplog visibility at all.
    bool _shouldWaitForVisibility = false;

    // Tracks whether we have waited for oplog visibility.
    bool _waitedForOplogVisibility = false;
};
} // namespace mongo

View File

@ -157,9 +157,26 @@ PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx,
// If this PlanExecutor is executing a COLLSCAN, keep a pointer directly to the COLLSCAN // If this PlanExecutor is executing a COLLSCAN, keep a pointer directly to the COLLSCAN
// stage. This is used for change streams in order to keep the the latest oplog timestamp // stage. This is used for change streams in order to keep the the latest oplog timestamp
// and post batch resume token up to date as the oplog scan progresses. // and post batch resume token up to date as the oplog scan progresses. Similarly, this is
// used for oplog scans to coordinate waiting for oplog visibility.
if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) { if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) {
_collScanStage = static_cast<CollectionScan*>(collectionScan); _collScanStage = static_cast<CollectionScan*>(collectionScan);
if (_nss.isOplog()) {
_oplogWaitConfig = _collScanStage->getOplogWaitConfig();
tassert(9478713,
"Should have '_oplogWaitConfig' if we are scanning the oplog",
_oplogWaitConfig);
// Allow waiting for oplog visibility if our yield policy supports auto yielding.
if (_yieldPolicy->canAutoYield() &&
_collScanStage->params().shouldWaitForOplogVisibility) {
_oplogWaitConfig->enableWaitingForOplogVisibility();
_afterSnapshotAbandonFn = [&]() {
_waitForAllEarlierOplogWritesToBeVisible();
};
}
}
} }
} }
@ -292,6 +309,40 @@ void doYield(OperationContext* opCtx) {
} }
} // namespace } // namespace
/**
* This function waits for all oplog entries before the read to become visible. This must be done
* before initializing a cursor to perform an oplog scan as that is when we establish the endpoint
* for the cursor. Note that this function can only be called for forward, non-tailable scans.
*/
void PlanExecutorImpl::_waitForAllEarlierOplogWritesToBeVisible() {
tassert(9478702, "This function should not be called outside of oplog scans", nss().isOplog());
tassert(9478703, "This function should not be called outside of oplog scans", _collScanStage);
const auto& params = _collScanStage->params();
if (!(params.direction == CollectionScanParams::FORWARD &&
params.shouldWaitForOplogVisibility)) {
return;
}
if (_collScanStage->initializedCursor()) {
return;
}
tassert(9478704, "This function should not be called on tailable cursors", !params.tailable);
// If we do not have an oplog, we do not wait.
LocalOplogInfo* oplogInfo = LocalOplogInfo::get(_opCtx);
if (!oplogInfo) {
return;
}
RecordStore* oplogRecordStore = oplogInfo->getRecordStore();
if (!oplogRecordStore) {
return;
}
oplogRecordStore->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
}
PlanExecutor::ExecState PlanExecutorImpl::getNext(BSONObj* objOut, RecordId* dlOut) { PlanExecutor::ExecState PlanExecutorImpl::getNext(BSONObj* objOut, RecordId* dlOut) {
const auto state = getNextDocument(&_docOutput, dlOut); const auto state = getNextDocument(&_docOutput, dlOut);
if (objOut && state == ExecState::ADVANCED) { if (objOut && state == ExecState::ADVANCED) {
@ -353,8 +404,10 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob
}; };
if (_yieldPolicy->shouldYieldOrInterrupt(_opCtx)) { if (_yieldPolicy->shouldYieldOrInterrupt(_opCtx)) {
uassertStatusOK(_yieldPolicy->yieldOrInterrupt( uassertStatusOK(_yieldPolicy->yieldOrInterrupt(_opCtx,
_opCtx, whileYieldingFn, RestoreContext::RestoreType::kYield)); whileYieldingFn,
RestoreContext::RestoreType::kYield,
_afterSnapshotAbandonFn));
} }
WorkingSetID id = WorkingSet::INVALID_ID; WorkingSetID id = WorkingSet::INVALID_ID;
@ -466,9 +519,9 @@ void PlanExecutorImpl::_handleNeedYield(size_t& writeConflictsInARow,
ExceptionFor<ErrorCodes::TemporarilyUnavailable>( ExceptionFor<ErrorCodes::TemporarilyUnavailable>(
Status(ErrorCodes::TemporarilyUnavailable, "temporarily unavailable")), Status(ErrorCodes::TemporarilyUnavailable, "temporarily unavailable")),
writeConflictsInARow); writeConflictsInARow);
} else if (!_oplogWaitConfig || !_oplogWaitConfig->waitedForOplogVisiblity()) {
} else { // If we didn't wait for oplog visiblity, then we must be yielding because of a
// We're yielding because of a WriteConflictException. // WriteConflictException.
if (!_yieldPolicy->canAutoYield() || if (!_yieldPolicy->canAutoYield() ||
MONGO_unlikely(skipWriteConflictRetries.shouldFail())) { MONGO_unlikely(skipWriteConflictRetries.shouldFail())) {
throwWriteConflictException( throwWriteConflictException(

View File

@ -52,6 +52,7 @@
#include "mongo/db/ops/update_result.h" #include "mongo/db/ops/update_result.h"
#include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/query/canonical_query.h" #include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/oplog_wait_config.h"
#include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_executor.h"
#include "mongo/db/query/plan_explainer.h" #include "mongo/db/query/plan_explainer.h"
#include "mongo/db/query/plan_yield_policy.h" #include "mongo/db/query/plan_yield_policy.h"
@ -227,15 +228,22 @@ private:
bool _handleEOFAndExit(PlanStage::StageState code, bool _handleEOFAndExit(PlanStage::StageState code,
std::unique_ptr<insert_listener::Notifier>& notifier); std::unique_ptr<insert_listener::Notifier>& notifier);
MONGO_COMPILER_ALWAYS_INLINE void _checkIfMustYield(std::function<void()> whileYieldingFn) { // Function which waits for oplog visiblity. It assumes that it is invoked following snapshot
// abandonment, but before yielding any resources.
void _waitForAllEarlierOplogWritesToBeVisible();
MONGO_COMPILER_ALWAYS_INLINE void _checkIfMustYield(
const std::function<void()>& whileYieldingFn) {
// These are the conditions which can cause us to yield: // These are the conditions which can cause us to yield:
// 1) The yield policy's timer elapsed, or // 1) The yield policy's timer elapsed, or
// 2) some stage requested a yield, or // 2) some stage requested a yield, or
// 3) we need to yield and retry due to a WriteConflictException. // 3) we need to yield and retry due to a WriteConflictException.
// In all cases, the actual yielding happens here. // In all cases, the actual yielding happens here.
if (_yieldPolicy->shouldYieldOrInterrupt(_opCtx)) { if (_yieldPolicy->shouldYieldOrInterrupt(_opCtx)) {
uassertStatusOK(_yieldPolicy->yieldOrInterrupt( uassertStatusOK(_yieldPolicy->yieldOrInterrupt(_opCtx,
_opCtx, whileYieldingFn, RestoreContext::RestoreType::kYield)); whileYieldingFn,
RestoreContext::RestoreType::kYield,
_afterSnapshotAbandonFn));
} }
} }
@ -293,7 +301,15 @@ private:
// otherwise. We cache it to avoid the need to traverse the execution tree in runtime when the // otherwise. We cache it to avoid the need to traverse the execution tree in runtime when the
// executor is requested to return the oplog tracking info. Since this info is provided by // executor is requested to return the oplog tracking info. Since this info is provided by
// either of these stages, the executor will simply delegate the request to the cached stage. // either of these stages, the executor will simply delegate the request to the cached stage.
const CollectionScan* _collScanStage{nullptr}; CollectionScan* _collScanStage{nullptr};
// Used to coordinate waiting for oplog visibility. Note that this is owned by the collection
// scan (if one exists). Initialized only if this executor is doing a collection scan over the
// oplog, nullptr otherwise.
OplogWaitConfig* _oplogWaitConfig{nullptr};
// Function used to wait for oplog visibility in between snapshot abandonment and yielding.
std::function<void()> _afterSnapshotAbandonFn{nullptr};
}; };
} // namespace mongo } // namespace mongo

View File

@ -118,8 +118,9 @@ void PlanYieldPolicy::resetTimer() {
} }
Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx, Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx,
std::function<void()> whileYieldingFn, const std::function<void()>& whileYieldingFn,
RestoreContext::RestoreType restoreType) { RestoreContext::RestoreType restoreType,
const std::function<void()>& afterSnapshotAbandonFn) {
invariant(opCtx); invariant(opCtx);
// After we finish yielding (or in any early return), call resetTimer() to prevent yielding // After we finish yielding (or in any early return), call resetTimer() to prevent yielding
@ -168,9 +169,12 @@ Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx,
// snapshot. // snapshot.
invariant(!opCtx->isLockFreeReadsOp()); invariant(!opCtx->isLockFreeReadsOp());
shard_role_details::getRecoveryUnit(opCtx)->abandonSnapshot(); shard_role_details::getRecoveryUnit(opCtx)->abandonSnapshot();
if (afterSnapshotAbandonFn) {
afterSnapshotAbandonFn();
}
} else { } else {
if (usesCollectionAcquisitions()) { if (usesCollectionAcquisitions()) {
performYieldWithAcquisitions(opCtx, whileYieldingFn); performYieldWithAcquisitions(opCtx, whileYieldingFn, afterSnapshotAbandonFn);
} else { } else {
const Yieldable* yieldablePtr = get<const Yieldable*>(yieldable); const Yieldable* yieldablePtr = get<const Yieldable*>(yieldable);
tassert(9762900, tassert(9762900,
@ -178,7 +182,7 @@ Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx,
<< "no yieldable object available for yield policy " << "no yieldable object available for yield policy "
<< serializeYieldPolicy(getPolicy()) << " in attempt " << attempt, << serializeYieldPolicy(getPolicy()) << " in attempt " << attempt,
yieldablePtr); yieldablePtr);
performYield(opCtx, *yieldablePtr, whileYieldingFn); performYield(opCtx, *yieldablePtr, whileYieldingFn, afterSnapshotAbandonFn);
} }
} }
@ -205,7 +209,8 @@ Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx,
void PlanYieldPolicy::performYield(OperationContext* opCtx, void PlanYieldPolicy::performYield(OperationContext* opCtx,
const Yieldable& yieldable, const Yieldable& yieldable,
std::function<void()> whileYieldingFn) { std::function<void()> whileYieldingFn,
std::function<void()> afterSnapshotAbandonFn) {
// Things have to happen here in a specific order: // Things have to happen here in a specific order:
// * Release 'yieldable'. // * Release 'yieldable'.
// * Abandon the current storage engine snapshot. // * Abandon the current storage engine snapshot.
@ -231,6 +236,11 @@ void PlanYieldPolicy::performYield(OperationContext* opCtx,
opCtx->checkForInterrupt(); // throws opCtx->checkForInterrupt(); // throws
} }
// After we've abandoned our snapshot, perform any work before releasing locks.
if (afterSnapshotAbandonFn) {
afterSnapshotAbandonFn();
}
Locker* locker = shard_role_details::getLocker(opCtx); Locker* locker = shard_role_details::getLocker(opCtx);
Locker::LockSnapshot snapshot; Locker::LockSnapshot snapshot;
locker->saveLockStateAndUnlock(&snapshot); locker->saveLockStateAndUnlock(&snapshot);
@ -252,7 +262,8 @@ void PlanYieldPolicy::performYield(OperationContext* opCtx,
} }
void PlanYieldPolicy::performYieldWithAcquisitions(OperationContext* opCtx, void PlanYieldPolicy::performYieldWithAcquisitions(OperationContext* opCtx,
std::function<void()> whileYieldingFn) { std::function<void()> whileYieldingFn,
std::function<void()> afterSnapshotAbandonFn) {
// Things have to happen here in a specific order: // Things have to happen here in a specific order:
// * Abandon the current storage engine snapshot. // * Abandon the current storage engine snapshot.
// * Check for interrupt if the yield policy requires. // * Check for interrupt if the yield policy requires.
@ -271,6 +282,11 @@ void PlanYieldPolicy::performYieldWithAcquisitions(OperationContext* opCtx,
opCtx->checkForInterrupt(); // throws opCtx->checkForInterrupt(); // throws
} }
// After we've abandoned our snapshot, perform any work before yielding transaction resources.
if (afterSnapshotAbandonFn) {
afterSnapshotAbandonFn();
}
auto yieldedTransactionResources = yieldTransactionResourcesFromOperationContext(opCtx); auto yieldedTransactionResources = yieldTransactionResourcesFromOperationContext(opCtx);
ScopeGuard yieldFailedScopeGuard( ScopeGuard yieldFailedScopeGuard(
[&] { yieldedTransactionResources.transitionTransactionResourcesToFailedState(opCtx); }); [&] { yieldedTransactionResources.transitionTransactionResourcesToFailedState(opCtx); });

View File

@ -240,8 +240,9 @@ public:
* been relinquished. * been relinquished.
*/ */
virtual Status yieldOrInterrupt(OperationContext* opCtx, virtual Status yieldOrInterrupt(OperationContext* opCtx,
std::function<void()> whileYieldingFn, const std::function<void()>& whileYieldingFn,
RestoreContext::RestoreType restoreType); RestoreContext::RestoreType restoreType,
const std::function<void()>& afterSnapshotAbandonFn = nullptr);
/** /**
* All calls to shouldYieldOrInterrupt() will return true until the next call to * All calls to shouldYieldOrInterrupt() will return true until the next call to
@ -340,9 +341,11 @@ private:
*/ */
void performYield(OperationContext* opCtx, void performYield(OperationContext* opCtx,
const Yieldable& yieldable, const Yieldable& yieldable,
std::function<void()> whileYieldingFn); std::function<void()> whileYieldingFn,
std::function<void()> afterSnapshotAbandonFn);
void performYieldWithAcquisitions(OperationContext* opCtx, void performYieldWithAcquisitions(OperationContext* opCtx,
std::function<void()> whileYieldingFn); std::function<void()> whileYieldingFn,
std::function<void()> afterSnapshotAbandonFn);
const YieldPolicy _policy; const YieldPolicy _policy;
std::variant<const Yieldable*, YieldThroughAcquisitions> _yieldable; std::variant<const Yieldable*, YieldThroughAcquisitions> _yieldable;

View File

@ -760,8 +760,9 @@ bool shouldWaitForOplogVisibility(OperationContext* opCtx,
// visibility timestamp to be updated, it would wait for a replication batch that would never // visibility timestamp to be updated, it would wait for a replication batch that would never
// complete because it couldn't reacquire its own lock, the global lock held by the waiting // complete because it couldn't reacquire its own lock, the global lock held by the waiting
// reader. // reader.
return repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( auto* replCoord = repl::ReplicationCoordinator::get(opCtx);
opCtx, DatabaseName::kAdmin); return replCoord->canAcceptWritesForDatabase(opCtx, DatabaseName::kAdmin) &&
replCoord->getSettings().isReplSet();
} }
} // namespace mongo } // namespace mongo

View File

@ -268,8 +268,17 @@ Status createIndexFromSpec(OperationContext* opCtx,
*/ */
class OneOffRead { class OneOffRead {
public: public:
OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { OneOffRead(OperationContext* opCtx, const Timestamp& ts, bool waitForOplog = false)
: _opCtx(opCtx) {
shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot(); shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot();
if (waitForOplog) {
LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx);
// Oplog should be available in this test.
invariant(oplogInfo);
invariant(oplogInfo->getRecordStore());
oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
}
if (ts.isNull()) { if (ts.isNull()) {
shard_role_details::getRecoveryUnit(_opCtx)->setTimestampReadSource( shard_role_details::getRecoveryUnit(_opCtx)->setTimestampReadSource(
RecoveryUnit::ReadSource::kNoTimestamp); RecoveryUnit::ReadSource::kNoTimestamp);
@ -422,7 +431,7 @@ public:
} }
void dumpOplog() { void dumpOplog() {
OneOffRead oor(_opCtx, Timestamp::min()); OneOffRead oor(_opCtx, Timestamp::min(), true /* waitForOplog */);
shard_role_details::getRecoveryUnit(_opCtx)->beginUnitOfWork(_opCtx->readOnly()); shard_role_details::getRecoveryUnit(_opCtx)->beginUnitOfWork(_opCtx->readOnly());
LOGV2(8423335, "Dumping oplog collection"); LOGV2(8423335, "Dumping oplog collection");
AutoGetCollectionForRead oplogRaii(_opCtx, NamespaceString::kRsOplogNamespace); AutoGetCollectionForRead oplogRaii(_opCtx, NamespaceString::kRsOplogNamespace);
@ -569,12 +578,12 @@ public:
} }
BSONObj queryOplog(const BSONObj& query) { BSONObj queryOplog(const BSONObj& query) {
OneOffRead oor(_opCtx, Timestamp::min()); OneOffRead oor(_opCtx, Timestamp::min(), true /* waitForOplog */);
return queryCollection(NamespaceString::kRsOplogNamespace, query); return queryCollection(NamespaceString::kRsOplogNamespace, query);
} }
Timestamp getTopOfOplog() { Timestamp getTopOfOplog() {
OneOffRead oor(_opCtx, Timestamp::min()); OneOffRead oor(_opCtx, Timestamp::min(), true /* waitForOplog */);
BSONObj ret; BSONObj ret;
ASSERT_TRUE(Helpers::getLast(_opCtx, NamespaceString::kRsOplogNamespace, ret)); ASSERT_TRUE(Helpers::getLast(_opCtx, NamespaceString::kRsOplogNamespace, ret));
return ret["ts"].timestamp(); return ret["ts"].timestamp();
@ -641,7 +650,7 @@ public:
void assertOplogDocumentExistsAtTimestamp(const BSONObj& query, void assertOplogDocumentExistsAtTimestamp(const BSONObj& query,
const Timestamp& ts, const Timestamp& ts,
bool exists) { bool exists) {
OneOffRead oor(_opCtx, ts); OneOffRead oor(_opCtx, ts, true);
BSONObj ret; BSONObj ret;
bool found = Helpers::findOne( bool found = Helpers::findOne(
_opCtx, _opCtx,

View File

@ -511,6 +511,14 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeRetryableApplyOps) {
_oplogBuffer.shutdown(_opCtx.get()); _oplogBuffer.shutdown(_opCtx.get());
applier->join(); applier->join();
// Before reading from the oplog, wait for oplog visibility.
LocalOplogInfo* oplogInfo = LocalOplogInfo::get(_opCtx.get());
// Oplog should be available in this test.
invariant(oplogInfo);
invariant(oplogInfo->getRecordStore());
oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(_opCtx.get());
// The session path in TenantOplogApplier bypasses the opObserver, so we can only read // The session path in TenantOplogApplier bypasses the opObserver, so we can only read
// the entries from the oplog. // the entries from the oplog.
CollectionReader oplogReader(_opCtx.get(), NamespaceString::kRsOplogNamespace); CollectionReader oplogReader(_opCtx.get(), NamespaceString::kRsOplogNamespace);

View File

@ -141,8 +141,17 @@ repl::MutableOplogEntry makeOplog(const NamespaceString& nss,
*/ */
class OneOffRead { class OneOffRead {
public: public:
OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { OneOffRead(OperationContext* opCtx, const Timestamp& ts, bool waitForOplog = false)
: _opCtx(opCtx) {
shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot(); shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot();
if (waitForOplog) {
LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx);
// Oplog should be available in this test.
invariant(oplogInfo);
invariant(oplogInfo->getRecordStore());
oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
}
if (ts.isNull()) { if (ts.isNull()) {
shard_role_details::getRecoveryUnit(_opCtx)->setTimestampReadSource( shard_role_details::getRecoveryUnit(_opCtx)->setTimestampReadSource(
RecoveryUnit::ReadSource::kNoTimestamp); RecoveryUnit::ReadSource::kNoTimestamp);
@ -269,7 +278,7 @@ public:
} }
BSONObj queryOplog(const BSONObj& query) { BSONObj queryOplog(const BSONObj& query) {
OneOffRead oor(_opCtx, Timestamp::min()); OneOffRead oor(_opCtx, Timestamp::min(), true);
return queryCollection(NamespaceString::kRsOplogNamespace, query); return queryCollection(NamespaceString::kRsOplogNamespace, query);
} }
@ -286,7 +295,7 @@ public:
} }
int itcount(NamespaceString nss) { int itcount(NamespaceString nss) {
OneOffRead oof(_opCtx, Timestamp::min()); OneOffRead oof(_opCtx, Timestamp::min(), nss.isOplog());
AutoGetCollectionForRead autoColl(_opCtx, nss); AutoGetCollectionForRead autoColl(_opCtx, nss);
auto cursor = autoColl.getCollection()->getCursor(_opCtx); auto cursor = autoColl.getCollection()->getCursor(_opCtx);

View File

@ -539,6 +539,12 @@ protected:
} }
Timestamp getLatestOplogTimestamp(OperationContext* opCtx) { Timestamp getLatestOplogTimestamp(OperationContext* opCtx) {
LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx);
// Oplog should be available in this test.
invariant(oplogInfo);
invariant(oplogInfo->getRecordStore());
oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
DBDirectClient client(opCtx); DBDirectClient client(opCtx);
FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace}; FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace};
@ -555,6 +561,12 @@ protected:
Timestamp ts) { Timestamp ts) {
std::vector<repl::DurableOplogEntry> result; std::vector<repl::DurableOplogEntry> result;
LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx);
// Oplog should be available in this test.
invariant(oplogInfo);
invariant(oplogInfo->getRecordStore());
oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
PersistentTaskStore<repl::OplogEntryBase> store(NamespaceString::kRsOplogNamespace); PersistentTaskStore<repl::OplogEntryBase> store(NamespaceString::kRsOplogNamespace);
store.forEach(opCtx, BSON("ts" << BSON("$gt" << ts)), [&](const auto& oplogEntry) { store.forEach(opCtx, BSON("ts" << BSON("$gt" << ts)), [&](const auto& oplogEntry) {
result.emplace_back( result.emplace_back(

View File

@ -196,6 +196,14 @@ public:
repl::OplogEntry getOplog(OperationContext* opCtx, const repl::OpTime& opTime) { repl::OplogEntry getOplog(OperationContext* opCtx, const repl::OpTime& opTime) {
DBDirectClient client(opCtx); DBDirectClient client(opCtx);
LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx);
// Oplog should be available in this test.
invariant(oplogInfo);
invariant(oplogInfo->getRecordStore());
oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace, opTime.asQuery()); auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace, opTime.asQuery());
ASSERT_FALSE(oplogBSON.isEmpty()); ASSERT_FALSE(oplogBSON.isEmpty());

View File

@ -67,8 +67,9 @@ class TransactionHistoryIterator : public TransactionHistoryIteratorBase {
public: public:
/** /**
* Creates a new iterator starting with an oplog entry with the given start opTime. * Creates a new iterator starting with an oplog entry with the given start opTime.
* TODO SERVER-104970: If permitYield can't be deleted, change the default to 'false'.
*/ */
TransactionHistoryIterator(repl::OpTime startingOpTime, bool permitYield = false); TransactionHistoryIterator(repl::OpTime startingOpTime, bool permitYield = true);
~TransactionHistoryIterator() override = default; ~TransactionHistoryIterator() override = default;
bool hasNext() const override; bool hasNext() const override;
@ -84,6 +85,7 @@ private:
// Clients can set this to allow PlanExecutors created by this TransactionHistoryIterator to // Clients can set this to allow PlanExecutors created by this TransactionHistoryIterator to
// have a YIELD_AUTO yield policy. It is only safe to set this if next() will never be called // have a YIELD_AUTO yield policy. It is only safe to set this if next() will never be called
// while holding a lock that should not be yielded. // while holding a lock that should not be yielded.
// TODO SERVER-104970: Determine whether this can be removed.
bool _permitYield; bool _permitYield;
repl::OpTime _nextOpTime; repl::OpTime _nextOpTime;

View File

@ -118,7 +118,7 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) {
repl::OpTime(Timestamp(67, 54801), 2)); // optime of previous write in transaction repl::OpTime(Timestamp(67, 54801), 2)); // optime of previous write in transaction
insertOplogEntry(entry4); insertOplogEntry(entry4);
TransactionHistoryIterator iter(repl::OpTime(Timestamp(97, 2472), 2)); TransactionHistoryIterator iter(repl::OpTime(Timestamp(97, 2472), 2), true);
{ {
ASSERT_TRUE(iter.hasNext()); ASSERT_TRUE(iter.hasNext());
@ -163,7 +163,7 @@ TEST_F(SessionHistoryIteratorTest, NextShouldAssertIfHistoryIsTruncated) {
insertOplogEntry(entry); insertOplogEntry(entry);
repl::OpTime opTime(Timestamp(67, 54801), 2); repl::OpTime opTime(Timestamp(67, 54801), 2);
TransactionHistoryIterator iter(opTime); TransactionHistoryIterator iter(opTime, true);
ASSERT_TRUE(iter.hasNext()); ASSERT_TRUE(iter.hasNext());
auto nextEntry = iter.next(opCtx()); auto nextEntry = iter.next(opCtx());
@ -181,7 +181,7 @@ TEST_F(SessionHistoryIteratorTest, OplogInWriteHistoryChainWithMissingPrevTSShou
boost::none); // optime of previous write in transaction boost::none); // optime of previous write in transaction
insertOplogEntry(entry); insertOplogEntry(entry);
TransactionHistoryIterator iter(repl::OpTime(Timestamp(67, 54801), 2)); TransactionHistoryIterator iter(repl::OpTime(Timestamp(67, 54801), 2), true);
ASSERT_TRUE(iter.hasNext()); ASSERT_TRUE(iter.hasNext());
ASSERT_THROWS_CODE(iter.next(opCtx()), AssertionException, ErrorCodes::FailedToParse); ASSERT_THROWS_CODE(iter.next(opCtx()), AssertionException, ErrorCodes::FailedToParse);
} }

View File

@ -352,7 +352,8 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
// Restore the current timestamp read source after fetching transaction history, which may // Restore the current timestamp read source after fetching transaction history, which may
// change our ReadSource. // change our ReadSource.
ReadSourceScope readSourceScope(opCtx, RecoveryUnit::ReadSource::kNoTimestamp); ReadSourceScope readSourceScope(
opCtx, RecoveryUnit::ReadSource::kNoTimestamp, boost::none, true /* waitForOplog */);
auto originalReadConcern = auto originalReadConcern =
std::exchange(repl::ReadConcernArgs::get(opCtx), repl::ReadConcernArgs()); std::exchange(repl::ReadConcernArgs::get(opCtx), repl::ReadConcernArgs());
ON_BLOCK_EXIT([&] { repl::ReadConcernArgs::get(opCtx) = std::move(originalReadConcern); }); ON_BLOCK_EXIT([&] { repl::ReadConcernArgs::get(opCtx) = std::move(originalReadConcern); });
@ -3621,6 +3622,13 @@ boost::optional<repl::OplogEntry> TransactionParticipant::Participant::checkStat
// Use a SideTransactionBlock since it is illegal to scan the oplog while in a write unit of // Use a SideTransactionBlock since it is illegal to scan the oplog while in a write unit of
// work. // work.
TransactionParticipant::SideTransactionBlock sideTxn(opCtx); TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
// Before opening the storage snapshot (and before scanning the oplog), wait for all
// earlier oplog writes to be visible. This is necessary because the transaction history
// iterator will not be able to abandon the storage snapshot and wait.
auto storageInterface = repl::StorageInterface::get(opCtx);
storageInterface->waitForAllEarlierOplogWritesToBeVisible(opCtx);
TransactionHistoryIterator txnIter(*stmtOpTime); TransactionHistoryIterator txnIter(*stmtOpTime);
while (txnIter.hasNext()) { while (txnIter.hasNext()) {
const auto entry = txnIter.next(opCtx); const auto entry = txnIter.next(opCtx);

View File

@ -285,8 +285,14 @@ protected:
std::vector<BSONObj> ops; std::vector<BSONObj> ops;
{ {
DBDirectClient db(&_opCtx); DBDirectClient db(&_opCtx);
auto cursor = db.find( LocalOplogInfo* oplogInfo = LocalOplogInfo::get(&_opCtx);
FindCommandRequest{NamespaceString::createNamespaceString_forTest(cllNS())});
// Oplog should be available in this test.
invariant(oplogInfo);
invariant(oplogInfo->getRecordStore());
oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(&_opCtx);
auto cursor = db.find(FindCommandRequest{NamespaceString::createNamespaceString_forTest(
cllNS())}); // Read all ops from the oplog.
while (cursor->more()) { while (cursor->more()) {
ops.push_back(cursor->nextSafe()); ops.push_back(cursor->nextSafe());
} }