diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 1121403293f..97ba0c65c75 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -10,4 +10,4 @@ README.md jstests/ /src/mongo/dbtests/ /src/mongo/**/*_test.cpp -/src/mongo/**/*_bm.cpp \ No newline at end of file +/src/mongo/**/*_bm.cpp diff --git a/jstests/concurrency/fsm_workloads/query/yield/BUILD.bazel b/jstests/concurrency/fsm_workloads/query/yield/BUILD.bazel new file mode 100644 index 00000000000..e27c62dadcc --- /dev/null +++ b/jstests/concurrency/fsm_workloads/query/yield/BUILD.bazel @@ -0,0 +1,13 @@ +load("@aspect_rules_js//js:defs.bzl", "js_library") + +js_library( + name = "all_javascript_files", + srcs = glob([ + "*.js", + ]), + target_compatible_with = select({ + "//bazel/config:ppc_or_s390x": ["@platforms//:incompatible"], + "//conditions:default": [], + }), + visibility = ["//visibility:public"], +) diff --git a/jstests/concurrency/fsm_workloads/query/yield/repl_set_resize_oplog.js b/jstests/concurrency/fsm_workloads/query/yield/repl_set_resize_oplog.js new file mode 100644 index 00000000000..e5b52c2bfe8 --- /dev/null +++ b/jstests/concurrency/fsm_workloads/query/yield/repl_set_resize_oplog.js @@ -0,0 +1,59 @@ +/** + * Concurrently runs 'replSetResizeOplog' with inserts and oplog scans and verifies that our oplog + * scans wait for oplog visibility correctly. + * + * @tags: [requires_replication] + */ + +export const $config = (function() { + var states = (function() { + function resizeOplog(db, collName) { + const oplogSizeBytes = (20 + Math.floor(50 * Math.random())) * 1024 * 1024; + jsTestLog("Setting " + tojson(oplogSizeBytes)); + assert.commandWorked(db.adminCommand({replSetResizeOplog: 1, size: oplogSizeBytes})); + } + + function insertDocs(db, collName) { + const numDocs = Math.floor(10 * Math.random()); + let docs = []; + for (let i = 0; i < numDocs; i++) { + docs.push({a: i}); + } + + assert.commandWorked(db[collName].insertMany(docs)); + } + + function scanOplog(db, collName) { + try { + assert.gte(db.getSiblingDB("local")["oplog.rs"].find().limit(20).itcount(), 0); + } catch (e) { + if (e.code == ErrorCodes.CappedPositionLost) { + return; + } else { + throw e; + } + } + } + + return { + resizeOplog: resizeOplog, + insertDocs: insertDocs, + scanOplog: scanOplog, + }; + })(); + + var transitions = { + resizeOplog: {resizeOplog: 0.1, insertDocs: 0.2, scanOplog: 0.7}, + insertDocs: {resizeOplog: 0.1, insertDocs: 0.2, scanOplog: 0.7}, + scanOplog: {resizeOplog: 0.1, insertDocs: 0.2, scanOplog: 0.7}, + }; + + return { + threadCount: 4, + iterations: 100, + startState: 'insertDocs', + data: {}, + states: states, + transitions: transitions, + }; +})(); diff --git a/src/mongo/db/BUILD.bazel b/src/mongo/db/BUILD.bazel index 694adcc347d..f423da5f693 100644 --- a/src/mongo/db/BUILD.bazel +++ b/src/mongo/db/BUILD.bazel @@ -815,6 +815,7 @@ mongo_cc_library( "//src/mongo/db/catalog:local_oplog_info", "//src/mongo/db/stats:top", "//src/mongo/db/storage:capped_snapshots", + "//src/mongo/db/storage:record_store_base", "//src/mongo/db/storage:snapshot_helper", "//src/mongo/db/storage:storage_options", "//src/mongo/util/concurrency:spin_lock", diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 1c152ac50c4..3e396dfe3f2 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1078,6 +1078,7 @@ env.Library( 'shard_role_api', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/storage/record_store_base', '$BUILD_DIR/mongo/util/concurrency/spin_lock', 'catalog/collection_uuid_mismatch_info', 
'catalog/local_oplog_info', diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp index 6ea45897ed9..19b7d76295b 100644 --- a/src/mongo/db/catalog_raii.cpp +++ b/src/mongo/db/catalog_raii.cpp @@ -615,7 +615,8 @@ LockMode fixLockModeForSystemDotViewsChanges(const NamespaceString& nss, LockMode mode) ReadSourceScope::ReadSourceScope(OperationContext* opCtx, RecoveryUnit::ReadSource readSource, - boost::optional<Timestamp> provided) + boost::optional<Timestamp> provided, + bool waitForOplog) : _opCtx(opCtx), _originalReadSource(shard_role_details::getRecoveryUnit(opCtx)->getTimestampReadSource()) { // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read @@ -628,6 +629,16 @@ ReadSourceScope::ReadSourceScope(OperationContext* opCtx, } shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot(); + + // Wait for oplog visibility if the caller requested it. + if (waitForOplog) { + LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx); + tassert(9478700, "Should have oplog available at this point", oplogInfo); + tassert(9478705, + "Should have oplog record store available at this point", + oplogInfo->getRecordStore()); + oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx); + } shard_role_details::getRecoveryUnit(_opCtx)->setTimestampReadSource(readSource, provided); } diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h index c3511e2a26c..157c12da4f6 100644 --- a/src/mongo/db/catalog_raii.h +++ b/src/mongo/db/catalog_raii.h @@ -476,7 +476,8 @@ class ReadSourceScope { public: ReadSourceScope(OperationContext* opCtx, RecoveryUnit::ReadSource readSource, - boost::optional<Timestamp> provided = boost::none); + boost::optional<Timestamp> provided = boost::none, + bool waitForOplog = false); ~ReadSourceScope(); private:
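Reviewer note: the new 'waitForOplog' flag is easiest to read at a call site. A minimal sketch, assuming a hypothetical caller; only the ReadSourceScope signature below comes from this patch:

```cpp
#include "mongo/db/catalog_raii.h"

// Hypothetical caller: switch to an untimestamped read source for an oplog
// read, first waiting for all earlier oplog writes to become visible.
void readOplogWithFreshVisibility(mongo::OperationContext* opCtx) {
    // The constructor abandons the current snapshot, performs the visibility
    // wait (because waitForOplog is true), and only then installs the new
    // read source.
    mongo::ReadSourceScope scope(opCtx,
                                 mongo::RecoveryUnit::ReadSource::kNoTimestamp,
                                 boost::none /* provided */,
                                 true /* waitForOplog */);
    // ... read from local.oplog.rs under the restored read source ...
}  // The destructor restores the original read source.
```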
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 2ec8b5fb350..9feb4b9b3c6 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -164,6 +164,11 @@ CollectionScan::CollectionScan(ExpressionContext* expCtx, "Expected forward collection scan with 'resumeScanPoint'", params.direction == CollectionScanParams::FORWARD); } + + // Set up 'OplogWaitConfig' if we are scanning the oplog. + if (collPtr && collPtr->ns().isOplog()) { + _oplogWaitConfig = OplogWaitConfig(); + } } namespace { @@ -269,23 +274,21 @@ PlanStage::StageState CollectionScan::doWork(WorkingSetID* out) { [&] { if (needToMakeCursor) { const bool forward = _params.direction == CollectionScanParams::FORWARD; - if (forward && _params.shouldWaitForOplogVisibility) { - // Forward, non-tailable scans from the oplog need to wait until all oplog - // entries before the read begins to be visible. This isn't needed for reverse - // scans because we only hide oplog entries from forward scans, and it isn't - // necessary for tailing cursors because they ignore EOF and will eventually see - // all writes. Forward, non-tailable scans are the only case where a meaningful - // EOF will be seen that might not include writes that finished before the read - // started. This also must be done before we create the cursor as that is when - // we establish the endpoint for the cursor. Also call abandonSnapshot to make - // sure that we are using a fresh storage engine snapshot while waiting. - // Otherwise, we will end up reading from the snapshot where the oplog entries - // are not yet visible even after the wait. - invariant(!_params.tailable && collPtr->ns().isOplog()); + tassert(9478714, "Must have oplog wait config configured", _oplogWaitConfig); + if (_oplogWaitConfig->shouldWaitForOplogVisibility()) { + tassert(9478701, + "We should only request a yield for a non-tailable oplog scan", + !_params.tailable && collPtr->ns().isOplog()); - shard_role_details::getRecoveryUnit(opCtx())->abandonSnapshot(); - collPtr->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx()); + // Perform wait during yield. Note that we mark this as having waited before + // actually waiting so that we can distinguish waiting for oplog visibility + // from a WriteConflictException when handling this yield. + _oplogWaitConfig->setWaitedForOplogVisibility(); + LOGV2_DEBUG( + 9478711, 2, "Oplog scan triggering yield to wait for visibility"); + return NEED_YIELD; + } } try { diff --git a/src/mongo/db/exec/collection_scan.h b/src/mongo/db/exec/collection_scan.h index ab5d5db1f6f..6714c76f2f0 100644 --- a/src/mongo/db/exec/collection_scan.h +++ b/src/mongo/db/exec/collection_scan.h @@ -44,6 +44,7 @@ #include "mongo/db/matcher/expression.h" #include "mongo/db/matcher/expression_leaf.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/query/oplog_wait_config.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/stage_types.h" #include "mongo/db/record_id.h" @@ -96,6 +97,18 @@ public: return _params.direction; } + const CollectionScanParams& params() const { + return _params; + } + + bool initializedCursor() const { + return _cursor != nullptr; + } + + OplogWaitConfig* getOplogWaitConfig() { + return _oplogWaitConfig ? &(*_oplogWaitConfig) : nullptr; + } + protected: void doSaveStateRequiresCollection() final; @@ -149,6 +162,10 @@ private: CollectionScanStats _specificStats; bool _useSeek = false; + + // Coordinates waiting for oplog visibility. Must be initialized if we are doing an oplog scan, + // boost::none otherwise. + boost::optional<OplogWaitConfig> _oplogWaitConfig; }; } // namespace mongo
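Reviewer note: the doWork() hunk above has a subtle ordering: the stage marks that it asked for a visibility wait *before* returning NEED_YIELD, so the yield handler can tell this apart from a write-conflict retry. A standalone model of that handshake (stand-in types, not MongoDB source):

```cpp
#include <cassert>

enum class StageState { ADVANCED, NEED_YIELD };

struct WaitFlags {
    bool shouldWait = true;  // the plan enabled oplog visibility waiting
    bool waited = false;     // set just before handing control back
};

// Mirrors the doWork() logic above: yield exactly once to wait, then proceed.
StageState doWorkModel(WaitFlags& f) {
    if (f.shouldWait && !f.waited) {
        f.waited = true;  // mark first, so the yield handler knows why we yielded
        return StageState::NEED_YIELD;
    }
    return StageState::ADVANCED;  // safe to create the cursor now
}

int main() {
    WaitFlags f;
    assert(doWorkModel(f) == StageState::NEED_YIELD);  // first call yields
    // ... executor abandons the snapshot and waits for oplog visibility ...
    assert(doWorkModel(f) == StageState::ADVANCED);    // retry proceeds
}
```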
diff --git a/src/mongo/db/query/mock_yield_policies.h b/src/mongo/db/query/mock_yield_policies.h index de85ed75233..ce409194063 100644 --- a/src/mongo/db/query/mock_yield_policies.h +++ b/src/mongo/db/query/mock_yield_policies.h @@ -69,9 +69,12 @@ public: return true; } - Status yieldOrInterrupt(OperationContext*, - std::function<void()> whileYieldingFn, - RestoreContext::RestoreType restoreType) override { + Status yieldOrInterrupt( + OperationContext*, + const std::function<void()>& whileYieldingFn, + RestoreContext::RestoreType restoreType, + const std::function<void()>& afterSnapshotAbandonFn = nullptr) override { + + return {ErrorCodes::ExceededTimeLimit, "Using AlwaysTimeOutYieldPolicy"}; } }; @@ -89,9 +92,11 @@ public: return true; } - Status yieldOrInterrupt(OperationContext*, - std::function<void()> whileYieldingFn, - RestoreContext::RestoreType restoreType) override { + Status yieldOrInterrupt( + OperationContext*, + const std::function<void()>& whileYieldingFn, + RestoreContext::RestoreType restoreType, + const std::function<void()>& afterSnapshotAbandonFn = nullptr) override { return {ErrorCodes::QueryPlanKilled, "Using AlwaysPlanKilledYieldPolicy"}; } }; @@ -109,9 +114,11 @@ public: return false; } - Status yieldOrInterrupt(OperationContext*, - std::function<void()> whileYieldingFn, - RestoreContext::RestoreType restoreType) override { + Status yieldOrInterrupt( + OperationContext*, + const std::function<void()>& whileYieldingFn, + RestoreContext::RestoreType restoreType, + const std::function<void()>& afterSnapshotAbandonFn = nullptr) override { MONGO_UNREACHABLE; } }; diff --git a/src/mongo/db/query/oplog_wait_config.h b/src/mongo/db/query/oplog_wait_config.h new file mode 100644 index 00000000000..4924e9057aa --- /dev/null +++ b/src/mongo/db/query/oplog_wait_config.h @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2025-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +namespace mongo { + +/** + * Tracks whether we are allowed to wait for oplog visibility as well as whether we have waited for + * visibility. + */ +class OplogWaitConfig { +public: + OplogWaitConfig() = default; + + void enableWaitingForOplogVisibility() { + _shouldWaitForVisibility = true; + } + + void setWaitedForOplogVisibility() { + tassert( + 9478712, "Cannot wait for oplog visibility if it is disabled", _shouldWaitForVisibility); + _waitedForOplogVisibility = true; + } + bool shouldWaitForOplogVisibility() const { + return _shouldWaitForVisibility && !_waitedForOplogVisibility; + } + + bool waitedForOplogVisibility() const { + if (_waitedForOplogVisibility) { + tassert(9478715, + "Should not have waited for oplog visibility when it is disabled", + _shouldWaitForVisibility); + } + return _waitedForOplogVisibility; + } + +private: + // Tracks whether we should wait for oplog visibility at all. + bool _shouldWaitForVisibility = false; + + // Tracks whether we have waited for oplog visibility. + bool _waitedForOplogVisibility = false; +}; +} // namespace mongo
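Reviewer note: a test-style sketch of the state machine defined above (the driver is illustrative; the class and its methods are from this file). Waiting must be enabled before it can be recorded, and it is one-shot:

```cpp
#include <cassert>

#include "mongo/db/query/oplog_wait_config.h"

int main() {
    mongo::OplogWaitConfig config;
    assert(!config.shouldWaitForOplogVisibility());  // disabled by default

    config.enableWaitingForOplogVisibility();
    assert(config.shouldWaitForOplogVisibility());   // armed, not yet waited

    config.setWaitedForOplogVisibility();            // tasserts if not enabled
    assert(config.waitedForOplogVisibility());
    assert(!config.shouldWaitForOplogVisibility());  // never waits twice
}
```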
diff --git a/src/mongo/db/query/plan_executor_impl.cpp b/src/mongo/db/query/plan_executor_impl.cpp index 294ddada65e..eee09c7eeb9 100644 --- a/src/mongo/db/query/plan_executor_impl.cpp +++ b/src/mongo/db/query/plan_executor_impl.cpp @@ -157,9 +157,26 @@ PlanExecutorImpl::PlanExecutorImpl(OperationContext* opCtx, // If this PlanExecutor is executing a COLLSCAN, keep a pointer directly to the COLLSCAN // stage. This is used for change streams in order to keep the latest oplog timestamp - // and post batch resume token up to date as the oplog scan progresses. + // and post batch resume token up to date as the oplog scan progresses. Similarly, this is + // used for oplog scans to coordinate waiting for oplog visibility. if (auto collectionScan = getStageByType(_root.get(), STAGE_COLLSCAN)) { _collScanStage = static_cast<CollectionScan*>(collectionScan); + + if (_nss.isOplog()) { + _oplogWaitConfig = _collScanStage->getOplogWaitConfig(); + tassert(9478713, + "Should have '_oplogWaitConfig' if we are scanning the oplog", + _oplogWaitConfig); + + // Allow waiting for oplog visibility if our yield policy supports auto yielding. + if (_yieldPolicy->canAutoYield() && + _collScanStage->params().shouldWaitForOplogVisibility) { + _oplogWaitConfig->enableWaitingForOplogVisibility(); + _afterSnapshotAbandonFn = [&]() { + _waitForAllEarlierOplogWritesToBeVisible(); + }; + } + } } } @@ -292,6 +309,40 @@ void doYield(OperationContext* opCtx) { } } // namespace +/** + * This function waits for all oplog entries written before the read started to become visible. + * This must be done before initializing a cursor to perform an oplog scan, as that is when we + * establish the endpoint for the cursor. Note that this function can only be called for forward, + * non-tailable scans. + */ +void PlanExecutorImpl::_waitForAllEarlierOplogWritesToBeVisible() { + tassert(9478702, "This function should not be called outside of oplog scans", nss().isOplog()); + tassert(9478703, "This function should not be called outside of oplog scans", _collScanStage); + const auto& params = _collScanStage->params(); + if (!(params.direction == CollectionScanParams::FORWARD && + params.shouldWaitForOplogVisibility)) { + return; + } + + if (_collScanStage->initializedCursor()) { + return; + } + + tassert(9478704, "This function should not be called on tailable cursors", !params.tailable); + + // If we do not have an oplog, we do not wait.
+ LocalOplogInfo* oplogInfo = LocalOplogInfo::get(_opCtx); + if (!oplogInfo) { + return; + } + + RecordStore* oplogRecordStore = oplogInfo->getRecordStore(); + if (!oplogRecordStore) { + return; + } + + oplogRecordStore->waitForAllEarlierOplogWritesToBeVisible(_opCtx); +} + PlanExecutor::ExecState PlanExecutorImpl::getNext(BSONObj* objOut, RecordId* dlOut) { const auto state = getNextDocument(&_docOutput, dlOut); if (objOut && state == ExecState::ADVANCED) { @@ -353,8 +404,10 @@ PlanExecutor::ExecState PlanExecutorImpl::_getNextImpl(Snapshotted<Document>* ob }; if (_yieldPolicy->shouldYieldOrInterrupt(_opCtx)) { - uassertStatusOK(_yieldPolicy->yieldOrInterrupt( - _opCtx, whileYieldingFn, RestoreContext::RestoreType::kYield)); + uassertStatusOK(_yieldPolicy->yieldOrInterrupt(_opCtx, + whileYieldingFn, + RestoreContext::RestoreType::kYield, + _afterSnapshotAbandonFn)); } WorkingSetID id = WorkingSet::INVALID_ID; @@ -466,9 +519,9 @@ void PlanExecutorImpl::_handleNeedYield(size_t& writeConflictsInARow, ExceptionFor<ErrorCodes::TemporarilyUnavailable>( Status(ErrorCodes::TemporarilyUnavailable, "temporarily unavailable")), writeConflictsInARow); - - } else { - // We're yielding because of a WriteConflictException. + } else if (!_oplogWaitConfig || !_oplogWaitConfig->waitedForOplogVisibility()) { + // If we didn't wait for oplog visibility, then we must be yielding because of a + // WriteConflictException. if (!_yieldPolicy->canAutoYield() || MONGO_unlikely(skipWriteConflictRetries.shouldFail())) { throwWriteConflictException(
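Reviewer note: the _handleNeedYield change above turns a two-way branch into a three-way decision. A standalone model of just that classification (stand-in types, not MongoDB source):

```cpp
#include <cassert>

enum class YieldReason { TemporarilyUnavailable, OplogVisibilityWait, WriteConflict };

struct OplogWaitConfigModel {
    bool waited = false;  // mirrors waitedForOplogVisibility()
};

// Mirrors the branch above: a NEED_YIELD that follows a visibility wait must
// not be counted or retried as a WriteConflictException.
YieldReason classify(bool tempUnavailable, const OplogWaitConfigModel* cfg) {
    if (tempUnavailable)
        return YieldReason::TemporarilyUnavailable;
    if (cfg && cfg->waited)
        return YieldReason::OplogVisibilityWait;  // no write-conflict backoff
    return YieldReason::WriteConflict;            // retry with backoff
}

int main() {
    OplogWaitConfigModel cfg{true};
    assert(classify(false, &cfg) == YieldReason::OplogVisibilityWait);
    assert(classify(false, nullptr) == YieldReason::WriteConflict);
}
```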
diff --git a/src/mongo/db/query/plan_executor_impl.h b/src/mongo/db/query/plan_executor_impl.h index b2d79afefbd..336f173914b 100644 --- a/src/mongo/db/query/plan_executor_impl.h +++ b/src/mongo/db/query/plan_executor_impl.h @@ -52,6 +52,7 @@ #include "mongo/db/ops/update_result.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/query/canonical_query.h" +#include "mongo/db/query/oplog_wait_config.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/query/plan_explainer.h" #include "mongo/db/query/plan_yield_policy.h" @@ -227,15 +228,22 @@ private: bool _handleEOFAndExit(PlanStage::StageState code, std::unique_ptr<insert_listener::Notifier>& notifier); - MONGO_COMPILER_ALWAYS_INLINE void _checkIfMustYield(std::function<void()> whileYieldingFn) { + // Function which waits for oplog visibility. It assumes that it is invoked following snapshot + // abandonment, but before yielding any resources. + void _waitForAllEarlierOplogWritesToBeVisible(); + + MONGO_COMPILER_ALWAYS_INLINE void _checkIfMustYield( + const std::function<void()>& whileYieldingFn) { // These are the conditions which can cause us to yield: // 1) The yield policy's timer elapsed, or // 2) some stage requested a yield, or // 3) we need to yield and retry due to a WriteConflictException. // In all cases, the actual yielding happens here. if (_yieldPolicy->shouldYieldOrInterrupt(_opCtx)) { - uassertStatusOK(_yieldPolicy->yieldOrInterrupt( - _opCtx, whileYieldingFn, RestoreContext::RestoreType::kYield)); + uassertStatusOK(_yieldPolicy->yieldOrInterrupt(_opCtx, + whileYieldingFn, + RestoreContext::RestoreType::kYield, + _afterSnapshotAbandonFn)); } } @@ -293,7 +301,15 @@ private: // otherwise. We cache it to avoid the need to traverse the execution tree in runtime when the // executor is requested to return the oplog tracking info. Since this info is provided by // either of these stages, the executor will simply delegate the request to the cached stage. - const CollectionScan* _collScanStage{nullptr}; + CollectionScan* _collScanStage{nullptr}; + + // Used to coordinate waiting for oplog visibility. Note that this is owned by the collection + // scan (if one exists). Initialized only if this executor is doing a collection scan over the + // oplog, nullptr otherwise. + OplogWaitConfig* _oplogWaitConfig{nullptr}; + + // Function used to wait for oplog visibility in between snapshot abandonment and the release + // of locks or transaction resources during a yield. + std::function<void()> _afterSnapshotAbandonFn{nullptr}; }; } // namespace mongo diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp index 71f43f67488..4002a2f0099 100644 --- a/src/mongo/db/query/plan_yield_policy.cpp +++ b/src/mongo/db/query/plan_yield_policy.cpp @@ -118,8 +118,9 @@ void PlanYieldPolicy::resetTimer() { } Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx, - std::function<void()> whileYieldingFn, - RestoreContext::RestoreType restoreType) { + const std::function<void()>& whileYieldingFn, + RestoreContext::RestoreType restoreType, + const std::function<void()>& afterSnapshotAbandonFn) { invariant(opCtx); // After we finish yielding (or in any early return), call resetTimer() to prevent yielding @@ -168,9 +169,12 @@ Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx, // snapshot. invariant(!opCtx->isLockFreeReadsOp()); shard_role_details::getRecoveryUnit(opCtx)->abandonSnapshot(); + if (afterSnapshotAbandonFn) { + afterSnapshotAbandonFn(); + } } else { if (usesCollectionAcquisitions()) { - performYieldWithAcquisitions(opCtx, whileYieldingFn); + performYieldWithAcquisitions(opCtx, whileYieldingFn, afterSnapshotAbandonFn); } else { const Yieldable* yieldablePtr = get<const Yieldable*>(yieldable); tassert(9762900, str::stream() << "no yieldable object available for yield policy " << serializeYieldPolicy(getPolicy()) << " in attempt " << attempt, yieldablePtr); - performYield(opCtx, *yieldablePtr, whileYieldingFn); + performYield(opCtx, *yieldablePtr, whileYieldingFn, afterSnapshotAbandonFn); } } @@ -205,7 +209,8 @@ Status PlanYieldPolicy::yieldOrInterrupt(OperationContext* opCtx, void PlanYieldPolicy::performYield(OperationContext* opCtx, const Yieldable& yieldable, - std::function<void()> whileYieldingFn) { + std::function<void()> whileYieldingFn, + std::function<void()> afterSnapshotAbandonFn) { // Things have to happen here in a specific order: // * Release 'yieldable'. // * Abandon the current storage engine snapshot. @@ -231,6 +236,11 @@ void PlanYieldPolicy::performYield(OperationContext* opCtx, opCtx->checkForInterrupt(); // throws } + // After we've abandoned our snapshot, perform any work before releasing locks. + if (afterSnapshotAbandonFn) { + afterSnapshotAbandonFn(); + } + Locker* locker = shard_role_details::getLocker(opCtx); Locker::LockSnapshot snapshot; locker->saveLockStateAndUnlock(&snapshot); @@ -252,7 +262,8 @@ } void PlanYieldPolicy::performYieldWithAcquisitions(OperationContext* opCtx, - std::function<void()> whileYieldingFn) { + std::function<void()> whileYieldingFn, + std::function<void()> afterSnapshotAbandonFn) { // Things have to happen here in a specific order: // * Abandon the current storage engine snapshot. // * Check for interrupt if the yield policy requires. @@ -271,6 +282,11 @@ opCtx->checkForInterrupt(); // throws } + // After we've abandoned our snapshot, perform any work before yielding transaction resources. + if (afterSnapshotAbandonFn) { + afterSnapshotAbandonFn(); + } + auto yieldedTransactionResources = yieldTransactionResourcesFromOperationContext(opCtx); ScopeGuard yieldFailedScopeGuard( [&] { yieldedTransactionResources.transitionTransactionResourcesToFailedState(opCtx); });
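Reviewer note: the placement of the new hook is the core of this file's change: it runs after the snapshot is abandoned (so the visibility wait is not pinned to a stale snapshot) and before locks or transaction resources are yielded. A runnable sketch of the sequence (the step names are stand-ins; only the hook placement mirrors the patch):

```cpp
#include <functional>
#include <iostream>

void performYieldModel(const std::function<void()>& whileYieldingFn,
                       const std::function<void()>& afterSnapshotAbandonFn) {
    std::cout << "1. release yieldable state\n";
    std::cout << "2. abandon storage engine snapshot\n";
    std::cout << "3. check for interrupt\n";
    if (afterSnapshotAbandonFn)
        afterSnapshotAbandonFn();  // 4. the oplog visibility wait goes here
    std::cout << "5. save lock state and unlock\n";
    if (whileYieldingFn)
        whileYieldingFn();         // 6. caller-supplied yield work
    std::cout << "7. restore locks, open a new snapshot, restore the plan\n";
}

int main() {
    performYieldModel([] { std::cout << "   (while yielding)\n"; },
                      [] { std::cout << "4. waitForAllEarlierOplogWritesToBeVisible\n"; });
}
```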
diff --git a/src/mongo/db/query/plan_yield_policy.h b/src/mongo/db/query/plan_yield_policy.h index 98e238b9375..39066704841 100644 --- a/src/mongo/db/query/plan_yield_policy.h +++ b/src/mongo/db/query/plan_yield_policy.h @@ -240,8 +240,9 @@ public: * been relinquished. */ virtual Status yieldOrInterrupt(OperationContext* opCtx, - std::function<void()> whileYieldingFn, - RestoreContext::RestoreType restoreType); + const std::function<void()>& whileYieldingFn, + RestoreContext::RestoreType restoreType, + const std::function<void()>& afterSnapshotAbandonFn = nullptr); /** * All calls to shouldYieldOrInterrupt() will return true until the next call to @@ -340,9 +341,11 @@ private: */ void performYield(OperationContext* opCtx, const Yieldable& yieldable, - std::function<void()> whileYieldingFn); + std::function<void()> whileYieldingFn, + std::function<void()> afterSnapshotAbandonFn); void performYieldWithAcquisitions(OperationContext* opCtx, - std::function<void()> whileYieldingFn); + std::function<void()> whileYieldingFn, + std::function<void()> afterSnapshotAbandonFn); const YieldPolicy _policy; std::variant<const Yieldable*, YieldThroughAcquisitions> _yieldable; diff --git a/src/mongo/db/query/query_planner_params.cpp b/src/mongo/db/query/query_planner_params.cpp index dec835d9d7e..25f7159e206 100644 --- a/src/mongo/db/query/query_planner_params.cpp +++ b/src/mongo/db/query/query_planner_params.cpp @@ -760,8 +760,9 @@ bool shouldWaitForOplogVisibility(OperationContext* opCtx, // visibility timestamp to be updated, it would wait for a replication batch that would never // complete because it couldn't reacquire its own lock, the global lock held by the waiting // reader. - return repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesForDatabase( - opCtx, DatabaseName::kAdmin); + auto* replCoord = repl::ReplicationCoordinator::get(opCtx); + return replCoord->canAcceptWritesForDatabase(opCtx, DatabaseName::kAdmin) && + replCoord->getSettings().isReplSet(); } } // namespace mongo
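Reviewer note: the query_planner_params.cpp hunk adds one condition to the visibility-wait gate. A standalone model of the gate as shown in this hunk (stand-in types; the real function takes more inputs than the hunk shows):

```cpp
#include <cassert>

struct ReplCoordModel {
    bool canAcceptWrites = false;  // writable primary for the admin database
    bool isReplSet = false;        // node was started as part of a replica set
};

// Visibility waits only make sense on a writable primary that actually runs
// with an oplog; a standalone has no oplog visibility machinery to wait on.
bool shouldWaitForOplogVisibilityModel(const ReplCoordModel& repl) {
    return repl.canAcceptWrites && repl.isReplSet;
}

int main() {
    assert(shouldWaitForOplogVisibilityModel({true, true}));    // primary
    assert(!shouldWaitForOplogVisibilityModel({false, true}));  // secondary
    assert(!shouldWaitForOplogVisibilityModel({true, false}));  // standalone
}
```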
diff --git a/src/mongo/db/repl/storage_timestamp_test.cpp b/src/mongo/db/repl/storage_timestamp_test.cpp index a5f90ce09b2..2a2b2722ba0 100644 --- a/src/mongo/db/repl/storage_timestamp_test.cpp +++ b/src/mongo/db/repl/storage_timestamp_test.cpp @@ -268,8 +268,17 @@ Status createIndexFromSpec(OperationContext* opCtx, */ class OneOffRead { public: - OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { + OneOffRead(OperationContext* opCtx, const Timestamp& ts, bool waitForOplog = false) + : _opCtx(opCtx) { shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot(); + if (waitForOplog) { + LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx); + + // Oplog should be available in this test. + invariant(oplogInfo); + invariant(oplogInfo->getRecordStore()); + oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx); + } if (ts.isNull()) { shard_role_details::getRecoveryUnit(_opCtx)->setTimestampReadSource( RecoveryUnit::ReadSource::kNoTimestamp); @@ -422,7 +431,7 @@ public: } void dumpOplog() { - OneOffRead oor(_opCtx, Timestamp::min()); + OneOffRead oor(_opCtx, Timestamp::min(), true /* waitForOplog */); shard_role_details::getRecoveryUnit(_opCtx)->beginUnitOfWork(_opCtx->readOnly()); LOGV2(8423335, "Dumping oplog collection"); AutoGetCollectionForRead oplogRaii(_opCtx, NamespaceString::kRsOplogNamespace); @@ -569,12 +578,12 @@ public: } BSONObj queryOplog(const BSONObj& query) { - OneOffRead oor(_opCtx, Timestamp::min()); + OneOffRead oor(_opCtx, Timestamp::min(), true /* waitForOplog */); return queryCollection(NamespaceString::kRsOplogNamespace, query); } Timestamp getTopOfOplog() { - OneOffRead oor(_opCtx, Timestamp::min()); + OneOffRead oor(_opCtx, Timestamp::min(), true /* waitForOplog */); BSONObj ret; ASSERT_TRUE(Helpers::getLast(_opCtx, NamespaceString::kRsOplogNamespace, ret)); return ret["ts"].timestamp(); @@ -641,7 +650,7 @@ public: void assertOplogDocumentExistsAtTimestamp(const BSONObj& query, const Timestamp& ts, bool exists) { - OneOffRead oor(_opCtx, ts); + OneOffRead oor(_opCtx, ts, true /* waitForOplog */); BSONObj ret; bool found = Helpers::findOne( _opCtx, diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index 8af486e4765..1b4c743a8e9 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -511,6 +511,14 @@ TEST_F(TenantOplogApplierTest, NoOpsForLargeRetryableApplyOps) { _oplogBuffer.shutdown(_opCtx.get()); applier->join(); + // Before reading from the oplog, wait for oplog visibility. + LocalOplogInfo* oplogInfo = LocalOplogInfo::get(_opCtx.get()); + + // Oplog should be available in this test. + invariant(oplogInfo); + invariant(oplogInfo->getRecordStore()); + oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(_opCtx.get()); + // The session path in TenantOplogApplier bypasses the opObserver, so we can only read // the entries from the oplog. CollectionReader oplogReader(_opCtx.get(), NamespaceString::kRsOplogNamespace); diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp index e39137130e9..f5a67a29609 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp @@ -141,8 +141,17 @@ repl::MutableOplogEntry makeOplog(const NamespaceString& nss, */ class OneOffRead { public: - OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { + OneOffRead(OperationContext* opCtx, const Timestamp& ts, bool waitForOplog = false) + : _opCtx(opCtx) { shard_role_details::getRecoveryUnit(_opCtx)->abandonSnapshot(); + if (waitForOplog) { + LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx); + + // Oplog should be available in this test.
+ invariant(oplogInfo); + invariant(oplogInfo->getRecordStore()); + oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx); + } if (ts.isNull()) { shard_role_details::getRecoveryUnit(_opCtx)->setTimestampReadSource( RecoveryUnit::ReadSource::kNoTimestamp); @@ -269,7 +278,7 @@ public: } BSONObj queryOplog(const BSONObj& query) { - OneOffRead oor(_opCtx, Timestamp::min()); + OneOffRead oor(_opCtx, Timestamp::min(), true /* waitForOplog */); return queryCollection(NamespaceString::kRsOplogNamespace, query); } @@ -286,7 +295,7 @@ public: } int itcount(NamespaceString nss) { - OneOffRead oof(_opCtx, Timestamp::min()); + OneOffRead oof(_opCtx, Timestamp::min(), nss.isOplog() /* waitForOplog */); AutoGetCollectionForRead autoColl(_opCtx, nss); auto cursor = autoColl.getCollection()->getCursor(_opCtx); diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp index 3559a4cd23a..5db10ccb265 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp @@ -539,6 +539,12 @@ protected: } Timestamp getLatestOplogTimestamp(OperationContext* opCtx) { + LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx); + + // Oplog should be available in this test. + invariant(oplogInfo); + invariant(oplogInfo->getRecordStore()); + oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx); DBDirectClient client(opCtx); FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace}; @@ -555,6 +561,12 @@ protected: Timestamp ts) { std::vector result; + LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx); + + // Oplog should be available in this test. + invariant(oplogInfo); + invariant(oplogInfo->getRecordStore()); + oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx); PersistentTaskStore store(NamespaceString::kRsOplogNamespace); store.forEach(opCtx, BSON("ts" << BSON("$gt" << ts)), [&](const auto& oplogEntry) { result.emplace_back( diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 56a17152674..919cfd35e59 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -196,6 +196,14 @@ public: repl::OplogEntry getOplog(OperationContext* opCtx, const repl::OpTime& opTime) { DBDirectClient client(opCtx); + + LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx); + + // Oplog should be available in this test. + invariant(oplogInfo); + invariant(oplogInfo->getRecordStore()); + oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx); + auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace, opTime.asQuery()); ASSERT_FALSE(oplogBSON.isEmpty()); diff --git a/src/mongo/db/transaction/transaction_history_iterator.h b/src/mongo/db/transaction/transaction_history_iterator.h index e4a05ba5fb4..833a7b9c925 100644 --- a/src/mongo/db/transaction/transaction_history_iterator.h +++ b/src/mongo/db/transaction/transaction_history_iterator.h @@ -67,8 +67,9 @@ class TransactionHistoryIterator : public TransactionHistoryIteratorBase { public: /** * Creates a new iterator starting with an oplog entry with the given start opTime. + * TODO SERVER-104970: If permitYield can't be deleted, change the default to 'false'. */ - TransactionHistoryIterator(repl::OpTime startingOpTime, bool permitYield = false); + TransactionHistoryIterator(repl::OpTime startingOpTime, bool permitYield = true); ~TransactionHistoryIterator() override = default; bool hasNext() const override; @@ -84,6 +85,7 @@ private: // Clients can set this to allow PlanExecutors created by this TransactionHistoryIterator to // have a YIELD_AUTO yield policy. It is only safe to set this if next() will never be called // while holding a lock that should not be yielded. + // TODO SERVER-104970: Determine whether this can be removed. bool _permitYield; repl::OpTime _nextOpTime;
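Reviewer note: flipping the permitYield default silently opts existing call sites into auto-yielding. An illustrative call site (the function is hypothetical; the iterator API is from this patch) showing when to opt back out:

```cpp
#include "mongo/db/transaction/transaction_history_iterator.h"

// Walk a transaction's oplog chain. Pass 'false' explicitly if next() may run
// while holding a lock that must not be yielded (see the member comment above).
void walkTransactionChain(mongo::OperationContext* opCtx, mongo::repl::OpTime start) {
    mongo::TransactionHistoryIterator iter(start);  // permitYield now defaults to true
    while (iter.hasNext()) {
        auto entry = iter.next(opCtx);  // may yield and wait between entries
        // ... inspect 'entry' ...
    }
}
```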
diff --git a/src/mongo/db/transaction/transaction_history_iterator_test.cpp b/src/mongo/db/transaction/transaction_history_iterator_test.cpp index 15e85bf8f7b..1d432636096 100644 --- a/src/mongo/db/transaction/transaction_history_iterator_test.cpp +++ b/src/mongo/db/transaction/transaction_history_iterator_test.cpp @@ -118,7 +118,7 @@ TEST_F(SessionHistoryIteratorTest, NormalHistory) { repl::OpTime(Timestamp(67, 54801), 2)); // optime of previous write in transaction insertOplogEntry(entry4); - TransactionHistoryIterator iter(repl::OpTime(Timestamp(97, 2472), 2)); + TransactionHistoryIterator iter(repl::OpTime(Timestamp(97, 2472), 2), true /* permitYield */); { ASSERT_TRUE(iter.hasNext()); @@ -163,7 +163,7 @@ TEST_F(SessionHistoryIteratorTest, NextShouldAssertIfHistoryIsTruncated) { insertOplogEntry(entry); repl::OpTime opTime(Timestamp(67, 54801), 2); - TransactionHistoryIterator iter(opTime); + TransactionHistoryIterator iter(opTime, true /* permitYield */); ASSERT_TRUE(iter.hasNext()); auto nextEntry = iter.next(opCtx()); @@ -181,7 +181,7 @@ TEST_F(SessionHistoryIteratorTest, OplogInWriteHistoryChainWithMissingPrevTSShou boost::none); // optime of previous write in transaction insertOplogEntry(entry); - TransactionHistoryIterator iter(repl::OpTime(Timestamp(67, 54801), 2)); + TransactionHistoryIterator iter(repl::OpTime(Timestamp(67, 54801), 2), true /* permitYield */); ASSERT_TRUE(iter.hasNext()); ASSERT_THROWS_CODE(iter.next(opCtx()), AssertionException, ErrorCodes::FailedToParse); } diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index cda51a6b0e6..a76b2ed5348 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -352,7 +352,8 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx, // Restore the current timestamp read source after fetching transaction history, which may // change our ReadSource. - ReadSourceScope readSourceScope(opCtx, RecoveryUnit::ReadSource::kNoTimestamp); + ReadSourceScope readSourceScope( + opCtx, RecoveryUnit::ReadSource::kNoTimestamp, boost::none, true /* waitForOplog */); auto originalReadConcern = std::exchange(repl::ReadConcernArgs::get(opCtx), repl::ReadConcernArgs()); ON_BLOCK_EXIT([&] { repl::ReadConcernArgs::get(opCtx) = std::move(originalReadConcern); }); @@ -3621,6 +3622,13 @@ boost::optional<repl::OplogEntry> TransactionParticipant::Participant::checkStat // Use a SideTransactionBlock since it is illegal to scan the oplog while in a write unit of // work. TransactionParticipant::SideTransactionBlock sideTxn(opCtx); + + // Before opening the storage snapshot (and before scanning the oplog), wait for all + // earlier oplog writes to be visible. This is necessary because the transaction history + // iterator will not be able to abandon the storage snapshot and wait.
+ auto storageInterface = repl::StorageInterface::get(opCtx); + storageInterface->waitForAllEarlierOplogWritesToBeVisible(opCtx); + TransactionHistoryIterator txnIter(*stmtOpTime); while (txnIter.hasNext()) { const auto entry = txnIter.next(opCtx); diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index 3acb09a0d98..afc979b3c14 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -285,8 +285,14 @@ protected: std::vector<BSONObj> ops; { DBDirectClient db(&_opCtx); - auto cursor = db.find( - FindCommandRequest{NamespaceString::createNamespaceString_forTest(cllNS())}); + LocalOplogInfo* oplogInfo = LocalOplogInfo::get(&_opCtx); + + // Oplog should be available in this test. + invariant(oplogInfo); + invariant(oplogInfo->getRecordStore()); + oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(&_opCtx); + auto cursor = db.find(FindCommandRequest{NamespaceString::createNamespaceString_forTest( + cllNS())}); // Read all ops from the oplog. while (cursor->more()) { ops.push_back(cursor->nextSafe()); }
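Reviewer note: several test files in this patch repeat the same four-line visibility wait verbatim. A shared helper along these lines (hypothetical, not part of the patch; the calls it wraps are taken directly from the hunks above) would collapse the duplication:

```cpp
#include "mongo/db/catalog/local_oplog_info.h"

namespace mongo {

// Test-only helper: block until every oplog write that happened before this
// call is visible to forward oplog scans.
inline void waitForOplogVisibilityForTest(OperationContext* opCtx) {
    LocalOplogInfo* oplogInfo = LocalOplogInfo::get(opCtx);
    invariant(oplogInfo);                    // these tests always have an oplog
    invariant(oplogInfo->getRecordStore());
    oplogInfo->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
}

}  // namespace mongo
```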