mongo/jstests/change_streams/reshard_collection_event.js

245 lines
8.4 KiB
JavaScript

/**
* Test that change streams return the expected reshardCollection event for each related end-user
* command (reshardCollection, moveCollection, unshardCollection).
*
* @tags: [
* requires_sharding,
* uses_change_streams,
* change_stream_does_not_expect_txns,
* assumes_unsharded_collection,
* assumes_read_preference_unchanged,
* ]
*/
import {assertDropCollection} from "jstests/libs/collection_drop_recreate.js";
import {assertChangeStreamEventEq, ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
let st = new ShardingTest({
shards: 2,
rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}},
other: {
configOptions: {setParameter: {reshardingCriticalSectionTimeoutMillis: 24 * 60 * 60 * 1000}},
},
});
const mongos = st.s0;
const primaryShard = st.shard0.shardName;
const nonPrimaryShard = st.shard1.shardName;
const kDbName = jsTestName();
const kCollName = "coll";
const kNsName = kDbName + "." + kCollName;
const numDocs = 1;
const zoneName = "zone1";
const db = mongos.getDB(kDbName);
const test = new ChangeStreamTest(db);
function getCollectionUuid(coll) {
const collInfo = db.getCollectionInfos({name: coll})[0];
return collInfo.info.uuid;
}
const ns = {
db: kDbName,
coll: kCollName,
};
function prepareShardedCollection() {
assertDropCollection(db, kCollName);
assert.commandWorked(mongos.adminCommand({shardCollection: kNsName, key: {_id: 1}}));
assert.commandWorked(st.s.adminCommand({addShardToZone: st.shard1.shardName, zone: zoneName}));
}
function prepareUnshardedCollection() {
assertDropCollection(db, kCollName);
assert.commandWorked(db.createCollection(kCollName));
}
function reshardCollectionCmd() {
assert.commandWorked(
mongos.adminCommand({
reshardCollection: kNsName,
key: {newKey: 1},
unique: false,
numInitialChunks: 1,
collation: {locale: "simple"},
zones: [{zone: zoneName, min: {newKey: MinKey}, max: {newKey: MaxKey}}],
}),
);
}
function rewriteCollectionCmd() {
assert.commandWorked(
mongos.adminCommand({
rewriteCollection: kNsName,
numInitialChunks: 1,
zones: [{zone: zoneName, min: {_id: MinKey}, max: {_id: MaxKey}}],
}),
);
}
function moveCollectionCmd() {
assert.commandWorked(mongos.adminCommand({moveCollection: kNsName, toShard: nonPrimaryShard}));
}
function unshardCollectionCmd() {
assert.commandWorked(mongos.adminCommand({unshardCollection: kNsName, toShard: nonPrimaryShard}));
}
function assertExpectedEventObserved(cursor, expectedEvent) {
let events = test.getNextChanges(cursor, 1);
let event = events[0];
// Check the presence and the type of 'wallTime' field. We have no way to check the
// correctness of 'wallTime' value, so we delete it afterwards.
assert(event.wallTime instanceof Date);
delete event.wallTime;
assertChangeStreamEventEq(event, expectedEvent);
return event._id;
}
function validateExpectedEventAndConfirmResumability(setupFn, userCmdFn, collParam, expectedOutput) {
setupFn();
let collectionUUID = getCollectionUuid(kCollName);
let pipeline = [
{$changeStream: {showExpandedEvents: true}},
{$match: {operationType: {$nin: ["create", "createIndexes"]}}},
];
let cursor = test.startWatchingChanges({
pipeline: pipeline,
collection: collParam,
aggregateOptions: {cursor: {batchSize: 0}},
});
userCmdFn();
// Confirm that we observe the reshardCollection event, and obtain its resume token.
expectedOutput.collectionUUID = collectionUUID;
expectedOutput.operationDescription.reshardUUID = getCollectionUuid(kCollName);
const reshardResumeToken = assertExpectedEventObserved(cursor, expectedOutput);
// Insert a document before starting the next change stream so that we can validate the
// resuming behavior.
assert.commandWorked(db[kCollName].insert({_id: numDocs + 1}));
// Resume after the reshard event and confirm we see the subsequent insert.
pipeline = [{$changeStream: {showExpandedEvents: true, resumeAfter: reshardResumeToken}}];
cursor = test.startWatchingChanges({pipeline: pipeline, collection: collParam});
test.assertNextChangesEqual({
cursor: cursor,
expectedChanges: {
operationType: "insert",
ns: ns,
fullDocument: {_id: numDocs + 1},
documentKey: {_id: numDocs + 1},
},
});
}
assert.commandWorked(mongos.adminCommand({enableSharding: kDbName, primaryShard: primaryShard}));
for (let watchCollectionParameter of [kCollName, 1]) {
const watchLevel = watchCollectionParameter === 1 ? kDbName : watchCollectionParameter;
jsTest.log(`Validate behavior of reshardCollection against a change stream watching at '${watchLevel}' level`);
// The values of 'collectionUUID' and 'reshardUUID' will be filled in by the validate function.
validateExpectedEventAndConfirmResumability(
prepareShardedCollection,
reshardCollectionCmd,
watchCollectionParameter,
{
"operationType": "reshardCollection",
"collectionUUID": 0,
"ns": {"db": "reshard_collection_event", "coll": "coll"},
"operationDescription": {
"reshardUUID": 0,
"shardKey": {"newKey": 1},
"oldShardKey": {"_id": 1},
"unique": false,
"numInitialChunks": NumberLong(1),
"collation": {"locale": "simple"},
"provenance": "reshardCollection",
"zones": [
{
"zone": "zone1",
"min": {"newKey": {"$minKey": 1}},
"max": {"newKey": {"$maxKey": 1}},
},
],
},
},
);
jsTest.log(`Validate behavior of rewriteCollection against a change stream watching at '${watchLevel}' level`);
validateExpectedEventAndConfirmResumability(
prepareShardedCollection,
rewriteCollectionCmd,
watchCollectionParameter,
{
"operationType": "reshardCollection",
"collectionUUID": 0,
"ns": {"db": "reshard_collection_event", "coll": "coll"},
"operationDescription": {
"reshardUUID": 0,
"shardKey": {"_id": 1},
"oldShardKey": {"_id": 1},
"unique": false,
"numInitialChunks": NumberLong(1),
"provenance": "rewriteCollection",
"zones": [
{
"zone": "zone1",
"min": {"_id": {"$minKey": 1}},
"max": {"_id": {"$maxKey": 1}},
},
],
},
},
);
jsTest.log(`Validate behavior of moveCollection against a change stream watching at '${watchLevel}' level`);
validateExpectedEventAndConfirmResumability(
prepareUnshardedCollection,
moveCollectionCmd,
watchCollectionParameter,
{
"operationType": "reshardCollection",
"collectionUUID": 0,
"ns": {"db": "reshard_collection_event", "coll": "coll"},
"operationDescription": {
"reshardUUID": 0,
"shardKey": {"_id": 1},
"oldShardKey": {"_id": 1},
"unique": false,
"numInitialChunks": NumberLong(1),
"provenance": "moveCollection",
},
},
);
jsTest.log(`Validate behavior of unshardCollection against a change stream watching at '${watchLevel}' level`);
validateExpectedEventAndConfirmResumability(
prepareShardedCollection,
unshardCollectionCmd,
watchCollectionParameter,
{
"operationType": "reshardCollection",
"collectionUUID": 0,
"ns": {"db": "reshard_collection_event", "coll": "coll"},
"operationDescription": {
"reshardUUID": 0,
"shardKey": {"_id": 1},
"oldShardKey": {"_id": 1},
"unique": false,
"numInitialChunks": NumberLong(1),
"provenance": "unshardCollection",
},
},
);
}
st.stop();