SERVER-114477 Add support for reclaiming a prepared transaction (#44741)

GitOrigin-RevId: cddce3dde634914a5263383adf5f22dc6096fed3
This commit is contained in:
Wenqin 2025-12-10 10:54:14 -05:00 committed by MongoDB Bot
parent 9423860eb3
commit c08ed5087c
10 changed files with 155 additions and 25 deletions

View File

@ -253,6 +253,16 @@ public:
return PrepareConflictBehavior::kEnforce;
}
/**
* Starts a storage engine transaction that takes over and continues a prepared transaction
* identified by setPreparedId(). Must be called after setPrepareTimestamp() and
* setPreparedId().
*
* The default implementation raises a CommandNotSupported user assertion. Storage engines that
* are able to reclaim prepared transactions are expected to override this method.
*/
virtual void reclaimPreparedTransactionForRecovery() {
uasserted(ErrorCodes::CommandNotSupported,
"This storage engine does not support reclaiming prepared transactions");
}
/**
* If there is an open transaction, it is closed. If the current AbandonSnapshotMode is
* 'kAbort', the transaction is aborted. If the mode is 'kCommit' the transaction is committed,

View File

@ -103,7 +103,8 @@ WiredTigerBeginTxnBlock::WiredTigerBeginTxnBlock(
PrepareConflictBehavior prepareConflictBehavior,
bool roundUpPreparedTimestamps,
RoundUpReadTimestamp roundUpReadTimestamp,
RecoveryUnit::UntimestampedWriteAssertionLevel allowUntimestampedWrite)
RecoveryUnit::UntimestampedWriteAssertionLevel allowUntimestampedWrite,
boost::optional<uint64_t> claimPreparedId)
: _session(session) {
invariant(!_rollback);
@ -122,7 +123,23 @@ WiredTigerBeginTxnBlock::WiredTigerBeginTxnBlock(
if (config > 0) {
compiled_config = compiledBeginTransactions[config - 1].getConfig(_session);
}
if (claimPreparedId) {
// Slow path used on startup recovery: begin a transaction that takes ownership of the
// prepared transaction identified by claimPreparedId.
std::stringstream rawConfig;
if (config > 0) {
// We need to get the raw config because the compiled config is not a human readable
// string that allows us to concatenate the claim_prepared_id.
rawConfig << compiledBeginTransactions[config - 1].getRawConfig() << ",";
}
rawConfig << fmt::format("claim_prepared_id={}", unsignedHex(*claimPreparedId));
std::string rawConfigStr = rawConfig.str();
invariantWTOK(_session->begin_transaction(rawConfigStr.c_str()), *_session);
} else {
invariantWTOK(_session->begin_transaction(compiled_config), *_session);
}
_rollback = true;
}

View File

@ -66,7 +66,8 @@ public:
PrepareConflictBehavior prepareConflictBehavior,
bool roundUpPreparedTimestamps,
RoundUpReadTimestamp roundUpReadTimestamp,
RecoveryUnit::UntimestampedWriteAssertionLevel allowUntimestampedWrite);
RecoveryUnit::UntimestampedWriteAssertionLevel allowUntimestampedWrite,
boost::optional<uint64_t> claimPreparedId = boost::none);
WiredTigerBeginTxnBlock(WiredTigerSession* session, const char* config);
~WiredTigerBeginTxnBlock();

View File

@ -45,6 +45,10 @@ const char* CompiledConfiguration::getConfig(WiredTigerSession* session) const {
return (compiled->get(_compileToken));
}
// Returns the stored _config string by const reference. Unlike getConfig(), which looks up the
// compiled form via _compileToken, this is the plain configuration text.
const std::string& CompiledConfiguration::getRawConfig() const {
return _config;
}
// -----------------------

View File

@ -48,6 +48,8 @@ public:
const char* getConfig(WiredTigerSession* session) const;
const std::string& getRawConfig() const;
private:
std::string _apiName;
std::string _config;

View File

@ -124,22 +124,11 @@ WiredTigerBulkLoadCursor::WiredTigerBulkLoadCursor(OperationContext* opCtx,
invariantWTOK(_session->open_cursor(indexUri.c_str(), nullptr, nullptr, &_cursor), *_session);
}
WiredTigerPrepareCursor::WiredTigerPrepareCursor(WiredTigerSession& session) {
int ret = session.open_cursor("prepared_discover:", nullptr, nullptr, &_cursor);
if (ret == 0) {
return; // Success
}
auto status = wtRCToStatus(ret, session);
uassertStatusOK(status);
WiredTigerPrepareCursor::WiredTigerPrepareCursor(WiredTigerSession& session) : _session(session) {
_cursor = session.getNewCursor("prepared_discover:", nullptr);
}
WiredTigerPrepareCursor::~WiredTigerPrepareCursor() {
// TODO: SERVER-114477 uassertStatusOK on the result of _cursor->close() once the code to
// reclaim unclaimed prepared transactions is added. Without the ability to reclaim an unclaimed
// prepared transaction, _cursor->close() fails with an error about unclaimed prepared
// transactions, which prevents tests using this cursor from running successfully.
_cursor->close(_cursor);
_session.closeCursor(_cursor);
}
} // namespace mongo

View File

@ -144,5 +144,6 @@ public:
private:
WT_CURSOR* _cursor = nullptr; // Owned
WiredTigerSession& _session;
};
} // namespace mongo

View File

@ -127,9 +127,12 @@ private:
WiredTigerKVEngineBase::WiredTigerConfig wtConfig =
getWiredTigerConfigFromStartupOptions(provider);
wtConfig.cacheSizeMB = 1;
wtConfig.extraOpenOptions = "log=(file_max=1m,prealloc=false)";
if (_preciseCheckpoints) {
wtConfig.extraOpenOptions += ",precise_checkpoint=true,preserve_prepared=true,";
// Precise checkpoints don't work with journaling in tests.
wtConfig.extraOpenOptions = "precise_checkpoint=true,preserve_prepared=true,";
wtConfig.logEnabled = false;
} else {
wtConfig.extraOpenOptions = "log=(file_max=1m,prealloc=false)";
}
// Faithfully simulate being in replica set mode for timestamping tests which requires
// parity for journaling settings.
@ -1131,13 +1134,23 @@ TEST_F(WiredTigerKVEngineTestWithPreciseCheckpoints,
engine = _helper.getWiredTigerKVEngine();
auto opCtxPtr2 = _makeOperationContext();
// Verify that we see the prepared transaction on startup recovery.
// Verify that we see the prepared transaction on startup recovery and reclaim it.
int count = 0;
auto iterator = engine->getUnclaimedPreparedTransactionsForStartupRecovery(opCtxPtr2.get());
auto& ru2 = *checked_cast<WiredTigerRecoveryUnit*>(
shard_role_details::getRecoveryUnit(opCtxPtr2.get()));
while (auto recoveredPreparedId = iterator->next()) {
ASSERT_EQ(*recoveredPreparedId, preparedId);
ru2.beginUnitOfWork(false);
ru2.setPrepareTimestamp(prepareTimestamp);
ru2.setPreparedId(*recoveredPreparedId);
ru2.reclaimPreparedTransactionForRecovery();
ru2.setDurableTimestamp(Timestamp(3, 0));
ru2.setCommitTimestamp(prepareTimestamp);
ru2.commitUnitOfWork();
count++;
// TODO: SERVER-114477 Add code to reclaim the prepared transaction and commit it.
}
ASSERT_EQ(count, 1);
}
@ -1198,7 +1211,23 @@ TEST_F(WiredTigerKVEngineTestWithPreciseCheckpoints,
ASSERT_TRUE(secondId == preparedId1 || secondId == preparedId2);
ASSERT_NE(firstId, secondId);
ASSERT_TRUE(!iterator->next());
// TODO: SERVER-114477 Add code to reclaim the prepared transactions and commit them.
// Reclaim the prepared transactions and abort/commit them.
auto& ru3 =
*checked_cast<WiredTigerRecoveryUnit*>(shard_role_details::getRecoveryUnit(opCtxPtr.get()));
ru3.beginUnitOfWork(false);
ru3.setPrepareTimestamp(prepareTimestamp1);
ru3.setPreparedId(firstId);
ru3.reclaimPreparedTransactionForRecovery();
ru3.abortUnitOfWork();
ru3.beginUnitOfWork(false);
ru3.setPrepareTimestamp(prepareTimestamp2);
ru3.setPreparedId(secondId);
ru3.reclaimPreparedTransactionForRecovery();
ru3.setDurableTimestamp(Timestamp(8, 0));
ru3.setCommitTimestamp(prepareTimestamp2);
ru3.commitUnitOfWork();
}
TEST_F(WiredTigerKVEngineTestWithPreciseCheckpoints,
@ -1267,17 +1296,70 @@ TEST_F(WiredTigerKVEngineTestWithPreciseCheckpoints,
_helper.restartEngine();
engine = _helper.getWiredTigerKVEngine();
auto opCtxPtr2 = _makeOperationContext();
auto& ru2 = *checked_cast<WiredTigerRecoveryUnit*>(
shard_role_details::getRecoveryUnit(opCtxPtr2.get()));
// Verify that we see the prepared transaction on startup recovery.
// Verify that we see the prepared transaction on startup recovery and reclaim it.
int count = 0;
auto iterator = engine->getUnclaimedPreparedTransactionsForStartupRecovery(opCtxPtr2.get());
while (auto recoveredPreparedId = iterator->next()) {
ASSERT_EQ(*recoveredPreparedId, preparedId);
ru2.beginUnitOfWork(false);
// Not strictly necessary to begin a transaction, but used to verify that starting a
// transaction can handle extra configuration options when claim_prepared_id is set.
ru2.setPrepareConflictBehavior(PrepareConflictBehavior::kIgnoreConflicts);
ru2.setPrepareTimestamp(prepareTimestamp);
ru2.setPreparedId(*recoveredPreparedId);
ru2.reclaimPreparedTransactionForRecovery();
ru2.abortUnitOfWork();
count++;
// TODO: SERVER-114477 Add code to reclaim the prepared transaction and commit it.
}
ASSERT_EQ(count, 1);
}
using WiredTigerKVEngineTestWithPreciseCheckpointsDeathTest =
WiredTigerKVEngineTestWithPreciseCheckpoints;
// Death test: a prepared transaction that is captured in a checkpoint and then discovered by the
// startup-recovery iterator after an engine restart must be reclaimed. Here the test body ends
// without reclaiming it, and the process is expected to die with the message
// "Found 1 unclaimed prepared transactions".
DEATH_TEST_F(WiredTigerKVEngineTestWithPreciseCheckpointsDeathTest,
UnresolvedPreparedTransactionsMustBeClaimed,
"Found 1 unclaimed prepared transactions") {
// Create an unresolved prepared transaction.
auto opCtxPtr = _makeOperationContext();
auto& ru = *shard_role_details::getRecoveryUnit(opCtxPtr.get());
const auto prepareTimestamp = Timestamp(2, 0);
// The prepared id is derived from the prepare timestamp for this test.
const auto preparedId = prepareTimestamp.asULL();
createPreparedTransaction(opCtxPtr.get(), ru, prepareTimestamp, preparedId);
ASSERT_EQ(ru.getPrepareTimestamp(), prepareTimestamp);
ASSERT_EQ(ru.getPreparedId().value(), preparedId);
// Create a checkpoint that includes the prepared transaction.
auto* engine = _helper.getWiredTigerKVEngine();
engine->setInitialDataTimestamp(Timestamp(1, 0));
engine->setStableTimestamp(prepareTimestamp, /*force=*/false);
engine->checkpoint();
// This is necessary to satisfy the destructor of the recovery unit which expects to not be
// in a unit of work when the storage engine is restarted. This does not affect the results of
// the prepared transaction iterator since the transaction rollback is not in the
// checkpoint.
ru.abortUnitOfWork();
// Release the opCtx to prevent memory issues when the storage engine is restarted.
opCtxPtr.reset();
// Simulate startup recovery by restarting the storage engine.
_helper.restartEngine();
engine = _helper.getWiredTigerKVEngine();
auto opCtxPtr2 = _makeOperationContext();
// Verify that we see the prepared transaction on startup recovery.
auto iterator = engine->getUnclaimedPreparedTransactionsForStartupRecovery(opCtxPtr2.get());
auto recoveredPreparedId = iterator->next();
ASSERT_EQ(*recoveredPreparedId, preparedId);
// Exactly one prepared transaction should have been recovered.
ASSERT_TRUE(!iterator->next());
// Purposely don't reclaim the transaction to verify that destroying the iterator without
// claiming the prepared transaction results in a crash.
}
} // namespace
} // namespace mongo

