// Basic tests for resuming a $changeStream that is open against all collections in a database. // Do not run in whole-cluster passthrough since this test assumes that the change stream will be // invalidated by a database drop. // @tags: [do_not_run_in_whole_cluster_passthrough] import {assertDropAndRecreateCollection, assertDropCollection} from "jstests/libs/collection_drop_recreate.js"; import {FixtureHelpers} from "jstests/libs/fixture_helpers.js"; import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js"; // Drop and recreate the collections to be used in this set of tests. const testDB = db.getSiblingDB(jsTestName()); let coll = assertDropAndRecreateCollection(testDB, "resume_coll"); const otherColl = assertDropAndRecreateCollection(testDB, "resume_coll_other"); let cst = new ChangeStreamTest(testDB); let resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); // Insert a single document to each collection and save the resume token from the first insert. assert.commandWorked(coll.insert({_id: 1})); assert.commandWorked(otherColl.insert({_id: 2})); const firstInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq({_id: 1}, firstInsertChangeDoc.fullDocument); assert.eq(firstInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()}); // Test resuming the change stream after the first insert should pick up the insert on the // second collection. resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); const secondInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq({_id: 2}, secondInsertChangeDoc.fullDocument); assert.eq(secondInsertChangeDoc.ns, {db: testDB.getName(), coll: otherColl.getName()}); // Insert a third document to the first collection and test that the change stream picks it up. assert.commandWorked(coll.insert({_id: 3})); const thirdInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq({_id: 3}, thirdInsertChangeDoc.fullDocument); assert.eq(thirdInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()}); // Test resuming after the first insert again. resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); assert.docEq(secondInsertChangeDoc, cst.getOneChange(resumeCursor)); assert.docEq(thirdInsertChangeDoc, cst.getOneChange(resumeCursor)); // Test resume after second insert. resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); assert.docEq(thirdInsertChangeDoc, cst.getOneChange(resumeCursor)); // Rename the collection and attempt to resume from the 'rename' notification. Skip this // test when running on a sharded collection, since these cannot be renamed. if (!FixtureHelpers.isSharded(coll)) { assertDropAndRecreateCollection(coll.getDB(), coll.getName()); const renameColl = coll.getDB().getCollection("rename_coll"); assertDropCollection(renameColl.getDB(), renameColl.getName()); resumeCursor = cst.startWatchingChanges({collection: 1, pipeline: [{$changeStream: {}}]}); assert.commandWorked(coll.renameCollection(renameColl.getName())); const renameChanges = cst.assertNextChangesEqual({ cursor: resumeCursor, expectedChanges: [ { operationType: "rename", ns: {db: coll.getDB().getName(), coll: coll.getName()}, to: {db: renameColl.getDB().getName(), coll: renameColl.getName()}, }, ], }); const resumeTokenRename = renameChanges[0]._id; // Insert into the renamed collection. assert.commandWorked(renameColl.insert({_id: "after rename"})); // Resume from the rename notification using 'resumeAfter' and verify that the change stream // returns the next insert. let expectedInsert = { operationType: "insert", ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()}, fullDocument: {_id: "after rename"}, documentKey: {_id: "after rename"}, }; resumeCursor = cst.startWatchingChanges({ collection: 1, pipeline: [{$changeStream: {resumeAfter: resumeTokenRename}}], }); cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert}); // Resume from the rename notification using 'startAfter' and verify that the change stream // returns the next insert. expectedInsert = { operationType: "insert", ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()}, fullDocument: {_id: "after rename"}, documentKey: {_id: "after rename"}, }; resumeCursor = cst.startWatchingChanges({ collection: 1, pipeline: [{$changeStream: {startAfter: resumeTokenRename}}], }); cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert}); // Rename back to the original collection for reliability of the collection drops when // dropping the database. assert.commandWorked(renameColl.renameCollection(coll.getName())); } // Explicitly drop one collection to ensure reliability of the order of notifications from the // dropDatabase command. resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); assertDropCollection(testDB, otherColl.getName()); const firstCollDrop = cst.getOneChange(resumeCursor); assert.eq(firstCollDrop.operationType, "drop", tojson(firstCollDrop)); assert.eq(firstCollDrop.ns, {db: testDB.getName(), coll: otherColl.getName()}); // Dropping a database should generate a 'drop' notification for each collection, a // 'dropDatabase' notification, and finally an 'invalidate'. assert.commandWorked(testDB.dropDatabase()); const dropDbChanges = cst.assertDatabaseDrop({cursor: resumeCursor, db: testDB}); const secondCollDrop = dropDbChanges[0]; // For sharded passthrough suites, we know that the last entry will be a 'dropDatabase' however // there may be multiple collection drops in 'dropDbChanges' depending on the number of involved // shards. const resumeTokenDropDb = dropDbChanges[dropDbChanges.length - 1]._id; const resumeTokenInvalidate = cst.assertNextChangesEqual({ cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}], })[0]._id; // Test resuming from the first collection drop and the second collection drop as a result of // dropping the database. [firstCollDrop, secondCollDrop].forEach((token) => { resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: token._id}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); cst.assertDatabaseDrop({cursor: resumeCursor, db: testDB}); cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]}); }); // Recreate the test collection. assert.commandWorked(coll.insert({_id: "after recreate"})); // Test resuming from the 'dropDatabase' entry using 'resumeAfter'. resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: resumeTokenDropDb}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]}); // Test resuming from the 'invalidate' entry using 'resumeAfter'. assert.commandFailedWithCode( db.runCommand({ aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}], cursor: {}, collation: {locale: "simple"}, }), ErrorCodes.InvalidResumeToken, ); // Test resuming from the 'dropDatabase' entry using 'startAfter'. resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {startAfter: resumeTokenDropDb}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]}); // Test resuming from the 'invalidate' entry using 'startAfter' and verifies it picks up the // insert after recreating the db/collection. const expectedInsert = { operationType: "insert", ns: {db: testDB.getName(), coll: coll.getName()}, fullDocument: {_id: "after recreate"}, documentKey: {_id: "after recreate"}, }; resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {startAfter: resumeTokenInvalidate}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); cst.consumeDropUpTo({ cursor: resumeCursor, dropType: "dropDatabase", expectedNext: expectedInsert, }); cst.cleanUp();