// Mirror of https://github.com/mongodb/mongo (165 lines, 6.4 KiB, JavaScript).
// Tests that internal resharding ops are exposed as change stream events during a
// resharding operation. Internal resharding ops include:
// 1. reshardBegin
// 2. reshardDoneCatchUp
// 3. reshardBlockingWrites
//
// @tags: [
//   requires_majority_read_concern,
//   uses_change_streams,
//   uses_atclustertime,
//
// ]
import {DiscoverTopology} from "jstests/libs/discover_topology.js";
|
|
import {assertChangeStreamEventEq, ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
|
|
import {ReshardingTest} from "jstests/sharding/libs/resharding_test_fixture.js";
|
|
|
|
// Use a higher frequency for periodic noops to speed up the test.
|
|
const reshardingTest = new ReshardingTest({
|
|
numDonors: 2,
|
|
numRecipients: 1,
|
|
reshardInPlace: false,
|
|
periodicNoopIntervalSecs: 1,
|
|
writePeriodicNoops: true,
|
|
});
|
|
reshardingTest.setup();
|
|
|
|
const kDbName = "reshardingDb";
|
|
const collName = "coll";
|
|
const ns = kDbName + "." + collName;
|
|
|
|
const donorShardNames = reshardingTest.donorShardNames;
|
|
const recipientShardNames = reshardingTest.recipientShardNames;
|
|
const sourceCollection = reshardingTest.createShardedCollection({
|
|
ns: ns,
|
|
shardKeyPattern: {oldKey: 1},
|
|
chunks: [
|
|
{min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
|
|
{min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]},
|
|
],
|
|
primaryShardName: donorShardNames[0],
|
|
});
|
|
|
|
const mongos = sourceCollection.getMongo();
|
|
const topology = DiscoverTopology.findConnectedNodes(mongos);
|
|
|
|
const donor0 = new Mongo(topology.shards[donorShardNames[0]].primary);
|
|
const cstDonor0 = new ChangeStreamTest(donor0.getDB(kDbName));
|
|
let changeStreamsCursorDonor0 = cstDonor0.startWatchingChanges({
|
|
pipeline: [{$changeStream: {showMigrationEvents: true}}],
|
|
collection: collName,
|
|
});
|
|
|
|
const donor1 = new Mongo(topology.shards[donorShardNames[1]].primary);
|
|
const cstDonor1 = new ChangeStreamTest(donor1.getDB(kDbName));
|
|
let changeStreamsCursorDonor1 = cstDonor1.startWatchingChanges({
|
|
pipeline: [{$changeStream: {showMigrationEvents: true}}],
|
|
collection: collName,
|
|
});
|
|
|
|
const recipient0 = new Mongo(topology.shards[recipientShardNames[0]].primary);
|
|
const cstRecipient0 = new ChangeStreamTest(recipient0.getDB(kDbName));
|
|
|
|
const fcv = mongos.getDB("admin").runCommand({getParameter: 1, featureCompatibilityVersion: 1});
|
|
|
|
let reshardingUUID;
|
|
let changeStreamsCursorRecipient0;
|
|
|
|
reshardingTest.withReshardingInBackground(
|
|
{
|
|
// If a donor is also a recipient, the donor state machine will run renameCollection with
|
|
// {dropTarget : true} rather than running drop and letting the recipient state machine run
|
|
// rename at the end of the resharding operation. So, we ensure that only one of the donor
|
|
// shards will also be a recipient shard in order to verify that neither the rename with
|
|
// {dropTarget : true} nor the drop command are picked up by the change streams cursor.
|
|
newShardKeyPattern: {newKey: 1},
|
|
newChunks: [{min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}],
|
|
},
|
|
(tempNs) => {
|
|
// Wait until participants are aware of the resharding operation.
|
|
reshardingTest.awaitCloneTimestampChosen();
|
|
|
|
const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne();
|
|
reshardingUUID = coordinatorDoc._id;
|
|
|
|
changeStreamsCursorRecipient0 = cstRecipient0.startWatchingChanges({
|
|
pipeline: [{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true}}],
|
|
collection: tempNs.substring(tempNs.indexOf(".") + 1),
|
|
});
|
|
|
|
// Check for reshardBegin event on both donors.
|
|
const expectedReshardBeginEvent = {
|
|
reshardingUUID: reshardingUUID,
|
|
operationType: "reshardBegin",
|
|
ns: {db: kDbName, coll: collName},
|
|
};
|
|
|
|
const reshardBeginDonor0Event = cstDonor0.getNextChanges(
|
|
changeStreamsCursorDonor0,
|
|
1,
|
|
false /* skipFirstBatch */,
|
|
);
|
|
|
|
assertChangeStreamEventEq(reshardBeginDonor0Event[0], expectedReshardBeginEvent);
|
|
|
|
const reshardBeginDonor1Event = cstDonor1.getNextChanges(
|
|
changeStreamsCursorDonor1,
|
|
1,
|
|
false /* skipFirstBatch */,
|
|
);
|
|
assertChangeStreamEventEq(reshardBeginDonor1Event[0], expectedReshardBeginEvent);
|
|
|
|
// TODO (SERVER-94478): Remove FCV check.
|
|
if (fcv == latestFCV) {
|
|
// Check for reshardBlockingWrites event on both donors.
|
|
const expectedReshardBlockingWritesEvent = {
|
|
reshardingUUID: reshardingUUID,
|
|
operationType: "reshardBlockingWrites",
|
|
ns: {db: kDbName, coll: collName},
|
|
};
|
|
|
|
const reshardBlockingWritesDonor0Event = cstDonor0.getNextChanges(
|
|
changeStreamsCursorDonor0,
|
|
1,
|
|
false /* skipFirstBatch */,
|
|
);
|
|
|
|
assertChangeStreamEventEq(reshardBlockingWritesDonor0Event[0], expectedReshardBlockingWritesEvent);
|
|
|
|
const reshardBlockingWritesDonor1Event = cstDonor1.getNextChanges(
|
|
changeStreamsCursorDonor1,
|
|
1,
|
|
false /* skipFirstBatch */,
|
|
);
|
|
|
|
assertChangeStreamEventEq(reshardBlockingWritesDonor1Event[0], expectedReshardBlockingWritesEvent);
|
|
}
|
|
},
|
|
{
|
|
postDecisionPersistedFn: () => {
|
|
// Check for reshardDoneCatchUp event on the recipient.
|
|
const expectedReshardDoneCatchUpEvent = {
|
|
reshardingUUID: reshardingUUID,
|
|
operationType: "reshardDoneCatchUp",
|
|
};
|
|
|
|
const reshardDoneCatchUpEvent = cstRecipient0.getNextChanges(
|
|
changeStreamsCursorRecipient0,
|
|
1,
|
|
false /* skipFirstBatch */,
|
|
)[0];
|
|
|
|
// Ensure that the 'reshardingDoneCatchUp' event has an 'ns' field of the format
|
|
// '{ns: kDbName, coll: "system.resharding.<>"}.
|
|
assert(reshardDoneCatchUpEvent.ns, reshardDoneCatchUpEvent);
|
|
assert.eq(reshardDoneCatchUpEvent.ns.db, kDbName, reshardDoneCatchUpEvent);
|
|
assert(reshardDoneCatchUpEvent.ns.coll.startsWith("system.resharding."), reshardDoneCatchUpEvent);
|
|
delete reshardDoneCatchUpEvent.ns;
|
|
|
|
assertChangeStreamEventEq(reshardDoneCatchUpEvent, expectedReshardDoneCatchUpEvent);
|
|
},
|
|
},
|
|
);
|
|
|
|
reshardingTest.teardown();
|