/** * Tests the behavior of change streams on a system.resharding.* namespace in the presence of * 'showSystemEvents' flag. This is a separate test from 'show_system_events.js' because it can only * operate in a sharded cluster. * * @tags: [ * requires_sharding, * uses_change_streams, * change_stream_does_not_expect_txns, * assumes_unsharded_collection, * assumes_read_preference_unchanged, * ] */ import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js"; import {assertChangeStreamEventEq} from "jstests/libs/query/change_stream_util.js"; import {ShardingTest} from "jstests/libs/shardingtest.js"; // Create a single-shard cluster for this test. const st = new ShardingTest({ shards: 1, rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}, other: { configOptions: {setParameter: {reshardingCriticalSectionTimeoutMillis: 24 * 60 * 60 * 1000}}, }, }); const testDB = st.s.getDB(jsTestName()); const testColl = testDB[jsTestName()]; // Shard the collection based on '_id'. st.shardColl(testColl, {_id: 1}, false); // Build an index on the collection to support the resharding operation. assert.commandWorked(testColl.createIndex({a: 1})); // Insert some documents that will be resharded. assert.commandWorked(testColl.insert({_id: 0, a: 0})); assert.commandWorked(testColl.insert({_id: 1, a: 1})); // Helper function to retrieve the UUID of the specified collection. function getCollectionUuid(coll) { const collInfo = testDB.getCollectionInfos({name: coll.getName()})[0]; return collInfo.info.uuid; } // Obtain a resume token indicating the start point for the test. const startPoint = testDB.watch().getResumeToken(); // Get the UUID of the collection before resharding. const oldUUID = getCollectionUuid(testColl); // Reshard the collection. assert.commandWorked( st.s.adminCommand({ reshardCollection: testColl.getFullName(), key: {a: 1}, numInitialChunks: 1, }), ); // Get the UUID of the collection after resharding. const newUUID = getCollectionUuid(testColl); // Write one more sentinel document into the collection. assert.commandWorked(testColl.insert({_id: 2, a: 2})); // Now confirm the sequence of events that we expect to see in the change stream. const reshardingCollName = `system.resharding.${oldUUID.toString().match(/\"([^\"]+)\"/)[1]}`; const reshardingNs = { db: testDB.getName(), coll: reshardingCollName, }; const origNs = { db: testDB.getName(), coll: testColl.getName(), }; let expectedReshardingEvents = [ {ns: reshardingNs, collectionUUID: newUUID, operationType: "create"}, { ns: reshardingNs, collectionUUID: newUUID, operationType: "shardCollection", operationDescription: {shardKey: {a: 1}}, }, { ns: reshardingNs, collectionUUID: newUUID, operationType: "insert", fullDocument: {_id: 0, a: 0}, documentKey: {a: 0, _id: 0}, }, { ns: reshardingNs, collectionUUID: newUUID, operationType: "insert", fullDocument: {_id: 1, a: 1}, documentKey: {a: 1, _id: 1}, }, { operationType: "endOfTransaction", }, { ns: reshardingNs, collectionUUID: newUUID, operationType: "startIndexBuild", operationDescription: {indexes: [{v: 2, key: {a: 1}, name: "a_1"}]}, }, { ns: reshardingNs, collectionUUID: newUUID, operationType: "createIndexes", operationDescription: {indexes: [{v: 2, key: {a: 1}, name: "a_1"}]}, }, { ns: origNs, collectionUUID: oldUUID, reshardingUUID: newUUID, operationType: "reshardBlockingWrites", operationDescription: {reshardingUUID: newUUID, type: "reshardFinalOp"}, }, { ns: origNs, collectionUUID: oldUUID, operationType: "reshardCollection", operationDescription: { reshardUUID: newUUID, shardKey: {a: 1}, oldShardKey: {_id: 1}, unique: false, numInitialChunks: NumberLong(1), provenance: "reshardCollection", }, }, { ns: origNs, collectionUUID: newUUID, operationType: "insert", fullDocument: {_id: 2, a: 2}, documentKey: {a: 2, _id: 2}, }, ]; if (!FeatureFlagUtil.isEnabled(st.s, "EndOfTransactionChangeEvent")) { expectedReshardingEvents = expectedReshardingEvents.filter((event) => event.operationType !== "endOfTransaction"); } // Helper to confirm the sequence of events observed in the change stream. function assertChangeStreamEventSequence(csConfig, expectedEvents) { // Open a change stream on the test DB using the given configuration. const finalConfig = Object.assign({resumeAfter: startPoint, showExpandedEvents: true}, csConfig); const csCursor = testDB.watch([], finalConfig); // Confirm that we see the expected sequence of events. expectedEvents.forEach((expectedEvent) => { assert.soon(() => csCursor.hasNext()); assertChangeStreamEventEq(csCursor.next(), expectedEvent); }); } // With showSystemEvents set to true, we expect to see the full sequence of events. assertChangeStreamEventSequence({showSystemEvents: true}, expectedReshardingEvents); // With showSystemEvents set to false, we expect to only see events on the original namespace and // not see the "reshardBlockingWrites" event. const nonSystemEvents = expectedReshardingEvents.filter( (event) => event.ns && event.ns.coll === testColl.getName() && event.operationType != "reshardBlockingWrites", ); assertChangeStreamEventSequence({showSystemEvents: false}, nonSystemEvents); st.stop();