From c08ed5087cf0db8ea549e8c5def4d99fab294be9 Mon Sep 17 00:00:00 2001 From: Wenqin Date: Wed, 10 Dec 2025 10:54:14 -0500 Subject: [PATCH] SERVER-114477 Add support for reclaiming a prepared transaction (#44741) GitOrigin-RevId: cddce3dde634914a5263383adf5f22dc6096fed3 --- src/mongo/db/storage/recovery_unit.h | 10 ++ .../wiredtiger_begin_transaction_block.cpp | 21 +++- .../wiredtiger_begin_transaction_block.h | 3 +- .../wiredtiger_compiled_configuration.cpp | 4 + .../wiredtiger_compiled_configuration.h | 2 + .../storage/wiredtiger/wiredtiger_cursor.cpp | 17 +--- .../db/storage/wiredtiger/wiredtiger_cursor.h | 1 + .../wiredtiger/wiredtiger_kv_engine_test.cpp | 96 +++++++++++++++++-- .../wiredtiger/wiredtiger_recovery_unit.cpp | 24 ++++- .../wiredtiger/wiredtiger_recovery_unit.h | 2 + 10 files changed, 155 insertions(+), 25 deletions(-) diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index 0dce17cf638..a843c011b7f 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -253,6 +253,16 @@ public: return PrepareConflictBehavior::kEnforce; } + /** + * Starts a storage engine transaction that takes over and continues a prepared transaction + * identified by setPreparedId(). Must be called after setPrepareTimestamp() and + * setPreparedId(). + */ + virtual void reclaimPreparedTransactionForRecovery() { + uasserted(ErrorCodes::CommandNotSupported, + "This storage engine does not support reclaiming prepared transactions"); + } + /** * If there is an open transaction, it is closed. If the current AbandonSnapshotMode is * 'kAbort', the transaction is aborted. If the mode is 'kCommit' the transaction is committed, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_begin_transaction_block.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_begin_transaction_block.cpp index 013f2af5ab3..d89111d103d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_begin_transaction_block.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_begin_transaction_block.cpp @@ -103,7 +103,8 @@ WiredTigerBeginTxnBlock::WiredTigerBeginTxnBlock( PrepareConflictBehavior prepareConflictBehavior, bool roundUpPreparedTimestamps, RoundUpReadTimestamp roundUpReadTimestamp, - RecoveryUnit::UntimestampedWriteAssertionLevel allowUntimestampedWrite) + RecoveryUnit::UntimestampedWriteAssertionLevel allowUntimestampedWrite, + boost::optional claimPreparedId) : _session(session) { invariant(!_rollback); @@ -122,7 +123,23 @@ WiredTigerBeginTxnBlock::WiredTigerBeginTxnBlock( if (config > 0) { compiled_config = compiledBeginTransactions[config - 1].getConfig(_session); } - invariantWTOK(_session->begin_transaction(compiled_config), *_session); + + if (claimPreparedId) { + // Slow path used on startup recovery to take ownership of a prepared transaction as part of + // the transaction. + std::stringstream rawConfig; + if (config > 0) { + // We need to get the raw config because the compiled config is not a human readable + // string that allows us to concatenate the claim_prepared_id. + rawConfig << compiledBeginTransactions[config - 1].getRawConfig() << ","; + } + rawConfig << fmt::format("claim_prepared_id={}", unsignedHex(*claimPreparedId)); + std::string rawConfigStr = rawConfig.str(); + invariantWTOK(_session->begin_transaction(rawConfigStr.c_str()), *_session); + } else { + invariantWTOK(_session->begin_transaction(compiled_config), *_session); + } + _rollback = true; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_begin_transaction_block.h b/src/mongo/db/storage/wiredtiger/wiredtiger_begin_transaction_block.h index 60bd1aea0c0..90f0543d52c 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_begin_transaction_block.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_begin_transaction_block.h @@ -66,7 +66,8 @@ public: PrepareConflictBehavior prepareConflictBehavior, bool roundUpPreparedTimestamps, RoundUpReadTimestamp roundUpReadTimestamp, - RecoveryUnit::UntimestampedWriteAssertionLevel allowUntimestampedWrite); + RecoveryUnit::UntimestampedWriteAssertionLevel allowUntimestampedWrite, + boost::optional claimPreparedId = boost::none); WiredTigerBeginTxnBlock(WiredTigerSession* session, const char* config); ~WiredTigerBeginTxnBlock(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_compiled_configuration.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_compiled_configuration.cpp index 898e32510a3..d8e8f03fef2 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_compiled_configuration.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_compiled_configuration.cpp @@ -45,6 +45,10 @@ const char* CompiledConfiguration::getConfig(WiredTigerSession* session) const { return (compiled->get(_compileToken)); } +const std::string& CompiledConfiguration::getRawConfig() const { + return _config; +} + // ----------------------- diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_compiled_configuration.h b/src/mongo/db/storage/wiredtiger/wiredtiger_compiled_configuration.h index 2146c4be12f..018d0c7fb9a 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_compiled_configuration.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_compiled_configuration.h @@ -48,6 +48,8 @@ public: const char* getConfig(WiredTigerSession* session) const; + const std::string& getRawConfig() const; + private: std::string _apiName; std::string _config; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp index 1ff1827a3d0..c6f05a1919d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.cpp @@ -124,22 +124,11 @@ WiredTigerBulkLoadCursor::WiredTigerBulkLoadCursor(OperationContext* opCtx, invariantWTOK(_session->open_cursor(indexUri.c_str(), nullptr, nullptr, &_cursor), *_session); } -WiredTigerPrepareCursor::WiredTigerPrepareCursor(WiredTigerSession& session) { - int ret = session.open_cursor("prepared_discover:", nullptr, nullptr, &_cursor); - - if (ret == 0) { - return; // Success - } - - auto status = wtRCToStatus(ret, session); - uassertStatusOK(status); +WiredTigerPrepareCursor::WiredTigerPrepareCursor(WiredTigerSession& session) : _session(session) { + _cursor = session.getNewCursor("prepared_discover:", nullptr); } WiredTigerPrepareCursor::~WiredTigerPrepareCursor() { - // TODO: SERVER-114477 uassertStatusOK on the result of _cursor->close() once the code to - // reclaim unclaimed prepared transactions is added. Without the ability to reclaim an unclaimed - // prepared transaction, _cursor->close() fails with an error about unclaimed prepared - // transactions, which prevents tests using this cursor from running successfully. - _cursor->close(_cursor); + _session.closeCursor(_cursor); } } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h index 0a8f45376b9..5fb16ba66b4 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_cursor.h @@ -144,5 +144,6 @@ public: private: WT_CURSOR* _cursor = nullptr; // Owned + WiredTigerSession& _session; }; } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp index cd3bf9ba92a..fd177c9653b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine_test.cpp @@ -127,9 +127,12 @@ private: WiredTigerKVEngineBase::WiredTigerConfig wtConfig = getWiredTigerConfigFromStartupOptions(provider); wtConfig.cacheSizeMB = 1; - wtConfig.extraOpenOptions = "log=(file_max=1m,prealloc=false)"; if (_preciseCheckpoints) { - wtConfig.extraOpenOptions += ",precise_checkpoint=true,preserve_prepared=true,"; + // Precise checkpoints don't work with journaling in tests. + wtConfig.extraOpenOptions = "precise_checkpoint=true,preserve_prepared=true,"; + wtConfig.logEnabled = false; + } else { + wtConfig.extraOpenOptions = "log=(file_max=1m,prealloc=false)"; } // Faithfully simulate being in replica set mode for timestamping tests which requires // parity for journaling settings. @@ -1131,13 +1134,23 @@ TEST_F(WiredTigerKVEngineTestWithPreciseCheckpoints, engine = _helper.getWiredTigerKVEngine(); auto opCtxPtr2 = _makeOperationContext(); - // Verify that we see the prepared transaction on startup recovery. + // Verify that we see the prepared transaction on startup recovery and reclaim it. int count = 0; auto iterator = engine->getUnclaimedPreparedTransactionsForStartupRecovery(opCtxPtr2.get()); + auto& ru2 = *checked_cast( + shard_role_details::getRecoveryUnit(opCtxPtr2.get())); + while (auto recoveredPreparedId = iterator->next()) { ASSERT_EQ(*recoveredPreparedId, preparedId); + ru2.beginUnitOfWork(false); + ru2.setPrepareTimestamp(prepareTimestamp); + ru2.setPreparedId(*recoveredPreparedId); + ru2.reclaimPreparedTransactionForRecovery(); + + ru2.setDurableTimestamp(Timestamp(3, 0)); + ru2.setCommitTimestamp(prepareTimestamp); + ru2.commitUnitOfWork(); count++; - // TODO: SERVER-114477 Add code to reclaim the prepared transaction and commit it. } ASSERT_EQ(count, 1); } @@ -1198,7 +1211,23 @@ TEST_F(WiredTigerKVEngineTestWithPreciseCheckpoints, ASSERT_TRUE(secondId == preparedId1 || secondId == preparedId2); ASSERT_NE(firstId, secondId); ASSERT_TRUE(!iterator->next()); - // TODO: SERVER-114477 Add code to reclaim the prepared transactions and commit them. + + // Reclaim the prepared transactions and abort/commit them. + auto& ru3 = + *checked_cast(shard_role_details::getRecoveryUnit(opCtxPtr.get())); + ru3.beginUnitOfWork(false); + ru3.setPrepareTimestamp(prepareTimestamp1); + ru3.setPreparedId(firstId); + ru3.reclaimPreparedTransactionForRecovery(); + ru3.abortUnitOfWork(); + + ru3.beginUnitOfWork(false); + ru3.setPrepareTimestamp(prepareTimestamp2); + ru3.setPreparedId(secondId); + ru3.reclaimPreparedTransactionForRecovery(); + ru3.setDurableTimestamp(Timestamp(8, 0)); + ru3.setCommitTimestamp(prepareTimestamp2); + ru3.commitUnitOfWork(); } TEST_F(WiredTigerKVEngineTestWithPreciseCheckpoints, @@ -1267,17 +1296,70 @@ TEST_F(WiredTigerKVEngineTestWithPreciseCheckpoints, _helper.restartEngine(); engine = _helper.getWiredTigerKVEngine(); auto opCtxPtr2 = _makeOperationContext(); + auto& ru2 = *checked_cast( + shard_role_details::getRecoveryUnit(opCtxPtr2.get())); - // Verify that we see the prepared transaction on startup recovery. + // Verify that we see the prepared transaction on startup recovery and reclaim it. int count = 0; auto iterator = engine->getUnclaimedPreparedTransactionsForStartupRecovery(opCtxPtr2.get()); while (auto recoveredPreparedId = iterator->next()) { ASSERT_EQ(*recoveredPreparedId, preparedId); + ru2.beginUnitOfWork(false); + // Not strictly necessary to begin a transaction, but used to verify that starting a + // transaction can handle extra configuration options when claim_prepared_id is set. + ru2.setPrepareConflictBehavior(PrepareConflictBehavior::kIgnoreConflicts); + ru2.setPrepareTimestamp(prepareTimestamp); + ru2.setPreparedId(*recoveredPreparedId); + ru2.reclaimPreparedTransactionForRecovery(); + + ru2.abortUnitOfWork(); count++; - // TODO: SERVER-114477 Add code to reclaim the prepared transaction and commit it. } ASSERT_EQ(count, 1); } +using WiredTigerKVEngineTestWithPreciseCheckpointsDeathTest = + WiredTigerKVEngineTestWithPreciseCheckpoints; +DEATH_TEST_F(WiredTigerKVEngineTestWithPreciseCheckpointsDeathTest, + UnresolvedPreparedTransactionsMustBeClaimed, + "Found 1 unclaimed prepared transactions") { + // Create an unresolved prepared transaction. + auto opCtxPtr = _makeOperationContext(); + auto& ru = *shard_role_details::getRecoveryUnit(opCtxPtr.get()); + const auto prepareTimestamp = Timestamp(2, 0); + const auto preparedId = prepareTimestamp.asULL(); + createPreparedTransaction(opCtxPtr.get(), ru, prepareTimestamp, preparedId); + ASSERT_EQ(ru.getPrepareTimestamp(), prepareTimestamp); + ASSERT_EQ(ru.getPreparedId().value(), preparedId); + + // Create a checkpoint that includes the prepared transaction. + auto* engine = _helper.getWiredTigerKVEngine(); + engine->setInitialDataTimestamp(Timestamp(1, 0)); + engine->setStableTimestamp(prepareTimestamp, /*force=*/false); + engine->checkpoint(); + + // This is necessary to satisfy the destructor of the recovery unit which expects to not be + // in a unit of work when the storage engine is restarted. This does not affect the results of + // the prepared transaction iterator since the transaction rollback is not in the + // checkpoint. + ru.abortUnitOfWork(); + + // Release the opCtx to prevent memory issues when the storage engine is restarted. + opCtxPtr.reset(); + + // Simulate startup recovery by restarting the storage engine. + _helper.restartEngine(); + engine = _helper.getWiredTigerKVEngine(); + auto opCtxPtr2 = _makeOperationContext(); + + // Verify that we see the prepared transaction on startup recovery. + auto iterator = engine->getUnclaimedPreparedTransactionsForStartupRecovery(opCtxPtr2.get()); + auto recoveredPreparedId = iterator->next(); + ASSERT_EQ(*recoveredPreparedId, preparedId); + ASSERT_TRUE(!iterator->next()); + + // Purposely don't reclaim the transaction to verify that destroying the iterator without + // claiming the prepared transaction results in a crash. +} } // namespace } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index e6151992d03..ebdfdac7c0f 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -552,6 +552,19 @@ void WiredTigerRecoveryUnit::_txnOpen() { _timer.reset(new Timer()); } + // A non-empty _preparedId at transaction start means the caller intends to reclaim the + // corresponding prepared transaction during startup recovery. In this case, a read source + // must not be set, since it is not applicable. + if (_preparedId) { + tassert(11447700, + "Read source should not be specified when recovering a prepared transaction.", + _timestampReadSource == ReadSource::kNoTimestamp); + tassert( + 11447701, + "Rounding up prepared timestamps is incompatible with claiming a prepared transaction", + !_optionsUsedToOpenSnapshot.roundUpPreparedTimestamps); + } + switch (_timestampReadSource) { case ReadSource::kNoTimestamp: { if (_oplogManager) { @@ -561,7 +574,8 @@ void WiredTigerRecoveryUnit::_txnOpen() { _prepareConflictBehavior, _optionsUsedToOpenSnapshot.roundUpPreparedTimestamps, RoundUpReadTimestamp::kNoRoundError, - _untimestampedWriteAssertionLevel) + _untimestampedWriteAssertionLevel, + _preparedId) .done(); break; } @@ -950,6 +964,14 @@ PrepareConflictBehavior WiredTigerRecoveryUnit::getPrepareConflictBehavior() con return _prepareConflictBehavior; } +void WiredTigerRecoveryUnit::reclaimPreparedTransactionForRecovery() { + invariant(_preparedId.has_value()); + + // Start a WiredTiger transaction that takes over and continues the prepared transaction + // identified by _preparedId. + preallocateSnapshot(); +} + void WiredTigerRecoveryUnit::setTimestampReadSource(ReadSource readSource, boost::optional provided) { tassert(5863604, "Cannot change ReadSource as it is pinned.", !isReadSourcePinned()); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index f72a2ce6d20..80610383a6b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -129,6 +129,8 @@ public: PrepareConflictBehavior getPrepareConflictBehavior() const override; + void reclaimPreparedTransactionForRecovery() override; + /** * Set pre-fetching capabilities for this session. This allows pre-loading of a set of pages * into the cache and is an optional optimization.