mirror of https://github.com/mongodb/mongo
168 lines
6.2 KiB
JavaScript
168 lines
6.2 KiB
JavaScript
/**
|
|
* Test killing 'commitTransaction' and 'abortTransaction' operations on prepared transactions.
|
|
*
|
|
* @tags: [uses_transactions, uses_prepare_transaction]
|
|
*/
|
|
|
|
import {PrepareHelpers} from "jstests/core/txns/libs/prepare_helpers.js";
|
|
import {Thread} from "jstests/libs/parallelTester.js";
|
|
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
|
|
|
const name = "kill_prepared_transaction_commit_abort";
|
|
const rst = new ReplSetTest({
|
|
nodes: 1,
|
|
});
|
|
rst.startSet();
|
|
rst.initiate();
|
|
|
|
const TxnState = {
|
|
InProgress: 1,
|
|
Committed: 2,
|
|
Aborted: 3,
|
|
};
|
|
|
|
const dbName = "test";
|
|
const collName = name;
|
|
|
|
const primary = rst.getPrimary();
|
|
const testDB = primary.getDB(dbName);
|
|
|
|
// A latch that will act as a signal to shut down the killOp thread.
|
|
let shutdownLatch = new CountDownLatch(1);
|
|
assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}}));
|
|
|
|
/**
|
|
* A function that continuously kills any running 'commitTransaction' or 'abortTransaction' commands
|
|
* on the server, until it receives a shutdown signal via 'shutdownLatch'.
|
|
*/
|
|
function killOpThread(host, dbName, collName, shutdownLatch) {
|
|
const nodeDB = new Mongo(host).getDB(dbName);
|
|
jsTestLog("killOp thread starting.");
|
|
while (shutdownLatch.getCount() > 0) {
|
|
let filter = {
|
|
"$or": [
|
|
{"command.commitTransaction": 1, active: true},
|
|
{"command.abortTransaction": 1, active: true},
|
|
],
|
|
};
|
|
let ops = nodeDB.currentOp(filter).inprog;
|
|
ops.forEach((op) => {
|
|
// Let some operations survive so the test terminates.
|
|
const shouldKill = Math.random() < 0.8;
|
|
if (op.opid && shouldKill) {
|
|
nodeDB.killOp(op.opid);
|
|
}
|
|
});
|
|
}
|
|
jsTestLog("killOp thread exiting.");
|
|
}
|
|
|
|
/**
|
|
* Creates 'num' sessions and starts and prepares a transaction on each. Returns an array of
|
|
* sessions included with the commit timestamp for each prepared transaction and the current state
|
|
* of that transaction.
|
|
*/
|
|
function createPreparedTransactions(num) {
|
|
let sessions = [];
|
|
for (let i = 0; i < num; i++) {
|
|
const priSession = primary.startSession();
|
|
const priSessionDB = priSession.getDatabase(dbName);
|
|
const priSessionColl = priSessionDB.getCollection(collName);
|
|
|
|
priSession.startTransaction();
|
|
assert.commandWorked(priSessionColl.insert({_id: i}));
|
|
const prepareTimestamp = PrepareHelpers.prepareTransaction(priSession);
|
|
sessions.push({session: priSession, commitTs: prepareTimestamp, state: TxnState.InProgress});
|
|
}
|
|
return sessions;
|
|
}
|
|
|
|
/**
|
|
* Commit or abort transactions on all the given sessions until all transactions are complete. We
|
|
* choose to randomly commit or abort a given transaction with equal probability.
|
|
*/
|
|
function commitOrAbortAllTransactions(sessions) {
|
|
// Until all transactions have definitively completed, try to abort/commit the open,
|
|
// prepared transactions.
|
|
while (sessions.find((s) => s.state === TxnState.InProgress) !== undefined) {
|
|
for (let i = 0; i < sessions.length; i++) {
|
|
// We don't need to commit an already completed transaction.
|
|
if (sessions[i].state !== TxnState.InProgress) {
|
|
continue;
|
|
}
|
|
|
|
// Randomly choose to commit or abort the transaction.
|
|
let sess = sessions[i];
|
|
let terminalStates = [TxnState.Committed, TxnState.Aborted];
|
|
let terminalState = terminalStates[Math.round(Math.random())];
|
|
let cmd =
|
|
terminalState === TxnState.Committed
|
|
? {commitTransaction: 1, commitTimestamp: sess.commitTs}
|
|
: {abortTransaction: 1};
|
|
let res = sess.session.getDatabase("admin").adminCommand(cmd);
|
|
|
|
if (res.ok === 1) {
|
|
// Mark the transaction's terminal state.
|
|
sessions[i].state = terminalState;
|
|
}
|
|
if (res.ok === 0) {
|
|
// We assume that transaction commit/abort should not fail for any reason other than
|
|
// interruption in this test. If the commit/abort was interrupted, then the command
|
|
// should have failed, and the transaction state should be unaffected.
|
|
assert.commandFailedWithCode(res, ErrorCodes.Interrupted);
|
|
print("Transaction " + i + " was interrupted.");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// The number of sessions and transactions to create.
|
|
const numTxns = 100;
|
|
|
|
jsTestLog("Creating sessions and preparing " + numTxns + " transactions.");
|
|
let sessions = createPreparedTransactions(numTxns);
|
|
|
|
jsTestLog("Starting the killOp thread.");
|
|
let killThread = new Thread(killOpThread, primary.host, dbName, collName, shutdownLatch);
|
|
killThread.start();
|
|
|
|
// Sleep for a second to let the killThread begin killing.
|
|
sleep(1000);
|
|
|
|
jsTestLog("Committing/aborting all transactions.");
|
|
commitOrAbortAllTransactions(sessions);
|
|
|
|
// Make sure all transactions were completed.
|
|
assert(sessions.every((s) => s.state === TxnState.Committed || s.state === TxnState.Aborted));
|
|
|
|
jsTestLog("Stopping the killOp thread.");
|
|
shutdownLatch.countDown();
|
|
killThread.join();
|
|
|
|
jsTestLog("Checking visibility of all transaction operations.");
|
|
|
|
// If a transaction committed then its document should be visible. If it aborted then its document
|
|
// should not be visible.
|
|
let docs = testDB[collName].find().toArray();
|
|
let committedTxnIds = sessions.reduce((acc, s, i) => (s.state === TxnState.Committed ? acc.concat(i) : acc), []);
|
|
let commitCount = committedTxnIds.length;
|
|
let abortCount = sessions.length - committedTxnIds.length;
|
|
jsTestLog("Committed " + commitCount + " transactions, Aborted " + abortCount + " transactions.");
|
|
|
|
// Verify that the correct documents exist.
|
|
let expectedDocs = committedTxnIds.map((i) => ({_id: i}));
|
|
assert.sameMembers(docs, expectedDocs);
|
|
|
|
// Assert that no prepared transactions are open on any of the sessions we started, and then end
|
|
// each session.
|
|
for (let i = 0; i < sessions.length; i++) {
|
|
const ops = testDB.currentOp({
|
|
"lsid.id": sessions[i].session.getSessionId().id,
|
|
"transaction.timePreparedMicros": {$exists: true},
|
|
}).inprog;
|
|
assert.eq(ops.length, 0);
|
|
sessions[i].session.endSession();
|
|
}
|
|
|
|
rst.stopSet();
|