/**
 * Sends killOp on the coordinator's OperationContext's opId at each point in the commit
 * coordination where there is an OperationContext, and ensures the coordination still runs to
 * completion at each of those points.
 *
 * @tags: [uses_transactions, uses_multi_shard_transaction]
 */

import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {
    getCoordinatorFailpoints,
    waitForFailpoint,
} from "jstests/sharding/libs/sharded_transactions_helpers.js";
import {
    checkDecisionIs,
    runCommitThroughMongosInParallelThread,
} from "jstests/sharding/libs/txn_two_phase_commit_util.js";

const dbName = "test";
const collName = "foo";
const ns = dbName + "." + collName;
const hangBeforeDeletingCoordinatorDocFpName = "hangBeforeDeletingCoordinatorDoc";

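// Three shards: shard0 acts as the transaction coordinator (setUp() makes it the primary shard
// for the test database), while shard1 and shard2 are plain participants.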
let st = new ShardingTest({shards: 3, causallyConsistent: true});

let coordinator = st.shard0;
let participant1 = st.shard1;
let participant2 = st.shard2;

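// All transactions in this test run on a single logical session; each protocol run below bumps
// txnNumber to start a new transaction on that session.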
let lsid = {id: UUID()};
let txnNumber = 0;

const setUp = function () {
    // Create a sharded collection with a chunk on each shard:
    // shard0: [-inf, 0)
    // shard1: [0, 10)
    // shard2: [10, +inf)
    assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: coordinator.shardName}));
    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
    assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
    assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 10}}));
    assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 0}, to: participant1.shardName}));
    assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 10}, to: participant2.shardName}));

    // These forced refreshes are not strictly necessary; they just prevent extra TXN log lines
    // from the shards starting, aborting, and restarting the transaction due to needing to
    // refresh after the transaction has started.
    assert.commandWorked(coordinator.adminCommand({_flushRoutingTableCacheUpdates: ns}));
    assert.commandWorked(participant1.adminCommand({_flushRoutingTableCacheUpdates: ns}));
    assert.commandWorked(participant2.adminCommand({_flushRoutingTableCacheUpdates: ns}));
    st.refreshCatalogCacheForNs(st.s, ns);

    // Start a new transaction by inserting a document onto each shard.
    assert.commandWorked(
        st.s.getDB(dbName).runCommand({
            insert: collName,
            documents: [{_id: -5}, {_id: 5}, {_id: 15}],
            lsid: lsid,
            txnNumber: NumberLong(txnNumber),
            stmtId: NumberInt(0),
            startTransaction: true,
            autocommit: false,
        }),
    );
};

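// Hangs the coordinator at the point described by failpointData, sends killOp to every
// TransactionCoordinator operation found on the coordinator while it is hung, and then verifies
// that commit coordination still runs to completion and the transaction reaches the expected
// outcome (commit or abort, depending on shouldCommit).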
const testCommitProtocol = function (shouldCommit, failpointData) {
    jsTest.log(
        "Testing two-phase " +
            (shouldCommit ? "commit" : "abort") +
            " protocol with failpointData: " +
            tojson(failpointData),
    );

    txnNumber++;
    setUp();

    if (!shouldCommit) {
        // Manually abort the transaction on one of the participants, so that the participant
        // fails to prepare.
        assert.commandWorked(
            participant2.adminCommand({
                abortTransaction: 1,
                lsid: lsid,
                txnNumber: NumberLong(txnNumber),
                stmtId: NumberInt(0),
                autocommit: false,
            }),
        );
    }

    // Turn on failpoint to make the coordinator hang at a specified point.
    assert.commandWorked(
        coordinator.adminCommand({
            configureFailPoint: failpointData.failpoint,
            mode: "alwaysOn",
            data: failpointData.data ? failpointData.data : {},
        }),
    );

    // Run commitTransaction through a parallel thread.
    let commitThread;
    if (shouldCommit) {
        commitThread = runCommitThroughMongosInParallelThread(lsid, txnNumber, st.s.host);
    } else {
        commitThread = runCommitThroughMongosInParallelThread(lsid, txnNumber, st.s.host, ErrorCodes.NoSuchTransaction);
    }
    commitThread.start();

    // Deliver killOp once the failpoint has been hit.
    waitForFailpoint("Hit " + failpointData.failpoint + " failpoint", failpointData.numTimesShouldBeHit);

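    // Find the opIds of all TransactionCoordinator operations on the coordinator shard, retrying
    // until every matched operation reports an opid.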
jsTest.log("Going to find coordinator opCtx ids");
|
|
let coordinatorOpsToKill = [];
|
|
assert.soon(
|
|
() => {
|
|
coordinatorOpsToKill = coordinator
|
|
.getDB("admin")
|
|
.aggregate([
|
|
{$currentOp: {"allUsers": true, "idleConnections": true}},
|
|
{
|
|
$match: {
|
|
$and: [
|
|
{desc: "TransactionCoordinator"},
|
|
// Filter out the prepareTransaction op on the
|
|
// coordinator itself since killing it would
|
|
// cause the transaction to abort.
|
|
{"command.prepareTransaction": {$exists: false}},
|
|
],
|
|
},
|
|
},
|
|
])
|
|
.toArray();
|
|
|
|
for (let x = 0; x < coordinatorOpsToKill.length; x++) {
|
|
if (!coordinatorOpsToKill[x].opid) {
|
|
print("Retrying currentOp because op doesn't have opId: " + tojson(coordinatorOpsToKill[x]));
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
},
|
|
"timed out trying to fetch coordinator ops",
|
|
undefined /* timeout */,
|
|
1000 /* interval */,
|
|
);
|
|
|
|
// Use "greater than or equal to" since, for failpoints that pause the coordinator while
|
|
// it's sending prepare or sending the decision, there might be one additional thread that's
|
|
// doing the "send" to the local participant (or that thread might have already completed).
|
|
assert.gte(coordinatorOpsToKill.length, failpointData.numTimesShouldBeHit);
|
|
|
|
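    // Interrupt every coordinator operation found above. The coordination is expected to survive
    // these interruptions; the assertions below would fail otherwise.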
    coordinatorOpsToKill.forEach(function (coordinatorOp) {
        coordinator.getDB("admin").killOp(coordinatorOp.opid);
    });

    // If we're hanging after the decision is written and before it is removed, then we can read
    // the decision.
    let commitTimestamp;
    const canReadDecision =
        (failpointData.data || {}).twoPhaseCommitStage === "decision" ||
        failpointData.failpoint === hangBeforeDeletingCoordinatorDocFpName;
    let hangBeforeDeletingCoordinatorDocFp;
    if (canReadDecision && shouldCommit) {
        commitTimestamp = checkDecisionIs(coordinator, lsid, txnNumber, "commit");
    } else if (shouldCommit) {
        // We're hanging before the decision is written, so we need to read the decision later
        // but before it's deleted.
        const coordinatorPrimary = coordinator.rs.getPrimary();
        hangBeforeDeletingCoordinatorDocFp = configureFailPoint(
            coordinatorPrimary,
            hangBeforeDeletingCoordinatorDocFpName,
            {},
            "alwaysOn",
        );
    }

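    // Disable the failpoint so the hung coordinator threads can resume.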
    assert.commandWorked(
        coordinator.adminCommand({
            configureFailPoint: failpointData.failpoint,
            mode: "off",
        }),
    );

    if (!canReadDecision && shouldCommit) {
        // Wait until the coordinator is about to delete its doc, read the commitTimestamp, and
        // then allow the deletion to proceed.
        hangBeforeDeletingCoordinatorDocFp.wait();
        commitTimestamp = checkDecisionIs(coordinator, lsid, txnNumber, "commit");
        hangBeforeDeletingCoordinatorDocFp.off();
    }

    // If the commit coordination was not robust to killOp, then commitTransaction would fail
    // with an Interrupted error rather than fail with NoSuchTransaction or return success.
    jsTest.log("Wait for the commit coordination to complete.");
    commitThread.join();

    // If deleting the coordinator doc was not robust to killOp, the document would still exist.
    // Deletion is done asynchronously, so we might have to wait.
    assert.soon(() => coordinator.getDB("config").getCollection("transaction_coordinators").count() == 0);

    // Check that the transaction committed or aborted as expected.
    if (!shouldCommit) {
        jsTest.log("Verify that the transaction was aborted on all shards.");
        assert.eq(0, st.s.getDB(dbName).getCollection(collName).find().itcount());
    } else {
        jsTest.log("Verify that the transaction was committed on all shards.");
        const res = assert.commandWorked(
            st.s.getDB(dbName).runCommand({
                find: collName,
                readConcern: {level: "majority", afterClusterTime: commitTimestamp},
                maxTimeMS: 10000,
            }),
        );
        assert.eq(3, res.cursor.firstBatch.length);
    }

    st.s.getDB(dbName).getCollection(collName).drop();
};

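// Each entry of failpointDataArr names a coordinator failpoint, how many times it is expected to
// be hit, and (for some failpoints) a data document such as a twoPhaseCommitStage field; that is
// the shape testCommitProtocol consumes above. See getCoordinatorFailpoints() for the full list.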
const failpointDataArr = getCoordinatorFailpoints();

// Test commit path.
failpointDataArr.forEach(function (failpointData) {
    testCommitProtocol(true /* shouldCommit */, failpointData);
    clearRawMongoProgramOutput();
});

st.stop();