// Verifies basic sharded transaction behavior with causal consistency.
//
// @tags: [
//   requires_sharding,
//   uses_multi_shard_transaction,
//   uses_transactions,
// ]
import {withTxnAndAutoRetry} from "jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {
    disableStaleVersionAndSnapshotRetriesWithinTransactions,
    enableStaleVersionAndSnapshotRetriesWithinTransactions,
    kShardOptionsForDisabledStaleShardVersionRetries,
} from "jstests/sharding/libs/sharded_transactions_helpers.js";

const dbName = "test";
const collName = "foo";
const ns = dbName + "." + collName;

Random.setRandomSeed();

const st = new ShardingTest({
    shards: 2,
    mongos: 2,
    other: {
        rsOptions: kShardOptionsForDisabledStaleShardVersionRetries,
    },
});
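
// Allow internal retries on stale shard version and snapshot errors within transactions,
// since this test deliberately runs a transaction through a router with stale routing info.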
enableStaleVersionAndSnapshotRetriesWithinTransactions(st);

// Set up a sharded collection with 2 chunks, [min, 0) and [0, max), one on each shard,
// with one document in each.
assert.commandWorked(
    st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 1}, to: st.shard1.shardName}));

assert.commandWorked(
    st.s.getDB(dbName)[collName].insert({_id: -1}, {writeConcern: {w: "majority"}}));
assert.commandWorked(
    st.s.getDB(dbName)[collName].insert({_id: 1}, {writeConcern: {w: "majority"}}));

// Verifies that transactions using causal consistency read all causally prior operations.
function runTest(st, readConcern) {
    jsTestLog("Testing readConcern: " + tojson(readConcern));
    const session = st.s.startSession({causalConsistency: true});
    const sessionDB = session.getDatabase(dbName);

    // Insert a document into one shard through the causally consistent session.
    const docToInsert = {_id: 5};
    assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [docToInsert]}));

    // Through a separate router, move the chunk that was inserted into, so the original
    // router is stale when it starts its transaction.
    const otherRouter = st.s1;
    assert.commandWorked(
        otherRouter.adminCommand({moveChunk: ns, find: docToInsert, to: st.shard0.shardName}));
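
    // withTxnAndAutoRetry runs the body in a transaction on the session and commits it,
    // retrying the whole transaction if it fails with a transient transaction error.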
    withTxnAndAutoRetry(session, () => {
        // The transaction should always see the document written earlier through its
        // session, regardless of the move.
        //
        // Note: Until transactions can read from secondaries and/or disabling speculative
        // snapshots is allowed, read concerns that do not require a global snapshot (i.e.
        // local and majority) will always read the inserted document here, because the local
        // snapshot established on this shard includes all currently applied operations, which
        // must include all earlier acknowledged writes.
        assert.docEq(docToInsert,
                     sessionDB[collName].findOne(docToInsert),
                     "sharded transaction with read concern " + tojson(readConcern) +
                         " did not see expected document");
    }, {readConcern: readConcern});

    // Clean up for the next iteration.
    assert.commandWorked(
        st.s.adminCommand({moveChunk: ns, find: docToInsert, to: st.shard1.shardName}));
    assert.commandWorked(sessionDB[collName].remove(docToInsert));
}
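
// Transactions support reads at the "local", "majority", and "snapshot" read concern levels.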
const kAllowedReadConcernLevels = ["local", "majority", "snapshot"];
for (let readConcernLevel of kAllowedReadConcernLevels) {
    runTest(st, {level: readConcernLevel});
}

disableStaleVersionAndSnapshotRetriesWithinTransactions(st);

st.stop();