mirror of https://github.com/mongodb/mongo
201 lines
8.6 KiB
JavaScript
201 lines
8.6 KiB
JavaScript
// Basic tests for resuming a $changeStream that is open against all collections in a database.
// Do not run in whole-cluster passthrough since this test assumes that the change stream will be
// invalidated by a database drop.
|
|
// @tags: [do_not_run_in_whole_cluster_passthrough]
|
|
import {assertDropAndRecreateCollection, assertDropCollection} from "jstests/libs/collection_drop_recreate.js";
|
|
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
|
|
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
|
|
|
|
// Start from a clean slate: drop and recreate both collections used by this test, inside a
// database named after the test itself.
const testDB = db.getSiblingDB(jsTestName());
const coll = assertDropAndRecreateCollection(testDB, "resume_coll");
const otherColl = assertDropAndRecreateCollection(testDB, "resume_coll_other");

// 'coll' and 'cst' are never rebound in this test, so they are 'const'. Only 'resumeCursor' is
// reassigned, each time the stream is re-opened from a different point.
const cst = new ChangeStreamTest(testDB);
// Open the initial stream with 'collection: 1', i.e. not tied to a single named collection.
let resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
|
|
|
|
// Write one document into each collection, in order. The change event for the first write is
// kept because later steps use its '_id' as a resume token.
for (const [targetColl, docToInsert] of [
    [coll, {_id: 1}],
    [otherColl, {_id: 2}],
]) {
    assert.commandWorked(targetColl.insert(docToInsert));
}

const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq({_id: 1}, firstInsertChangeDoc.fullDocument);
// The first event must be attributed to the first collection.
const firstInsertExpectedNs = {db: testDB.getName(), coll: coll.getName()};
assert.eq(firstInsertChangeDoc.ns, firstInsertExpectedNs);
|
|
|
|
// Re-open the stream from the first insert's token. The next event delivered should be the
// insert that went into the second collection.
const resumeAfterFirstInsertStage = {$changeStream: {resumeAfter: firstInsertChangeDoc._id}};
resumeCursor = cst.startWatchingChanges({
    pipeline: [resumeAfterFirstInsertStage],
    collection: 1,
    aggregateOptions: {cursor: {batchSize: 0}},
});

const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq({_id: 2}, secondInsertChangeDoc.fullDocument);
const secondInsertExpectedNs = {db: testDB.getName(), coll: otherColl.getName()};
assert.eq(secondInsertChangeDoc.ns, secondInsertExpectedNs);
|
|
|
|
// A further insert into the first collection must show up on the cursor that is already open.
assert.commandWorked(coll.insert({_id: 3}));

const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
const {fullDocument: thirdInsertFullDocument, ns: thirdInsertNs} = thirdInsertChangeDoc;
assert.docEq({_id: 3}, thirdInsertFullDocument);
assert.eq(thirdInsertNs, {db: testDB.getName(), coll: coll.getName()});
|
|
|
|
// Resume from the first insert's token once more. This time the stream should replay, in order,
// the second and third insert events captured earlier.
resumeCursor = cst.startWatchingChanges({
    pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
    collection: 1,
    aggregateOptions: {cursor: {batchSize: 0}},
});
for (const replayedEvent of [secondInsertChangeDoc, thirdInsertChangeDoc]) {
    assert.docEq(replayedEvent, cst.getOneChange(resumeCursor));
}
|
|
|
|
// Resume from the second insert's token: only the third insert should remain to be delivered.
const resumeAfterSecondInsertStage = {$changeStream: {resumeAfter: secondInsertChangeDoc._id}};
resumeCursor = cst.startWatchingChanges({
    pipeline: [resumeAfterSecondInsertStage],
    collection: 1,
    aggregateOptions: {cursor: {batchSize: 0}},
});
const eventAfterSecondInsert = cst.getOneChange(resumeCursor);
assert.docEq(thirdInsertChangeDoc, eventAfterSecondInsert);
|
|
|
|
// Rename the collection, then check that the stream can be resumed from the 'rename' event with
// either resume option. Sharded collections cannot be renamed, so this section is skipped for
// them.
if (!FixtureHelpers.isSharded(coll)) {
    // Begin with a freshly recreated source collection and make sure the rename target is absent.
    assertDropAndRecreateCollection(coll.getDB(), coll.getName());
    const renamedColl = coll.getDB().getCollection("rename_coll");
    assertDropCollection(renamedColl.getDB(), renamedColl.getName());

    resumeCursor = cst.startWatchingChanges({collection: 1, pipeline: [{$changeStream: {}}]});
    assert.commandWorked(coll.renameCollection(renamedColl.getName()));

    // The rename must be reported with both the source ('ns') and destination ('to') namespaces.
    const expectedRename = {
        operationType: "rename",
        ns: {db: coll.getDB().getName(), coll: coll.getName()},
        to: {db: renamedColl.getDB().getName(), coll: renamedColl.getName()},
    };
    const renameEvents = cst.assertNextChangesEqual({
        cursor: resumeCursor,
        expectedChanges: [expectedRename],
    });
    const renameToken = renameEvents[0]._id;

    // Insert into the renamed collection.
    assert.commandWorked(renamedColl.insert({_id: "after rename"}));

    // Resume from the rename notification, first with 'resumeAfter' and then with 'startAfter'.
    // In both cases the next event returned must be the insert made after the rename.
    for (const resumeOption of ["resumeAfter", "startAfter"]) {
        const insertAfterRename = {
            operationType: "insert",
            ns: {db: renamedColl.getDB().getName(), coll: renamedColl.getName()},
            fullDocument: {_id: "after rename"},
            documentKey: {_id: "after rename"},
        };
        resumeCursor = cst.startWatchingChanges({
            collection: 1,
            pipeline: [{$changeStream: {[resumeOption]: renameToken}}],
        });
        cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: insertAfterRename});
    }

    // Rename back to the original collection for reliability of the collection drops when
    // dropping the database.
    assert.commandWorked(renamedColl.renameCollection(coll.getName()));
}
|
|
|
|
// Drop one collection by hand before dropping the database, so that the order of the
// notifications produced by the later dropDatabase command is predictable.
const freshWholeDbStage = {$changeStream: {}};
resumeCursor = cst.startWatchingChanges({pipeline: [freshWholeDbStage], collection: 1});
assertDropCollection(testDB, otherColl.getName());

