mongo/jstests/replsets/kill_prepared_transaction_c...

168 lines
6.2 KiB
JavaScript

/**
* Test killing 'commitTransaction' and 'abortTransaction' operations on prepared transactions.
*
* @tags: [uses_transactions, uses_prepare_transaction]
*/
import {PrepareHelpers} from "jstests/core/txns/libs/prepare_helpers.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
const name = "kill_prepared_transaction_commit_abort";
const rst = new ReplSetTest({
nodes: 1,
});
rst.startSet();
rst.initiate();
const TxnState = {
InProgress: 1,
Committed: 2,
Aborted: 3,
};
const dbName = "test";
const collName = name;
const primary = rst.getPrimary();
const testDB = primary.getDB(dbName);
// A latch that will act as a signal to shut down the killOp thread.
let shutdownLatch = new CountDownLatch(1);
assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}}));
/**
* A function that continuously kills any running 'commitTransaction' or 'abortTransaction' commands
* on the server, until it receives a shutdown signal via 'shutdownLatch'.
*/
function killOpThread(host, dbName, collName, shutdownLatch) {
const nodeDB = new Mongo(host).getDB(dbName);
jsTestLog("killOp thread starting.");
while (shutdownLatch.getCount() > 0) {
let filter = {
"$or": [
{"command.commitTransaction": 1, active: true},
{"command.abortTransaction": 1, active: true},
],
};
let ops = nodeDB.currentOp(filter).inprog;
ops.forEach((op) => {
// Let some operations survive so the test terminates.
const shouldKill = Math.random() < 0.8;
if (op.opid && shouldKill) {
nodeDB.killOp(op.opid);
}
});
}
jsTestLog("killOp thread exiting.");
}
/**
* Creates 'num' sessions and starts and prepares a transaction on each. Returns an array of
* sessions included with the commit timestamp for each prepared transaction and the current state
* of that transaction.
*/
function createPreparedTransactions(num) {
let sessions = [];
for (let i = 0; i < num; i++) {
const priSession = primary.startSession();
const priSessionDB = priSession.getDatabase(dbName);
const priSessionColl = priSessionDB.getCollection(collName);
priSession.startTransaction();
assert.commandWorked(priSessionColl.insert({_id: i}));
const prepareTimestamp = PrepareHelpers.prepareTransaction(priSession);
sessions.push({session: priSession, commitTs: prepareTimestamp, state: TxnState.InProgress});
}
return sessions;
}
/**
* Commit or abort transactions on all the given sessions until all transactions are complete. We
* choose to randomly commit or abort a given transaction with equal probability.
*/
function commitOrAbortAllTransactions(sessions) {
// Until all transactions have definitively completed, try to abort/commit the open,
// prepared transactions.
while (sessions.find((s) => s.state === TxnState.InProgress) !== undefined) {
for (let i = 0; i < sessions.length; i++) {
// We don't need to commit an already completed transaction.
if (sessions[i].state !== TxnState.InProgress) {
continue;
}
// Randomly choose to commit or abort the transaction.
let sess = sessions[i];
let terminalStates = [TxnState.Committed, TxnState.Aborted];
let terminalState = terminalStates[Math.round(Math.random())];
let cmd =
terminalState === TxnState.Committed
? {commitTransaction: 1, commitTimestamp: sess.commitTs}
: {abortTransaction: 1};
let res = sess.session.getDatabase("admin").adminCommand(cmd);
if (res.ok === 1) {
// Mark the transaction's terminal state.
sessions[i].state = terminalState;
}
if (res.ok === 0) {
// We assume that transaction commit/abort should not fail for any reason other than
// interruption in this test. If the commit/abort was interrupted, then the command
// should have failed, and the transaction state should be unaffected.
assert.commandFailedWithCode(res, ErrorCodes.Interrupted);
print("Transaction " + i + " was interrupted.");
}
}
}
}
// The number of sessions and transactions to create.
const numTxns = 100;
jsTestLog("Creating sessions and preparing " + numTxns + " transactions.");
let sessions = createPreparedTransactions(numTxns);
jsTestLog("Starting the killOp thread.");
let killThread = new Thread(killOpThread, primary.host, dbName, collName, shutdownLatch);
killThread.start();
// Sleep for a second to let the killThread begin killing.
sleep(1000);
jsTestLog("Committing/aborting all transactions.");
commitOrAbortAllTransactions(sessions);
// Make sure all transactions were completed.
assert(sessions.every((s) => s.state === TxnState.Committed || s.state === TxnState.Aborted));
jsTestLog("Stopping the killOp thread.");
shutdownLatch.countDown();
killThread.join();
jsTestLog("Checking visibility of all transaction operations.");
// If a transaction committed then its document should be visible. If it aborted then its document
// should not be visible.
let docs = testDB[collName].find().toArray();
let committedTxnIds = sessions.reduce((acc, s, i) => (s.state === TxnState.Committed ? acc.concat(i) : acc), []);
let commitCount = committedTxnIds.length;
let abortCount = sessions.length - committedTxnIds.length;
jsTestLog("Committed " + commitCount + " transactions, Aborted " + abortCount + " transactions.");
// Verify that the correct documents exist.
let expectedDocs = committedTxnIds.map((i) => ({_id: i}));
assert.sameMembers(docs, expectedDocs);
// Assert that no prepared transactions are open on any of the sessions we started, and then end
// each session.
for (let i = 0; i < sessions.length; i++) {
const ops = testDB.currentOp({
"lsid.id": sessions[i].session.getSessionId().id,
"transaction.timePreparedMicros": {$exists: true},
}).inprog;
assert.eq(ops.length, 0);
sessions[i].session.endSession();
}
rst.stopSet();