mongo/jstests/sharding/transactions_multi_writes.js

152 lines
6.2 KiB
JavaScript

/**
* Verifies multi-writes in transactions are sent with shard versions to only the targeted shards.
*
* @tags: [
* requires_sharding,
* uses_multi_shard_transaction,
* uses_transactions,
* ]
*/
import {withAbortAndRetryOnTransientTxnError} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {assertDropCollection} from "jstests/libs/collection_drop_recreate.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {
disableStaleVersionAndSnapshotRetriesWithinTransactions,
enableCoordinateCommitReturnImmediatelyAfterPersistingDecision,
enableStaleVersionAndSnapshotRetriesWithinTransactions,
} from "jstests/sharding/libs/sharded_transactions_helpers.js";
const dbName = "test";
const collName = "foo";
const ns = dbName + "." + collName;
const st = new ShardingTest({shards: 3, mongos: 2});
enableCoordinateCommitReturnImmediatelyAfterPersistingDecision(st);
enableStaleVersionAndSnapshotRetriesWithinTransactions(st);
// Set up a sharded collection with 3 chunks, [min, 0), [0, 10), [10, max), one on each shard,
// with one document in each.
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
function setupCollection() {
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {skey: 1}}));
assert.commandWorked(st.s.adminCommand({split: ns, middle: {skey: 0}}));
assert.commandWorked(st.s.adminCommand({split: ns, middle: {skey: 10}}));
assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {skey: 5}, to: st.shard1.shardName}));
assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {skey: 15}, to: st.shard2.shardName}));
assert.commandWorked(st.s.getDB(dbName)[collName].insert({_id: 1, counter: 0, skey: -5}));
assert.commandWorked(st.s.getDB(dbName)[collName].insert({_id: 2, counter: 0, skey: 5}));
assert.commandWorked(st.s.getDB(dbName)[collName].insert({_id: 3, counter: 0, skey: 15}));
}
// Runs the given multi-write and asserts a manually inserted orphan document is not affected.
// The write is assumed to target chunks [min, 0) and [0, 10), which begin on shard0 and shard1,
// respectively.
function runTest(st, session, writeCmd, staleRouter) {
const isUpdate = writeCmd.hasOwnProperty("update");
const sessionDB = session.getDatabase(dbName);
setupCollection();
let orphanShardName;
let orphanDoc = {_id: 2, counter: 0, skey: 5};
if (staleRouter) {
// Using a separate router, move a chunk that will be targeted by the write to a shard
// that would not be targeted by a stale router. Put the orphan on the shard that
// previously owned the chunk to verify the multi-write obeys the shard versioning
// protocol.
assert.commandWorked(
st.s1.adminCommand({moveChunk: ns, find: {skey: 5}, to: st.shard2.shardName, _waitForDelete: true}),
);
orphanShardName = "rs1";
} else {
// Otherwise put the orphan on a shard that should not be targeted by a fresh router to
// verify the multi-write is not broadcast to all shards.
orphanShardName = "rs2";
}
const orphanShardDB = st[orphanShardName].getPrimary().getDB(dbName);
assert.commandWorked(orphanShardDB[collName].insert(orphanDoc, {writeConcern: {w: "majority"}}));
// Start a transaction with majority read concern to ensure the orphan will be visible if
// its shard is targeted and send the multi-write.
withAbortAndRetryOnTransientTxnError(session, () => {
session.startTransaction({readConcern: {level: "majority"}});
assert.commandWorked(sessionDB.runCommand(writeCmd));
// The write shouldn't be visible until the transaction commits.
assert.sameMembers(st.getDB(dbName)[collName].find().toArray(), [
{_id: 1, counter: 0, skey: -5},
{_id: 2, counter: 0, skey: 5},
{_id: 3, counter: 0, skey: 15},
]);
// Commit the transaction and verify the write was successful.
assert.commandWorked(session.commitTransaction_forTesting());
});
if (isUpdate) {
assert.sameMembers(
sessionDB[collName].find().toArray(),
[
{_id: 1, counter: 1, skey: -5},
{_id: 2, counter: 1, skey: 5},
{_id: 3, counter: 0, skey: 15},
],
"document mismatch for update, stale: " + staleRouter + ", cmd: " + tojson(writeCmd),
);
} else {
// isDelete
assert.sameMembers(
sessionDB[collName].find().toArray(),
[{_id: 3, counter: 0, skey: 15}],
"document mismatch for delete, stale: " + staleRouter + ", cmd: " + tojson(writeCmd),
);
}
// The orphaned document should not have been affected.
assert.docEq(
orphanDoc,
orphanShardDB[collName].findOne({skey: orphanDoc.skey}),
"document mismatch for orphaned doc, stale: " + staleRouter + ", cmd: " + tojson(writeCmd),
);
assertDropCollection(st.getDB(dbName), collName);
}
const session = st.s.startSession();
let multiUpdate = {
update: collName,
updates: [{q: {skey: {$lte: 5}}, u: {$inc: {counter: 1}}, multi: true}],
};
multiUpdate.ordered = false;
runTest(st, session, multiUpdate, false /*staleRouter*/);
// TODO: SERVER-39704 uncomment when mongos can internally retry txn on stale errors for real.
// runTest(st, session, multiUpdate, true /*staleRouter*/);
multiUpdate.ordered = true;
runTest(st, session, multiUpdate, false /*staleRouter*/);
// TODO: SERVER-39704 uncomment when mongos can internally retry txn on stale errors for real.
// runTest(st, session, multiUpdate, true /*staleRouter*/);
let multiDelete = {delete: collName, deletes: [{q: {skey: {$lte: 5}}, limit: 0}]};
multiDelete.ordered = false;
runTest(st, session, multiDelete, false /*staleRouter*/);
// TODO: SERVER-39704 uncomment when mongos can internally retry txn on stale errors for real.
// runTest(st, session, multiDelete, true /*staleRouter*/);
multiDelete.ordered = true;
runTest(st, session, multiDelete, false /*staleRouter*/);
// TODO: SERVER-39704 uncomment when mongos can internally retry txn on stale errors for real.
// runTest(st, session, multiDelete, true /*staleRouter*/);
disableStaleVersionAndSnapshotRetriesWithinTransactions(st);
st.stop();