View File

@ -552,6 +552,19 @@ void WiredTigerRecoveryUnit::_txnOpen() {
_timer.reset(new Timer());
}
// A non-empty _preparedId at transaction start means the caller intends to reclaim the
// corresponding prepared transaction during startup recovery. In this case, a read source
// must not be set, since it is not applicable, and rounding up prepared timestamps must be
// disabled.
if (_preparedId) {
tassert(11447700,
"Read source should not be specified when recovering a prepared transaction.",
_timestampReadSource == ReadSource::kNoTimestamp);
tassert(
11447701,
"Rounding up prepared timestamps is incompatible with claiming a prepared transaction",
!_optionsUsedToOpenSnapshot.roundUpPreparedTimestamps);
}
switch (_timestampReadSource) {
case ReadSource::kNoTimestamp: {
if (_oplogManager) {
@ -561,7 +574,8 @@ void WiredTigerRecoveryUnit::_txnOpen() {
_prepareConflictBehavior,
_optionsUsedToOpenSnapshot.roundUpPreparedTimestamps,
RoundUpReadTimestamp::kNoRoundError,
_untimestampedWriteAssertionLevel)
_untimestampedWriteAssertionLevel,
_preparedId)
.done();
break;
}
@ -950,6 +964,14 @@ PrepareConflictBehavior WiredTigerRecoveryUnit::getPrepareConflictBehavior() con
return _prepareConflictBehavior;
}
// Reclaims the prepared transaction identified by _preparedId for startup recovery.
// _preparedId must already be set; this is enforced by the invariant below.
void WiredTigerRecoveryUnit::reclaimPreparedTransactionForRecovery() {
invariant(_preparedId.has_value());
// Start a WiredTiger transaction that takes over and continues the prepared transaction
// identified by _preparedId.
// NOTE(review): presumably preallocateSnapshot() opens the transaction through _txnOpen(),
// which forwards _preparedId to WiredTigerBeginTxnBlock -- confirm.
preallocateSnapshot();
}
void WiredTigerRecoveryUnit::setTimestampReadSource(ReadSource readSource,
boost::optional<Timestamp> provided) {
tassert(5863604, "Cannot change ReadSource as it is pinned.", !isReadSourcePinned());

View File

@ -129,6 +129,8 @@ public:
PrepareConflictBehavior getPrepareConflictBehavior() const override;
void reclaimPreparedTransactionForRecovery() override;
/**
* Set pre-fetching capabilities for this session. This allows pre-loading of a set of pages
* into the cache and is an optional optimization.