mirror of https://github.com/mongodb/mongo
136 lines
5.6 KiB
JavaScript
136 lines
5.6 KiB
JavaScript
// Basic tests for $changeStream against all databases in the cluster.
|
|
// @tags: [
|
|
// requires_profiling,
|
|
// requires_fcv_81
|
|
// ]
|
|
import {assertDropAndRecreateCollection} from "jstests/libs/collection_drop_recreate.js";
|
|
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
|
|
import {getLatestProfilerEntry} from "jstests/libs/profiler.js";
|
|
import {
|
|
assertInvalidChangeStreamNss,
|
|
assertValidChangeStreamNss,
|
|
ChangeStreamTest,
|
|
} from "jstests/libs/query/change_stream_util.js";
|
|
|
|
const testDb = db.getSiblingDB(jsTestName());
|
|
const adminDB = testDb.getSiblingDB("admin");
|
|
const otherDB = testDb.getSiblingDB(jsTestName() + "_other");
|
|
|
|
// Drop and recreate the collections to be used in this set of tests.
|
|
assertDropAndRecreateCollection(testDb, "t1");
|
|
assertDropAndRecreateCollection(otherDB, "t2");
|
|
|
|
// Test that a change stream can be opened on the admin database if {allChangesForCluster:true}
|
|
// is specified.
|
|
assertValidChangeStreamNss("admin", 1, {allChangesForCluster: true});
|
|
// Test that a change stream cannot be opened on the admin database if a collection is
|
|
// specified, even with {allChangesForCluster:true}.
|
|
assertInvalidChangeStreamNss("admin", "testcoll", {allChangesForCluster: true});
|
|
// Test that a change stream cannot be opened on a database other than admin if
|
|
// {allChangesForCluster:true} is specified.
|
|
assertInvalidChangeStreamNss(testDb.getName(), 1, {allChangesForCluster: true});
|
|
|
|
let cst = new ChangeStreamTest(adminDB);
|
|
let cursor = cst.startWatchingAllChangesForCluster();
|
|
|
|
// Test that if there are no changes, we return an empty batch.
|
|
assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
|
|
|
|
// Test that the change stream returns an inserted doc.
|
|
assert.commandWorked(testDb.t1.insert({_id: 0, a: 1}));
|
|
let expected = {
|
|
documentKey: {_id: 0},
|
|
fullDocument: {_id: 0, a: 1},
|
|
ns: {db: testDb.getName(), coll: "t1"},
|
|
operationType: "insert",
|
|
};
|
|
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
|
|
|
|
// Test that the change stream returns another inserted doc in a different database.
|
|
assert.commandWorked(otherDB.t2.insert({_id: 0, a: 2}));
|
|
expected = {
|
|
documentKey: {_id: 0},
|
|
fullDocument: {_id: 0, a: 2},
|
|
ns: {db: otherDB.getName(), coll: "t2"},
|
|
operationType: "insert",
|
|
};
|
|
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
|
|
|
|
// Test that the change stream returns an inserted doc on a user-created database whose name
|
|
// includes 'admin', 'local', or 'config'.
|
|
const validUserDBs = ["admin1", "1admin", "_admin_", "local_", "_local", "_local_", "config_", "_config", "_config_"];
|
|
validUserDBs.forEach((dbName) => {
|
|
assert.commandWorked(testDb.getSiblingDB(dbName).test.insert({_id: 0, a: 1}));
|
|
expected = [
|
|
{
|
|
documentKey: {_id: 0},
|
|
fullDocument: {_id: 0, a: 1},
|
|
ns: {db: dbName, coll: "test"},
|
|
operationType: "insert",
|
|
},
|
|
];
|
|
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
|
|
});
|
|
|
|
// Test that the change stream returns an inserted doc on a user-created collection whose name
|
|
// includes "system" but is not considered an internal collection.
|
|
const validSystemColls = ["system", "systems.views", "ssystem.views", "test.system", "system_views"];
|
|
validSystemColls.forEach((collName) => {
|
|
assert.commandWorked(testDb.getCollection(collName).insert({_id: 0, a: 1}));
|
|
expected = [
|
|
{
|
|
documentKey: {_id: 0},
|
|
fullDocument: {_id: 0, a: 1},
|
|
ns: {db: testDb.getName(), coll: collName},
|
|
operationType: "insert",
|
|
},
|
|
];
|
|
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
|
|
});
|
|
|
|
// Test that the change stream filters out operations on any collection in the 'admin', 'local',
|
|
// or 'config' databases.
|
|
const filteredDBs = ["admin", "local", "config"];
|
|
filteredDBs.forEach((dbName) => {
|
|
// Not allowed to use 'local' db through mongos.
|
|
if (FixtureHelpers.isMongos(testDb) && dbName == "local") return;
|
|
|
|
assert.commandWorked(testDb.getSiblingDB(dbName).test.insert({_id: 0, a: 1}));
|
|
// Insert to the test collection to ensure that the change stream has something to
|
|
// return.
|
|
assert.commandWorked(testDb.t1.insert({_id: dbName}));
|
|
expected = [
|
|
{
|
|
documentKey: {_id: dbName},
|
|
fullDocument: {_id: dbName},
|
|
ns: {db: testDb.getName(), coll: "t1"},
|
|
operationType: "insert",
|
|
},
|
|
];
|
|
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
|
|
// Clear the test collection to avoid duplicate key errors if this test is run multiple times.
|
|
assert.commandWorked(testDb.getSiblingDB(dbName).test.remove({}, false /* justOne */));
|
|
});
|
|
|
|
// Dropping a database should generate drop entries for each collection followed by a database
|
|
// drop.
|
|
assert.commandWorked(otherDB.dropDatabase());
|
|
cst.assertDatabaseDrop({cursor: cursor, db: otherDB});
|
|
|
|
// Drop the remaining databases and clean up the test.
|
|
assert.commandWorked(testDb.dropDatabase());
|
|
validUserDBs.forEach((dbName) => {
|
|
testDb.getSiblingDB(dbName).dropDatabase();
|
|
});
|
|
|
|
// Test that getMore commands from the whole-cluster change stream are logged by the profiler.
|
|
if (!FixtureHelpers.isMongos(adminDB)) {
|
|
assert.commandWorked(adminDB.runCommand({profile: 2}));
|
|
cst.getNextBatch(cursor);
|
|
const profileEntry = getLatestProfilerEntry(adminDB, {op: "getmore"});
|
|
const firstStage = Object.keys(profileEntry.originatingCommand.pipeline[0])[0];
|
|
assert(["$changeStream", "$_internalChangeStreamOplogMatch"].includes(firstStage), profileEntry);
|
|
}
|
|
|
|
cst.cleanUp();
|