const firstCollDrop = cst.getOneChange(resumeCursor);
// The full event is passed as the failure message to ease debugging.
assert.eq(firstCollDrop.operationType, "drop", tojson(firstCollDrop));
const firstCollDropExpectedNs = {db: testDB.getName(), coll: otherColl.getName()};
assert.eq(firstCollDrop.ns, firstCollDropExpectedNs);
|
|
|
|
// Dropping the database is expected to yield one 'drop' per collection, then a 'dropDatabase'
// notification, and finally an 'invalidate'.
assert.commandWorked(testDB.dropDatabase());
const dropDbChanges = cst.assertDatabaseDrop({cursor: resumeCursor, db: testDB});
const secondCollDrop = dropDbChanges[0];

// In sharded passthrough suites 'dropDbChanges' may hold several collection drops, depending on
// how many shards are involved, but its last entry is known to be the 'dropDatabase' event.
const finalDropDbChange = dropDbChanges[dropDbChanges.length - 1];
const resumeTokenDropDb = finalDropDbChange._id;

// The event that follows must be the 'invalidate'; keep its token as well.
const invalidateChanges = cst.assertNextChangesEqual({
    cursor: resumeCursor,
    expectedChanges: [{operationType: "invalidate"}],
});
const resumeTokenInvalidate = invalidateChanges[0]._id;
|
|
|
|
// Resume from each of the two collection-drop events in turn: the explicit drop, and the drop
// caused by dropping the database. Either way the stream must go on to report the database drop
// followed by an 'invalidate'.
for (const dropEvent of [firstCollDrop, secondCollDrop]) {
    resumeCursor = cst.startWatchingChanges({
        pipeline: [{$changeStream: {resumeAfter: dropEvent._id}}],
        collection: 1,
        aggregateOptions: {cursor: {batchSize: 0}},
    });
    cst.assertDatabaseDrop({cursor: resumeCursor, db: testDB});
    cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
}
|
|
|
|
// Bring the test collection back by inserting into it after the database drop.
const recreateInsertResult = coll.insert({_id: "after recreate"});
assert.commandWorked(recreateInsertResult);
|
|
|
|
// 'resumeAfter' with the 'dropDatabase' token: the only event expected next is the 'invalidate'.
const resumeAfterDropDbStage = {$changeStream: {resumeAfter: resumeTokenDropDb}};
resumeCursor = cst.startWatchingChanges({
    pipeline: [resumeAfterDropDbStage],
    collection: 1,
    aggregateOptions: {cursor: {batchSize: 0}},
});
cst.assertNextChangesEqual({
    cursor: resumeCursor,
    expectedChanges: [{operationType: "invalidate"}],
});
|
|
|
|
// Resuming with 'resumeAfter' from the 'invalidate' token must be rejected with
// InvalidResumeToken. The command is issued against 'testDB', the database whose stream produced
// the token, rather than the shell's global 'db', which is not necessarily the database under
// test.
assert.commandFailedWithCode(
    testDB.runCommand({
        aggregate: 1,
        pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
        cursor: {},
        collation: {locale: "simple"},
    }),
    ErrorCodes.InvalidResumeToken,
);
|
|
|
|
// 'startAfter' with the 'dropDatabase' token: as with 'resumeAfter', the next event expected is
// the 'invalidate'.
const startAfterDropDbStage = {$changeStream: {startAfter: resumeTokenDropDb}};
resumeCursor = cst.startWatchingChanges({
    pipeline: [startAfterDropDbStage],
    collection: 1,
    aggregateOptions: {cursor: {batchSize: 0}},
});
cst.assertNextChangesEqual({
    cursor: resumeCursor,
    expectedChanges: [{operationType: "invalidate"}],
});
|
|
|
|
// 'startAfter' with the 'invalidate' token: the stream should pick up the insert that recreated
// the database and collection.
const recreatedNs = {db: testDB.getName(), coll: coll.getName()};
const expectedInsert = {
    operationType: "insert",
    ns: recreatedNs,
    fullDocument: {_id: "after recreate"},
    documentKey: {_id: "after recreate"},
};

const startAfterInvalidateStage = {$changeStream: {startAfter: resumeTokenInvalidate}};
resumeCursor = cst.startWatchingChanges({
    pipeline: [startAfterInvalidateStage],
    collection: 1,
    aggregateOptions: {cursor: {batchSize: 0}},
});

// NOTE(review): 'consumeDropUpTo' presumably reads past any drop events up to the
// 'dropDatabase' and then expects 'expectedNext' - confirm against change_stream_util.js.
const consumeDropArgs = {
    cursor: resumeCursor,
    dropType: "dropDatabase",
    expectedNext: expectedInsert,
};
cst.consumeDropUpTo(consumeDropArgs);

// Final step of the test: let the ChangeStreamTest helper clean up after itself.
cst.cleanUp();
|