mongo/jstests/sharding/additional_txn_participants...

122 lines
5.4 KiB
JavaScript

/**
* Tests that snapshot readConcern and placementConflictTime are respected on participants added to
* a transaction by other participants when there are conflicting operations.
* @tags: [requires_fcv_80]
*/
import {assertArrayEq} from "jstests/aggregation/extras/utils.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
// Database and collection names shared by every test case in this file.
const dbName = "test";
const localColl = "local";
const foreignColl = "foreign";
const localNs = `${dbName}.${localColl}`;
const foreignNs = `${dbName}.${foreignColl}`;

// Two-shard cluster; shard0 is made the primary shard of the test database.
const st = new ShardingTest({shards: 2});
const {shard0, shard1} = st;
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: shard0.shardName}));

// The unsharded "local" collection is created implicitly by this insert and therefore lives on
// the primary shard, shard0.
assert.commandWorked(st.s.getDB(dbName).getCollection(localColl).insert({_id: 0, x: 1}));

// Shard "foreign" on {x: 1} and lay out its chunks as follows:
//   shard0: [x: -inf, x: 0)
//   shard1: [x: 0, x: +inf)
// The commands are order-dependent, so they are issued one by one in the listed sequence.
for (const cmd of [
    {shardCollection: foreignNs, key: {x: 1}},
    {split: foreignNs, middle: {x: 0}},
    {moveChunk: foreignNs, find: {x: -10}, to: shard0.shardName},
    {moveChunk: foreignNs, find: {x: 0}, to: shard1.shardName},
]) {
    assert.commandWorked(st.s.adminCommand(cmd));
}

// Force refreshes on both shards and on mongos, one namespace at a time, to avoid getting stale
// config errors in the test cases.
for (const ns of [localNs, foreignNs]) {
    for (const shard of [shard0, shard1]) {
        assert.commandWorked(shard.adminCommand({_flushRoutingTableCacheUpdates: ns}));
    }
    st.refreshCatalogCacheForNs(st.s, ns);
}

// Session (and its database handle) used to run the transactions under test.
const session = st.s.startSession();
const sessionDB = session.getDatabase(dbName);
{
    print("Testing that additional participant respects placementConflictTime with a conflicting migration");

    const localInTxn = sessionDB.getCollection(localColl);
    const lookupPipeline = [{$lookup: {from: foreignColl, localField: "x", foreignField: "_id", as: "result"}}];
    // Besides the placement conflict itself (StaleConfig / MigrationConflict), the $lookup may fail
    // with ShardCannotRefreshDueToLocksHeld if the shard acting as a router needs to refresh.
    const acceptableCodes = [
        ErrorCodes.StaleConfig,
        ErrorCodes.MigrationConflict,
        ErrorCodes.ShardCannotRefreshDueToLocksHeld,
    ];

    // placementConflictTime is only used by transactions whose readConcern is not snapshot.
    session.startTransaction({readConcern: {level: "majority"}});

    // First statement of the transaction: a find that targets shard0 only.
    assert.eq(localInTxn.find().itcount(), 1);

    // While the transaction is open, migrate the foreignColl chunk from shard0 to shard1 and make
    // mongos aware of the new placement.
    assert.commandWorked(st.s.adminCommand({moveChunk: foreignNs, find: {x: -10}, to: shard1.shardName}));
    st.refreshCatalogCacheForNs(st.s, foreignNs);

    // The $lookup would add shard1 as an additional participant; it is expected to throw because
    // shard1 had an incoming migration.
    const err = assert.throwsWithCode(() => {
        localInTxn.aggregate(lookupPipeline);
    }, acceptableCodes);
    assert.contains("TransientTransactionError", err.errorLabels, tojson(err));

    session.abortTransaction();
}
{
    print("Testing that additional participants respects readConcern snapshot with a conflicting write");

    // Non-transactional handle on the test database, routed through mongos.
    const mongosDB = st.s.getDB(dbName);

    // Insert a doc in the foreign collection that we will later update.
    assert.commandWorked(mongosDB.getCollection(foreignColl).insert({_id: 1, x: 1}));

    // Define the test case aggregation.
    const aggCmd = [{$lookup: {from: foreignColl, localField: "x", foreignField: "x", as: "result"}}];

    // As a setup step, run the aggregation for a first time outside of the transaction: this will
    // allow shard0 to recover up-to-date routing information about 'foreignColl' (which was stale
    // after the last moveChunk operation completed) and avoid raising retryable errors that would
    // disrupt the execution of this test case. The result is intentionally discarded.
    mongosDB.getCollection(localColl).aggregate(aggCmd, {cursor: {batchSize: 2}}).toArray();

    // Start a transaction on shard0.
    session.startTransaction({readConcern: {level: "snapshot"}});
    assert.eq(sessionDB.getCollection(localColl).find().itcount(), 1);

    // Update the doc in foreignColl in a different transaction, then release that session since it
    // is not needed any further.
    const session2 = st.s.startSession();
    const sessionDB2 = session2.getDatabase(dbName);
    session2.startTransaction();
    assert.commandWorked(sessionDB2.getCollection(foreignColl).update({x: 1}, {$set: {x: 2}}));
    assert.commandWorked(session2.commitTransaction_forTesting());
    session2.endSession();

    // Run the $lookup on the first transaction, which is expected to add shard1 as an additional
    // participant and not see the updated value (hence returning a matching doc).
    const expectedTxnAggRes = [{_id: 0, x: 1, result: [{_id: 1, x: 1}]}];
    const txnAggRes = sessionDB
        .getCollection(localColl)
        .aggregate(aggCmd, {cursor: {batchSize: 2}})
        .toArray();
    assertArrayEq({actual: txnAggRes, expected: expectedTxnAggRes});

    // For comparison, run $lookup outside of a transaction and assert we see the updated
    // value, meaning that it does not return a matching doc.
    const expectedNonTxnAggRes = [{_id: 0, x: 1, result: []}];
    const nonTxnAggRes = mongosDB
        .getCollection(localColl)
        .aggregate(aggCmd, {cursor: {batchSize: 2}})
        .toArray();
    assertArrayEq({actual: nonTxnAggRes, expected: expectedNonTxnAggRes});

    assert.commandWorked(session.commitTransaction_forTesting());
}
// Release the session used by the test cases before shutting the cluster down, so that it is not
// left open on the server for the duration of the teardown.
session.endSession();
st.stop();