/**
 * Tests that additional participants can be added to an existing transaction.
 * @tags: [
 *   requires_fcv_80,
 * ]
 */
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";
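
// Asserts that, while the transaction is still running, mongos's 'totalContactedParticipants'
// metric has grown by at least 'expectedParticipants' relative to 'initialTxnMetrics'.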
const verifyMidpointTransactionMetrics = function (initialTxnMetrics, expectedParticipants) {
    const expectedMongosTargetedShards = initialTxnMetrics.totalContactedParticipants + expectedParticipants;
    let midpointTxnMetrics = assert.commandWorked(st.s.adminCommand({serverStatus: 1})).transactions;

    // More transactions can be running in the background, so check that we observe at least the
    // expected minimum.
    assert.gte(midpointTxnMetrics.totalContactedParticipants, expectedMongosTargetedShards);
};
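
// Asserts that the mongos commit-type and participant metrics reflect a committed transaction
// with 'expectedParticipants' shards, 'numWriteShards' of which performed writes.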
const verifyFinalTransactionMetrics = function (initialTxnMetrics, expectedParticipants, numWriteShards) {
    const finalTxnMetrics = assert.commandWorked(st.s.adminCommand({serverStatus: 1})).transactions;

    // Check that we executed the expected commit type. Currently this test only runs transactions
    // where either a single shard, readOnly, or single write shard commit type should be executed.
    if (expectedParticipants.length == 1) {
        assert.gte(
            finalTxnMetrics.commitTypes.singleShard.successful,
            initialTxnMetrics.commitTypes.singleShard.successful + 1,
        );
    } else if (numWriteShards == 0) {
        assert.gte(
            finalTxnMetrics.commitTypes.readOnly.successful,
            initialTxnMetrics.commitTypes.readOnly.successful + 1,
        );
    } else if (numWriteShards == 1) {
        assert.gte(
            finalTxnMetrics.commitTypes.singleWriteShard.successful,
            initialTxnMetrics.commitTypes.singleWriteShard.successful + 1,
        );
    }

    // Check that the number of participants at commit time was incremented by at least the number
    // of participants we expect in the transaction.
    assert.gte(
        finalTxnMetrics.totalParticipantsAtCommit,
        initialTxnMetrics.totalParticipantsAtCommit + expectedParticipants.length,
    );
};
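
// Seeds the local ("foo") and foreign ("bar") collections for a test case and returns a snapshot
// of the mongos transaction metrics. When 'addedParticipantIsExistingParticipantBeforeAgg' is
// true, the foreign-collection inserts run inside the transaction so that the shard the $lookup
// will add is already an active participant.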
const setUpTestCase = function (
    addedParticipantIsExistingParticipantBeforeAgg,
    fooDocsToInsert,
    barDocsToInsert,
    txnNum,
    sessionId,
) {
    // Insert the foo docs outside of the transaction.
    assert.commandWorked(st.s.getDB(dbName).foo.insert(fooDocsToInsert));

    // If the participant that the $lookup adds is supposed to already be an existing participant
    // in the transaction, insert the bar docs as part of the transaction. Otherwise, insert them
    // outside of the transaction.
    if (addedParticipantIsExistingParticipantBeforeAgg) {
        // Check that we actually have docs to insert into "bar", which indicates we will run the
        // $lookup using "bar" as the foreign collection. If we're running the $lookup using "foo"
        // as both the local and foreign collection, it doesn't mean much to add the shard that
        // owns "foo" as a participant before we run the agg at all, so we assume this is a test
        // writer's error.
        assert.gt(barDocsToInsert.length, 0);
        assert.commandWorked(
            st.s.getDB(dbName).runCommand({
                insert: foreignColl,
                documents: barDocsToInsert,
                lsid: sessionId,
                txnNumber: NumberLong(txnNum),
                stmtId: NumberInt(0),
                autocommit: false,
                startTransaction: true,
            }),
        );
    } else {
        assert.commandWorked(st.s.getDB(dbName).bar.insert(barDocsToInsert));
    }

    const initialTxnMetrics = assert.commandWorked(st.s.adminCommand({serverStatus: 1})).transactions;
    return initialTxnMetrics;
};
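
// Sets a hang failpoint on each given shard and returns the failpoint handles so the caller can
// wait() for and later turn off() each one.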
const setHangFps = function (shardsToSetFp) {
    // Set a failpoint on each of the shards we expect the participant shard to target that will
    // force them to hang while executing the agg request.
    const hangFps = [];
    shardsToSetFp.forEach((shard) => {
        hangFps.push(
            configureFailPoint(shard, "hangAfterAcquiringCollectionCatalog", {collection: foreignColl}),
        );
    });
    return hangFps;
};
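
// Kicks off the $lookup aggregation in a separate thread so the main thread can manipulate
// failpoints while the agg is in flight. Returns the Thread; the caller must join() it and read
// the response via returnData().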
const runAgg = function (addedParticipantIsExistingParticipantBeforeAgg, lookupPipeline, txnNum, sessionId) {
    // Run the $lookup in another thread.
    const runAggRequest = (mongosConn, dbName, collName, pipeline, sessionId, txnNum, startTransaction) => {
        let mongos = new Mongo(mongosConn);
        // The session id was serialized with tojson() to cross the thread boundary; rebuild it.
        const lsid = eval("(" + sessionId + ")");
        let aggCmd = {
            aggregate: collName,
            pipeline: pipeline,
            cursor: {},
            lsid: lsid,
            txnNumber: NumberLong(txnNum),
            stmtId: NumberInt(0),
            autocommit: false,
        };
        if (startTransaction) {
            aggCmd = Object.merge(aggCmd, {startTransaction: true});
        }
        return mongos.getDB(dbName).runCommand(aggCmd);
    };

    let aggRequestThread = new Thread(
        runAggRequest,
        st.s.host,
        dbName,
        localColl,
        lookupPipeline,
        tojson(sessionId),
        txnNum,
        !addedParticipantIsExistingParticipantBeforeAgg,
    );
    aggRequestThread.start();
    return aggRequestThread;
};
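
// Runs one end-to-end test case: starts a transaction, runs 'lookupPipeline' over the local
// collection, verifies that mongos itself only contacted the shard that owns the local collection
// (the additional participants are added by that shard), then commits and checks the final
// transaction metrics.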
const testAddingParticipants = function (
    expectedParticipants,
    addedParticipantIsExistingParticipantBeforeAgg,
    fooDocsToInsert,
    barDocsToInsert,
    lookupPipeline,
    shardsWithForeignColl,
) {
    const session = st.s.startSession();
    const txnNum = 1;
    const sessionId = session.getSessionId();
    const initialTxnMetrics = setUpTestCase(
        addedParticipantIsExistingParticipantBeforeAgg,
        fooDocsToInsert,
        barDocsToInsert,
        txnNum,
        sessionId,
    );
    const hangFps = setHangFps(shardsWithForeignColl);
    const aggRequestThread = runAgg(addedParticipantIsExistingParticipantBeforeAgg, lookupPipeline, txnNum, sessionId);

    // In order to assert that mongos did not target the shards with the foreign collection itself,
    // wait to hit the failpoint on each shard with the foreign collection, then check that mongos
    // has only bumped its 'totalContactedParticipants' by 1 to account for the shard that owns
    // the "local" collection.
    hangFps.forEach((fp) => {
        fp.wait();
        verifyMidpointTransactionMetrics(initialTxnMetrics, 1);
        fp.off();
    });

    // Check that the agg returns the expected results. We've set up the docs and the $lookup such
    // that there is a 1:1 mapping between the docs in the foreign coll and the local coll.
    aggRequestThread.join();
    let aggRes = aggRequestThread.returnData();
    assert.eq(aggRes.cursor.firstBatch.length, barDocsToInsert.length);
    [...Array(barDocsToInsert.length).keys()].forEach((i) => {
        let next = aggRes.cursor.firstBatch[i];
        assert.eq(next.result.length, 1);
        assert.eq(next.result[0], barDocsToInsert[i]);
    });

    assert.commandWorked(
        st.s.getDB("admin").adminCommand({
            commitTransaction: 1,
            lsid: session.getSessionId(),
            txnNumber: NumberLong(txnNum),
            stmtId: NumberInt(0),
            autocommit: false,
        }),
    );

    // If the added participant was already in the transaction, it performed the in-transaction
    // insert into "bar", so one write shard participates in the commit.
    let numWriteShards = addedParticipantIsExistingParticipantBeforeAgg ? 1 : 0;
    verifyFinalTransactionMetrics(initialTxnMetrics, expectedParticipants, numWriteShards);

    // Drop any docs inserted to prep for the next test case.
    assert.commandWorked(st.s.getDB(dbName).foo.remove({}));
    assert.commandWorked(st.s.getDB(dbName).bar.remove({}));
};
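
// Forces each of the given shards to refresh its routing info for 'collName' by running a
// trivial $lookup from a collection whose single chunk lives on that shard.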
const refreshRoutingTable = (shards, collName) => {
    const lookup = [{$lookup: {from: collName, pipeline: [], as: "out"}}];
    const shardMap = {
        [shard0.shardName]: localNsShard0,
        [shard1.shardName]: localNsShard1,
        [shard2.shardName]: localNsShard2,
    };
    shards.forEach((shard) => {
        const ns = shardMap[shard.shardName];
        if (ns) {
            st.s.getCollection(ns).aggregate(lookup);
        }
    });
};
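
// Set up a cluster of 3 shards. shard0 will be made the primary shard for the test database.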
let st = new ShardingTest({shards: 3});
const dbName = "test";
const localColl = "foo";
const foreignColl = "bar";
const localNs = dbName + "." + localColl;
const localNsShard0 = dbName + ".foo_shard0";
const localNsShard1 = dbName + ".foo_shard1";
const localNsShard2 = dbName + ".foo_shard2";
const foreignNs = dbName + "." + foreignColl;
let shard0 = st.shard0;
let shard1 = st.shard1;
let shard2 = st.shard2;
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: shard0.shardName}));
// Create a sharded collection, "foo", with the following chunk:
// shard0: _id: [-inf, +inf)
assert.commandWorked(st.s.adminCommand({shardCollection: localNs, key: {_id: 1}}));
// Create a sharded collection, "bar", with the following chunks:
// shard1: x: [-inf, 0)
// shard2: x: [0, +inf)
assert.commandWorked(st.s.adminCommand({shardCollection: foreignNs, key: {x: 1}}));
assert.commandWorked(st.s.adminCommand({split: foreignNs, middle: {x: 0}}));
assert.commandWorked(st.s.adminCommand({moveChunk: foreignNs, find: {x: -10}, to: shard1.shardName}));
assert.commandWorked(st.s.adminCommand({moveChunk: foreignNs, find: {x: 0}, to: shard2.shardName}));
// Create three more sharded collections, each with a single chunk, placing one chunk on each
// shard. These are used by refreshRoutingTable() to run an agg that targets a specific shard.
CreateShardedCollectionUtil.shardCollectionWithChunks(st.s.getCollection(localNsShard0), {_id: 1}, [
    {min: {_id: MinKey}, max: {_id: MaxKey}, shard: st.shard0.shardName},
]);
CreateShardedCollectionUtil.shardCollectionWithChunks(st.s.getCollection(localNsShard1), {_id: 1}, [
    {min: {_id: MinKey}, max: {_id: MaxKey}, shard: st.shard1.shardName},
]);
CreateShardedCollectionUtil.shardCollectionWithChunks(st.s.getCollection(localNsShard2), {_id: 1}, [
    {min: {_id: MinKey}, max: {_id: MaxKey}, shard: st.shard2.shardName},
]);
assert.commandWorked(st.s.getCollection(localNsShard0).insert([{_id: 1}]));
assert.commandWorked(st.s.getCollection(localNsShard1).insert([{_id: 1}]));
assert.commandWorked(st.s.getCollection(localNsShard2).insert([{_id: 1}]));
// These forced refreshes are not strictly necessary; they just prevent extra TXN log lines
// from the shards starting, aborting, and restarting the transaction due to needing to
// refresh after the transaction has started.
refreshRoutingTable([shard0, shard1, shard2], localColl);
st.refreshCatalogCacheForNs(st.s, localNs);
refreshRoutingTable([shard0, shard1, shard2], foreignColl);
st.refreshCatalogCacheForNs(st.s, foreignNs);
print(
    "Testing that an existing participant can add one additional participant which was not " +
        "already an active participant",
);
let fooDocsToInsert = [{_id: -5}]; // will live on shard0
let barDocsToInsert = [{_id: 1, x: -5}]; // will live on shard1
let expectedParticipants = [0, 1];
let addedParticipantIsExistingParticipantBeforeAgg = false;
let lookupPipeline = [
    {$lookup: {from: foreignColl, localField: "_id", foreignField: "x", as: "result"}},
    {$sort: {"_id": 1}},
];
let shardsWithForeignColl = [st.shard1];
testAddingParticipants(
    expectedParticipants,
    addedParticipantIsExistingParticipantBeforeAgg,
    fooDocsToInsert,
    barDocsToInsert,
    lookupPipeline,
    shardsWithForeignColl,
);
print(
    "Testing that an existing participant can add one additional participant which was " +
        "already an active participant",
);
addedParticipantIsExistingParticipantBeforeAgg = true;
testAddingParticipants(
    expectedParticipants,
    addedParticipantIsExistingParticipantBeforeAgg,
    fooDocsToInsert,
    barDocsToInsert,
    lookupPipeline,
    shardsWithForeignColl,
);
print("Testing that an existing participant can add multiple additional participants");
expectedParticipants = [0, 1, 2];
addedParticipantIsExistingParticipantBeforeAgg = false;
fooDocsToInsert = [{_id: -5}, {_id: 5}]; // will live on shard0
barDocsToInsert = [
    {_id: 1, x: -5},
    {_id: 2, x: 5},
]; // will live on shard1 and shard2
shardsWithForeignColl = [st.shard1, st.shard2];
testAddingParticipants(
    expectedParticipants,
    addedParticipantIsExistingParticipantBeforeAgg,
    fooDocsToInsert,
    barDocsToInsert,
    lookupPipeline,
    shardsWithForeignColl,
);
print("Testing that an existing participant can add itself as an additional participant");
assert.commandWorked(st.s.adminCommand({moveChunk: foreignNs, find: {x: -10}, to: shard0.shardName}));
refreshRoutingTable([shard0, shard1, shard2], foreignColl);
st.refreshCatalogCacheForNs(st.s, foreignNs);
fooDocsToInsert = [{_id: -5}]; // will live on shard0
barDocsToInsert = [{_id: 1, x: -5}]; // will live on shard0
expectedParticipants = [0, 2];
lookupPipeline = [
    {
        $lookup: {
            from: foreignColl,
            let: {localVar: "$_id"},
            pipeline: [{$match: {"_id": {$gte: -5}}}],
            as: "result",
        },
    },
    {$sort: {"_id": 1}},
];
shardsWithForeignColl = [st.shard0, st.shard2];
testAddingParticipants(
    expectedParticipants,
    addedParticipantIsExistingParticipantBeforeAgg,
    fooDocsToInsert,
    barDocsToInsert,
    lookupPipeline,
    shardsWithForeignColl,
);
print("Testing that a participant added on a getMore without a readOnly value participates in commit protocol");
// Move chunks such that the localColl has chunks:
// shard0: _id: [-inf, 0)
// shard1: _id: [0, +inf)
assert.commandWorked(st.s.adminCommand({split: localNs, middle: {_id: 0}}));
assert.commandWorked(st.s.adminCommand({moveChunk: localNs, find: {_id: 10}, to: shard1.shardName}));

// Move chunks such that the foreignColl has chunks:
// shard1: x: [-inf, 0)
// shard2: x: [0, +inf)
assert.commandWorked(st.s.adminCommand({moveChunk: foreignNs, find: {x: -10}, to: shard1.shardName}));
refreshRoutingTable([shard0, shard1, shard2], foreignColl);
st.refreshCatalogCacheForNs(st.s, foreignNs);

// Run an agg request such that:
// - Mongos will target shard0 and shard1, which own the localColl.
// - Shard0 will target shard1 only, because shard0 owns the docs with negative "_id" values in the
//   localColl, and shard1 owns the docs with negative "x" values in the foreignColl. Shard0 will
//   turn the pipeline into a $match to run against shard1, and return docs whose foreignColl "x"
//   values match the localColl "_id" values.
// - Shard1 will target shard2 only for the same reason, except that shard1 owns the docs with
//   positive "_id" values in the localColl, and shard2 owns the docs with positive "x" values in
//   the foreignColl.
fooDocsToInsert = [];
barDocsToInsert = [];
for (let i = -200; i <= 200; i++) {
    fooDocsToInsert.push({_id: i});
    barDocsToInsert.push({x: i});
}
lookupPipeline = [
    {$lookup: {from: foreignColl, localField: "_id", foreignField: "x", as: "result"}},
    {$limit: NumberInt(200)},
];
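
// This scenario needs fine-grained control over when each failpoint is released, so it drives
// the transaction manually rather than going through testAddingParticipants().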
let shardsToSetHangFp = [st.shard1, st.shard2];
const session = st.s.startSession();
let txnNum = 1;
const sessionId = session.getSessionId();
let initialTxnMetrics = setUpTestCase(
    addedParticipantIsExistingParticipantBeforeAgg,
    fooDocsToInsert,
    barDocsToInsert,
    txnNum,
    sessionId,
);
// We set hang failpoints on both shard1 and shard2 for the sub-requests they execute against the
// foreign collection, and release shard1's first so that, in the end, only shard2 is hanging on
// the request sent by shard1. The agg request will follow this sequence of events:
// - Mongos will send the agg and then a getMore to shard0 and shard1 (the default initial
//   batchSize is 0, so mongos will always send a getMore).
// - Shard0 will contact shard1.
// - At the ~same time, shard1 will contact shard2.
// - Shard1 will respond to shard0, and include shard2 in its additionalParticipants list. It will
//   not have a readOnly value for shard2 yet, because shard2 has not responded.
// - Shard1 will eventually return enough docs to satisfy the $limit, and shard0 will respond to
//   mongos with these results, including both shard1 and shard2 in its additional participants
//   list. It will not have a readOnly value for shard2.
// - Mongos will mark shard2 as readOnly because it receives the response on a getMore request. It
//   will then return to the client without shard1 ever having heard back from shard2.
const hangFps = setHangFps(shardsToSetHangFp);
let aggRequestThread = runAgg(addedParticipantIsExistingParticipantBeforeAgg, lookupPipeline, txnNum, sessionId);
// Wait to hit the hang failpoint on shard1, and check that mongos has only contacted the two
// shards that own the localColl.
hangFps[0].wait();
verifyMidpointTransactionMetrics(initialTxnMetrics, 2 /* expectedNewParticipantsContactedByMongos */);

// Turn off the failpoint on shard1 so that shard1 will continue with the request. Wait for shard1
// to contact shard2 and for shard2 to hit the failpoint.
hangFps[0].off();
hangFps[1].wait();
// While shard2 is still hanging, wait for the agg to finish, and then check the results.
aggRequestThread.join();
let aggRes = aggRequestThread.returnData();
// Assert that the response only includes docs from the foreign collection that live on shard1,
// meaning docs with x < 0.
aggRes.cursor.firstBatch.forEach((doc) => {
    assert.eq(doc.result.length, 1);
    assert.lt(doc.result[0].x, 0);
});

// Now check that mongos tracks all 3 participants, before turning off the failpoint on shard2.
verifyMidpointTransactionMetrics(initialTxnMetrics, 3 /* expectedNewParticipantsContactedByMongos */);
hangFps[1].off();
// Wait for the getMore to be cleaned up on shard1 to avoid a race between shard2 receiving
// commitTransaction and the cursor on shard1 being killed - if shard1 sends another getMore request
// to shard2 before shard1 receives commitTransaction, it's possible for shard2 to receive the
// request after it has received and processed the commit. If this happens, shard2 will respond with
// NoSuchTransaction to shard1 (since shard2 will have already committed), which will cause shard1
// to abort the transaction.
assert.soon(() => {
    return (
        st.shard1
            .getDB("admin")
            .aggregate([
                {$currentOp: {allUsers: true, idleCursors: true}},
                {$match: {"command.getMore": {$exists: true}}},
                // Escape the dot so the regex matches the literal "<dbName>." namespace prefix.
                {$match: {"ns": {$regex: dbName + "\\."}}},
            ])
            .toArray().length == 0
    );
}, `Timed out waiting for cursor to be cleaned up on shard1`);
// Commit the transaction and check the final metrics.
assert.commandWorked(
    st.s.getDB("admin").adminCommand({
        commitTransaction: 1,
        lsid: sessionId,
        txnNumber: NumberLong(txnNum),
        stmtId: NumberInt(0),
        autocommit: false,
    }),
);
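
// All docs were inserted outside of the transaction, so every participant was read-only
// (numWriteShards = 0).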
verifyFinalTransactionMetrics(initialTxnMetrics, [st.shard0, st.shard1, st.shard2], 0);
assert.commandWorked(st.s.getDB(dbName).foo.remove({}));
assert.commandWorked(st.s.getDB(dbName).bar.remove({}));
print("Testing a transaction in which an added participant throws a view resolution error");
// Move the localColl chunk away from the primary shard so that the primary shard does not own any
// data from either the local or foreign colls.
//
// The localColl then has chunks:
// shard1: _id: [-inf, 0)
// shard1: _id: [0, +inf)
//
// The foreignColl has chunks:
// shard1: x: [-inf, 0)
// shard2: x: [0, +inf)
assert.commandWorked(st.s.adminCommand({moveChunk: localNs, find: {_id: -10}, to: shard1.shardName}));
fooDocsToInsert = [{_id: -5}, {_id: 5}]; // will live on shard1
barDocsToInsert = [
    {_id: 1, x: -5},
    {_id: 2, x: 5},
]; // will live on shard1 and shard2
initialTxnMetrics = setUpTestCase(
    addedParticipantIsExistingParticipantBeforeAgg,
    fooDocsToInsert,
    barDocsToInsert,
    txnNum,
    sessionId,
);
// Create a simple view on the foreign collection.
assert.commandWorked(st.s.getDB(dbName).createView("foreignView", foreignColl, []));

// Refresh shard0 to prevent it from getting a StaleConfig error before the expected view
// resolution error.
refreshRoutingTable([shard0], "foreignView");
// Run the $lookup against the view. Force the merging shard to be a non-primary shard so that the
// primary shard will not be a participant in the transaction at all. The expected behavior for
// this agg request is:
// 1. Mongos forwards the agg to shard1.
// 2. Shard1 sends the agg request to shard0 (the primary shard) because it treats "foreignView"
//    as an unsharded collection.
// 3. Shard0 throws CommandOnShardedViewNotSupportedOnMongod, which contains info on how to
//    resolve the view. This was the first request shard0 received in this transaction, so it
//    aborts the transaction on itself.
// 4. Shard1 clears shard0 from its participants list, and uses the info from shard0 to target
//    itself and shard2.
// 5. The transaction finishes with only shard1 and shard2 as participants.
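//
// The $_internalSplitPipeline stage below pins the merge to shard1 so that the primary shard is
// only ever contacted via the view resolution path.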
lookupPipeline = [
    {$_internalSplitPipeline: {mergeType: {"specificShard": st.shard1.shardName}}},
    {
        $lookup: {
            from: "foreignView",
            let: {localVar: "$_id"},
            pipeline: [{$match: {"_id": {$gte: -5}}}],
            as: "result",
        },
    },
];
txnNum = 2;

// Get the initial primary shard metrics before running the agg request.
let initialPrimaryShardTxnMetrics = assert.commandWorked(
    st.shard0.getDB("admin").adminCommand({serverStatus: 1}),
).transactions;
aggRequestThread = runAgg(addedParticipantIsExistingParticipantBeforeAgg, lookupPipeline, txnNum, sessionId);
aggRequestThread.join();
aggRes = aggRequestThread.returnData();
let primaryShardTxnMetrics = assert.commandWorked(
    st.shard0.getDB("admin").adminCommand({serverStatus: 1}),
).transactions;
// The primary shard aborted the transaction on itself when it threw the view resolution error,
// so it should no longer have this transaction open.
assert.eq(primaryShardTxnMetrics.currentOpen, 0);
assert.gte(primaryShardTxnMetrics.totalAborted, initialPrimaryShardTxnMetrics.totalAborted + 1);
assert.commandWorked(
    st.s.getDB("admin").adminCommand({
        commitTransaction: 1,
        lsid: sessionId,
        txnNumber: NumberLong(txnNum),
        stmtId: NumberInt(0),
        autocommit: false,
    }),
);
verifyFinalTransactionMetrics(initialTxnMetrics, [st.shard1, st.shard2], 0);
// Assert that the transaction was started and then aborted on the primary shard.
primaryShardTxnMetrics = assert.commandWorked(
    st.shard0.getDB("admin").adminCommand({serverStatus: 1}),
).transactions;
assert.gte(primaryShardTxnMetrics.totalStarted, initialPrimaryShardTxnMetrics.totalStarted + 1);
assert.gte(primaryShardTxnMetrics.totalAborted, initialPrimaryShardTxnMetrics.totalAborted + 1);
st.stop();