mirror of https://github.com/mongodb/mongo
122 lines
3.8 KiB
JavaScript
122 lines
3.8 KiB
JavaScript
/**
|
|
* Ensure that updates are not lost if they are made between processing deferred updates and reading
|
|
* from the updates list in _transferMods.
|
|
*
|
|
* @tags: [uses_transactions, uses_prepare_transaction, requires_persistence]
|
|
*/
|
|
|
|
import {
|
|
migrateStepNames,
|
|
moveChunkParallel,
|
|
moveChunkStepNames,
|
|
pauseMigrateAtStep,
|
|
unpauseMigrateAtStep,
|
|
waitForMoveChunkStep,
|
|
} from "jstests/libs/chunk_manipulation_util.js";
|
|
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
|
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
|
import {awaitRSClientHosts} from "jstests/replsets/rslib.js";
|
|
import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";
|
|
|
|
const dbName = "test";
|
|
const collName = "user";
|
|
const staticMongod = MongoRunner.runMongod({});
|
|
const st = new ShardingTest({shards: {rs0: {nodes: 2}, rs1: {nodes: 1}}});
|
|
const collection = st.s.getDB(dbName).getCollection(collName);
|
|
const lsid = {
|
|
id: UUID(),
|
|
};
|
|
const txnNumber = 0;
|
|
|
|
function setup() {
|
|
CreateShardedCollectionUtil.shardCollectionWithChunks(collection, {_id: 1}, [
|
|
{min: {_id: MinKey}, max: {_id: 10}, shard: st.shard0.shardName},
|
|
{min: {_id: 10}, max: {_id: MaxKey}, shard: st.shard1.shardName},
|
|
]);
|
|
|
|
for (let i = 0; i < 20; i++) {
|
|
assert.commandWorked(collection.insertOne({_id: i, x: i}));
|
|
}
|
|
}
|
|
|
|
function prepareTransactionAndTriggerFailover() {
|
|
assert.commandWorked(
|
|
st.s.getDB(dbName).runCommand({
|
|
update: collName,
|
|
updates: [
|
|
{q: {_id: 1}, u: {$set: {x: 5}}},
|
|
{q: {_id: 2}, u: {$set: {x: -10}}},
|
|
],
|
|
lsid: lsid,
|
|
txnNumber: NumberLong(txnNumber),
|
|
stmtId: NumberInt(0),
|
|
startTransaction: true,
|
|
autocommit: false,
|
|
}),
|
|
);
|
|
|
|
const result = assert.commandWorked(
|
|
st.shard0.getDB(dbName).adminCommand({
|
|
prepareTransaction: 1,
|
|
lsid: lsid,
|
|
txnNumber: NumberLong(txnNumber),
|
|
autocommit: false,
|
|
writeConcern: {w: "majority"},
|
|
}),
|
|
);
|
|
|
|
let oldSecondary = st.rs0.getSecondary();
|
|
|
|
st.rs0.stepUp(oldSecondary);
|
|
|
|
awaitRSClientHosts(st.s, oldSecondary, {ok: true, ismaster: true});
|
|
|
|
return result.prepareTimestamp;
|
|
}
|
|
|
|
function commitPreparedTransaction(prepareTimestamp) {
|
|
assert.commandWorked(
|
|
st.shard0.getDB(dbName).adminCommand(
|
|
Object.assign(
|
|
{
|
|
commitTransaction: 1,
|
|
lsid: lsid,
|
|
txnNumber: NumberLong(txnNumber),
|
|
autocommit: false,
|
|
},
|
|
{commitTimestamp: prepareTimestamp},
|
|
),
|
|
),
|
|
);
|
|
}
|
|
|
|
function runMoveChunkAndCommitTransaction() {
|
|
const joinMoveChunk = moveChunkParallel(staticMongod, st.s.host, {_id: 1}, null, "test.user", st.shard1.shardName);
|
|
pauseMigrateAtStep(st.shard1, migrateStepNames.catchup);
|
|
waitForMoveChunkStep(st.shard0, moveChunkStepNames.startedMoveChunk);
|
|
commitPreparedTransaction(prepareTimestamp);
|
|
unpauseMigrateAtStep(st.shard1, migrateStepNames.catchup);
|
|
return joinMoveChunk;
|
|
}
|
|
|
|
setup();
|
|
const prepareTimestamp = prepareTransactionAndTriggerFailover();
|
|
const processingDeferredXferModsFp = configureFailPoint(st.rs0.getPrimary(), "hangAfterProcessingDeferredXferMods");
|
|
const pauseBeforeCriticalSectionFp = configureFailPoint(
|
|
st.rs0.getPrimary(),
|
|
"hangBeforeEnteringCriticalSection",
|
|
{},
|
|
"alwaysOn",
|
|
);
|
|
const joinMoveChunk = runMoveChunkAndCommitTransaction();
|
|
processingDeferredXferModsFp.wait();
|
|
assert.commandWorked(collection.update({_id: 4}, {$set: {x: 501}}));
|
|
processingDeferredXferModsFp.off();
|
|
pauseBeforeCriticalSectionFp.off();
|
|
joinMoveChunk();
|
|
assert.eq(collection.findOne({_id: 4}).x, 501);
|
|
|
|
st.stop();
|
|
|
|
MongoRunner.stopMongod(staticMongod);
|