// Source: mirror of https://github.com/mongodb/mongo (JavaScript, 284 lines, 11 KiB).
// Tests of $changeStream notifications for metadata operations.
// Do not run in whole-cluster passthrough since this test assumes that the change stream will be
// invalidated by a database drop.
// @tags: [
//   do_not_run_in_whole_cluster_passthrough,
//   requires_fcv_63,
// ]
import {
|
|
assertCreateCollection,
|
|
assertDropAndRecreateCollection,
|
|
assertDropCollection,
|
|
} from "jstests/libs/collection_drop_recreate.js";
|
|
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
|
|
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
|
|
|
|
// Run against a sibling database named after this test, and route change stream cursors through
// a ChangeStreamTest helper bound to that database.
const testDb = db.getSiblingDB(jsTestName());
const cst = new ChangeStreamTest(testDb);

// Test that it is possible to open a new change stream cursor on a collection that does not
// exist. Start by making sure the collection is dropped.
const collName = "test";
assertDropCollection(testDb, collName);

/**
 * Asserts that resuming a change stream with 'spec' and an explicit simple collation returns
 * the results specified by 'expected'.
 *
 * @param {Object} args
 * @param {*} args.coll - Forwarded unchanged as the 'collection' option of
 *     cst.startWatchingChanges(); callers in this file pass a collection name.
 * @param {Object} args.spec - Used verbatim as the body of the $changeStream stage, e.g.
 *     {resumeAfter: token} or {startAfter: token}.
 * @param {Array<Object>} args.expected - Forwarded as 'expectedChanges' to
 *     cst.assertNextChangesEqual().
 */
function assertResumeExpected({coll, spec, expected}) {
    const cursor = cst.startWatchingChanges({
        collection: coll,
        pipeline: [{$changeStream: spec}],
        // Always resume with an explicit simple collation.
        aggregateOptions: {collation: {locale: "simple"}},
    });
    cst.assertNextChangesEqual({cursor, expectedChanges: expected});
}

// Cursor creation succeeds, but there are no results. We do not expect to see a notification
// for collection creation. The $project stage keeps only 'operationType' (plus _id) so that
// later comparisons only need to name the operation type.
let cursor = cst.startWatchingChanges({
    collection: collName,
    pipeline: [{$changeStream: {}}, {$project: {operationType: 1}}],
});

// We explicitly test getMore, to ensure that the getMore command for a non-existent collection
// does not return an error. The cursor must still be open (non-zero id) and the batch empty.
let change = cst.getNextBatch(cursor);
assert.neq(change.id, 0);
assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch));

// Dropping the empty database should not generate any notification for the change stream, since
// the collection does not exist yet.
assert.commandWorked(testDb.dropDatabase());
change = cst.getNextBatch(cursor);
assert.neq(change.id, 0);
assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch));

// After collection creation, we expect to see oplog entries for each subsequent operation.
let coll = assertCreateCollection(testDb, collName);
assert.commandWorked(coll.insert({_id: 0}));

change = cst.getOneChange(cursor);
assert.eq(change.operationType, "insert", tojson(change));

// Create oplog entries of type insert, update, delete, and drop.
assert.commandWorked(coll.insert({_id: 1}));
assert.commandWorked(coll.update({_id: 1}, {$set: {a: 1}}));
assert.commandWorked(coll.remove({_id: 1}));
assertDropCollection(testDb, coll.getName());

// We should get oplog entries of type insert, update, delete, drop, and invalidate. The cursor
// should be closed.
let expectedChanges = [
    {operationType: "insert"},
    {operationType: "update"},
    {operationType: "delete"},
    {operationType: "drop"},
    {operationType: "invalidate"},
];
const changes = cst.assertNextChangesEqual({cursor, expectedChanges, expectInvalidate: true});

// Keep the resume tokens (event _id values) used by the resume tests below. The indices
// presumably line up with 'expectedChanges' above: 0 = insert, 3 = drop, 4 = invalidate.
const resumeToken = changes[0]._id;
const resumeTokenDrop = changes[3]._id;
const resumeTokenInvalidate = changes[4]._id;

// Verify we can startAfter the invalidate, but no new events may be retrieved. This test
// exercises the bug described in SERVER-41196.
const restartedStream = coll.watch([], {startAfter: resumeTokenInvalidate});
assert(!restartedStream.hasNext(), () => tojson(restartedStream.next()));
// This stream is not used again; close it so the cursor does not stay open for the rest of the
// test.
restartedStream.close();

// Verify that we can resume a stream after a collection drop without an explicit collation.
assert.commandWorked(
    testDb.runCommand({
        aggregate: coll.getName(),
        pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
        cursor: {},
    }),
);

// Recreate the collection and insert a marker document that later resume tests look for.
coll = assertCreateCollection(testDb, collName);
assert.commandWorked(coll.insert({_id: "after recreate"}));

// Test resuming the change stream from the collection drop using 'resumeAfter'. If running in a
// sharded passthrough suite, resuming from the drop will first return the drop from the other
// shard before returning an invalidate.
cursor = cst.startWatchingChanges({
    collection: coll,
    pipeline: [{$changeStream: {resumeAfter: resumeTokenDrop}}],
    aggregateOptions: {collation: {locale: "simple"}, cursor: {batchSize: 0}},
});
cst.consumeDropUpTo({
    cursor,
    dropType: "drop",
    expectedNext: {operationType: "invalidate"},
    expectInvalidate: true,
});

// Test resuming the change stream from the invalidate after the drop using 'resumeAfter'. The
// aggregate command is expected to fail with InvalidResumeToken.
assert.commandFailedWithCode(
    testDb.runCommand({
        aggregate: coll.getName(),
        pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
        cursor: {},
        collation: {locale: "simple"},
    }),
    ErrorCodes.InvalidResumeToken,
);

// Even after the 'invalidate' event has been filtered out, the cursor should hold the resume token
// of the 'invalidate' event.
const resumeStream = coll.watch([{$match: {operationType: "DummyOperationType"}}], {resumeAfter: resumeToken});
assert.soon(() => {
    // The $match above is meant to filter out every event, so the stream must never yield a
    // document; keep polling until the cursor reports that it is exhausted.
    assert(!resumeStream.hasNext());
    return resumeStream.isExhausted();
});
assert.eq(resumeStream.getResumeToken(), resumeTokenInvalidate);

// Test resuming the change stream from the collection drop using 'startAfter'.
assertResumeExpected({
    coll: coll.getName(),
    spec: {startAfter: resumeTokenDrop},
    expected: [{operationType: "invalidate"}],
});

// Test resuming the change stream from the 'invalidate' notification using 'startAfter'. The
// next event expected after any "drop" events is the insert made after the collection was
// recreated.
cursor = cst.startWatchingChanges({
    collection: coll,
    pipeline: [{$changeStream: {startAfter: resumeTokenInvalidate}}],
    aggregateOptions: {collation: {locale: "simple"}, cursor: {batchSize: 0}},
});
cst.consumeDropUpTo({
    cursor,
    dropType: "drop",
    expectedNext: {
        operationType: "insert",
        ns: {db: testDb.getName(), coll: coll.getName()},
        fullDocument: {_id: "after recreate"},
        documentKey: {_id: "after recreate"},
    },
});

// Test that renaming a collection being watched generates a "rename" entry followed by an
// "invalidate". This is true if the change stream is on the source or target collection of the
// rename. Sharded collections cannot be renamed.
if (!FixtureHelpers.isSharded(coll)) {
    // Name of the collection used as the other end of every rename below.
    const renamedCollName = "renamed_coll";

    // Watch the source collection, then rename it away.
    cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
    assertDropCollection(testDb, renamedCollName);
    assert.commandWorked(coll.renameCollection(renamedCollName));
    expectedChanges = [
        {
            operationType: "rename",
            ns: {db: testDb.getName(), coll: collName},
            to: {db: testDb.getName(), coll: renamedCollName},
        },
        {operationType: "invalidate"},
    ];
    cst.assertNextChangesEqual({cursor, expectedChanges, expectInvalidate: true});

    coll = testDb[renamedCollName];

    // Repeat the test, this time with a change stream open on the target.
    cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
    assert.commandWorked(coll.renameCollection(collName));
    expectedChanges = [
        {
            operationType: "rename",
            ns: {db: testDb.getName(), coll: renamedCollName},
            to: {db: testDb.getName(), coll: collName},
        },
        {operationType: "invalidate"},
    ];
    // NOTE(review): unlike the other rename assertions in this block, this call does not pass
    // 'expectInvalidate: true'. Left unchanged; confirm whether that is intentional.
    // The names below are deliberately distinct from the outer 'changes' and
    // 'resumeTokenInvalidate' so that the file-level bindings are not shadowed.
    const renameChanges = cst.assertNextChangesEqual({cursor, expectedChanges});
    const resumeTokenRename = renameChanges[0]._id;
    const resumeTokenRenameInvalidate = renameChanges[1]._id;

    coll = testDb[collName];
    assert.commandWorked(coll.insert({_id: "after rename"}));

    // Test resuming the change stream from the collection rename using 'resumeAfter'.
    assertResumeExpected({
        coll: coll.getName(),
        spec: {resumeAfter: resumeTokenRename},
        expected: [{operationType: "invalidate"}],
    });

    // Test resuming the change stream from the invalidate after the rename using 'resumeAfter'.
    // The aggregate command is expected to fail with InvalidResumeToken.
    assert.commandFailedWithCode(
        testDb.runCommand({
            aggregate: coll.getName(),
            pipeline: [{$changeStream: {resumeAfter: resumeTokenRenameInvalidate}}],
            cursor: {},
            collation: {locale: "simple"},
        }),
        ErrorCodes.InvalidResumeToken,
    );

    // Test resuming the change stream from the rename using 'startAfter'.
    assertResumeExpected({
        coll: coll.getName(),
        spec: {startAfter: resumeTokenRename},
        expected: [{operationType: "invalidate"}],
    });

    // Test resuming the change stream from the invalidate after the rename using 'startAfter'.
    expectedChanges = [
        {
            operationType: "insert",
            ns: {db: testDb.getName(), coll: coll.getName()},
            fullDocument: {_id: "after rename"},
            documentKey: {_id: "after rename"},
        },
    ];
    assertResumeExpected({
        coll: coll.getName(),
        spec: {startAfter: resumeTokenRenameInvalidate},
        expected: expectedChanges,
    });

    assertDropAndRecreateCollection(testDb, renamedCollName);
    assert.commandWorked(testDb[renamedCollName].insert({_id: 0}));

    // Repeat the test again, this time using the 'dropTarget' option with an existing target
    // collection.
    cursor = cst.startWatchingChanges({collection: renamedCollName, pipeline: [{$changeStream: {}}]});
    assert.commandWorked(coll.renameCollection(renamedCollName, true /* dropTarget */));
    expectedChanges = [
        {
            operationType: "rename",
            ns: {db: testDb.getName(), coll: collName},
            to: {db: testDb.getName(), coll: renamedCollName},
        },
        {operationType: "invalidate"},
    ];
    cst.assertNextChangesEqual({cursor, expectedChanges, expectInvalidate: true});

    coll = testDb[renamedCollName];

    // Test the behavior of a change stream watching the target collection of a $out aggregation
    // stage.
    cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
    coll.aggregate([{$out: collName}]);
    // Note that $out will first create a temp collection, and then rename the temp collection
    // to the target. Do not explicitly check the 'ns' field.
    const rename = cst.getOneChange(cursor);
    assert.eq(rename.operationType, "rename", tojson(rename));
    assert.eq(rename.to, {db: testDb.getName(), coll: collName}, tojson(rename));
    assert.eq(cst.getOneChange(cursor, true).operationType, "invalidate");
}

// Test that dropping a database will first drop all of its collections, invalidating any
// change streams on those collections.
cursor = cst.startWatchingChanges({
    collection: coll.getName(),
    pipeline: [{$changeStream: {}}],
});
assert.commandWorked(testDb.dropDatabase());

// The watched collection should report a "drop" followed by an "invalidate".
expectedChanges = [
    {
        operationType: "drop",
        ns: {db: testDb.getName(), coll: coll.getName()},
    },
    {operationType: "invalidate"},
];
cst.assertNextChangesEqual({cursor, expectedChanges, expectInvalidate: true});

// Let the ChangeStreamTest helper clean up after itself.
cst.cleanUp();