mirror of https://github.com/mongodb/mongo
220 lines
8.6 KiB
JavaScript
220 lines
8.6 KiB
JavaScript
/**
|
|
* Tests that a change stream will correctly unwind applyOps entries generated by a transaction.
|
|
* @tags: [
|
|
* uses_transactions,
|
|
* requires_fcv_61, # Pre-6.1 builds do not emit change stream events for applyOps.
|
|
* requires_majority_read_concern,
|
|
* requires_snapshot_read,
|
|
* ]
|
|
*/
|
|
|
|
import {withTxnAndAutoRetryOnMongos} from "jstests/libs/auto_retry_transaction_in_sharding.js";
|
|
import {ChangeStreamTest} from "jstests/libs/change_stream_util.js";
|
|
import {assertDropAndRecreateCollection} from "jstests/libs/collection_drop_recreate.js";
|
|
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
|
|
|
|
const otherCollName = "change_stream_apply_ops_2";
|
|
const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
|
|
assertDropAndRecreateCollection(db, otherCollName);
|
|
|
|
const otherDbName = "change_stream_apply_ops_db";
|
|
const otherDbCollName = "someColl";
|
|
assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName);
|
|
|
|
// Insert a document that gets deleted as part of the transaction.
|
|
const kDeletedDocumentId = 0;
|
|
const insertRes = assert.commandWorked(coll.runCommand("insert", {
|
|
documents: [{_id: kDeletedDocumentId, a: "I was here before the transaction"}],
|
|
writeConcern: {w: "majority"}
|
|
}));
|
|
|
|
// Record the clusterTime of the insert, and increment it to give the test start time.
|
|
const testStartTime = insertRes.$clusterTime.clusterTime;
|
|
testStartTime.i++;
|
|
|
|
let cst = new ChangeStreamTest(db);
|
|
let changeStream = cst.startWatchingChanges({
|
|
pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}],
|
|
collection: coll,
|
|
doNotModifyInPassthroughs:
|
|
true // A collection drop only invalidates single-collection change streams.
|
|
});
|
|
|
|
const sessionOptions = {
|
|
causalConsistency: false
|
|
};
|
|
const txnOptions = {
|
|
readConcern: {level: "snapshot"},
|
|
writeConcern: {w: "majority"}
|
|
};
|
|
|
|
const session = db.getMongo().startSession(sessionOptions);
|
|
|
|
// Create these variables before starting the transaction. In sharded passthroughs, accessing
|
|
// db[collname] may attempt to implicitly shard the collection, which is not allowed in a txn.
|
|
const sessionDb = session.getDatabase(db.getName());
|
|
const sessionColl = sessionDb[coll.getName()];
|
|
const sessionOtherColl = sessionDb[otherCollName];
|
|
const sessionOtherDbColl = session.getDatabase(otherDbName)[otherDbCollName];
|
|
|
|
withTxnAndAutoRetryOnMongos(session, () => {
|
|
// Two inserts on the main test collection.
|
|
assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
|
|
assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
|
|
|
|
// One insert on a collection that we're not watching. This should be skipped by the
|
|
// single-collection changestream.
|
|
assert.commandWorked(sessionOtherColl.insert({_id: 111, a: "Doc on other collection"}));
|
|
|
|
// One insert on a collection in a different database. This should be skipped by the single
|
|
// collection and single-db changestreams.
|
|
assert.commandWorked(sessionOtherDbColl.insert({_id: 222, a: "Doc on other DB"}));
|
|
|
|
assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
|
|
|
|
assert.commandWorked(sessionColl.deleteOne({_id: kDeletedDocumentId}));
|
|
}, txnOptions);
|
|
|
|
// Do applyOps on the collection that we care about. This is an "external" applyOps, though (not run
|
|
// as part of a transaction). This checks that although this applyOps doesn't have an 'lsid' and
|
|
// 'txnNumber', the field gets unwound and a change stream event is emitted. Skip if running in a
|
|
// sharded passthrough, since the applyOps command does not exist on mongoS.
|
|
if (!FixtureHelpers.isMongos(db)) {
|
|
assert.commandWorked(db.runCommand({
|
|
applyOps: [
|
|
{op: "i", ns: coll.getFullName(), o: {_id: 3, a: "insert from applyOps"}},
|
|
]
|
|
}));
|
|
}
|
|
|
|
// Drop the collection. This will trigger an "invalidate" event at the end of the stream.
|
|
assert.commandWorked(db.runCommand({drop: coll.getName()}));
|
|
|
|
// Define the set of changes expected for the single-collection case per the operations above.
|
|
let expectedChanges = [
|
|
{
|
|
documentKey: {_id: 1},
|
|
fullDocument: {_id: 1, a: 0},
|
|
ns: {db: db.getName(), coll: coll.getName()},
|
|
operationType: "insert",
|
|
lsid: session.getSessionId(),
|
|
txnNumber: session.getTxnNumber_forTesting(),
|
|
},
|
|
{
|
|
documentKey: {_id: 2},
|
|
fullDocument: {_id: 2, a: 0},
|
|
ns: {db: db.getName(), coll: coll.getName()},
|
|
operationType: "insert",
|
|
lsid: session.getSessionId(),
|
|
txnNumber: session.getTxnNumber_forTesting(),
|
|
},
|
|
{
|
|
documentKey: {_id: 1},
|
|
ns: {db: db.getName(), coll: coll.getName()},
|
|
operationType: "update",
|
|
updateDescription: {removedFields: [], updatedFields: {a: 1}, truncatedArrays: []},
|
|
lsid: session.getSessionId(),
|
|
txnNumber: session.getTxnNumber_forTesting(),
|
|
},
|
|
{
|
|
documentKey: {_id: kDeletedDocumentId},
|
|
ns: {db: db.getName(), coll: coll.getName()},
|
|
operationType: "delete",
|
|
lsid: session.getSessionId(),
|
|
txnNumber: session.getTxnNumber_forTesting(),
|
|
}
|
|
];
|
|
|
|
if (!FixtureHelpers.isMongos(db)) {
|
|
expectedChanges.push({
|
|
documentKey: {_id: 3},
|
|
fullDocument: {_id: 3, a: "insert from applyOps"},
|
|
ns: {db: db.getName(), coll: coll.getName()},
|
|
operationType: "insert",
|
|
});
|
|
}
|
|
expectedChanges.push({
|
|
operationType: "drop",
|
|
ns: {db: db.getName(), coll: coll.getName()},
|
|
});
|
|
|
|
// If we are running in a sharded passthrough, then this may have been a multi-shard transaction.
|
|
// Change streams will interleave the txn events from across the shards in (clusterTime, txnOpIndex)
|
|
// order, and so may not reflect the ordering of writes in the test. We thus verify that exactly the
|
|
// expected set of events are observed, but we relax the ordering requirements.
|
|
function assertNextChangesEqual({cursor, expectedChanges, expectInvalidate}) {
|
|
const assertEqualFunc = FixtureHelpers.isMongos(db) ? cst.assertNextChangesEqualUnordered
|
|
: cst.assertNextChangesEqual;
|
|
return assertEqualFunc(
|
|
{cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: expectInvalidate});
|
|
}
|
|
|
|
//
|
|
// Test behavior of single-collection change streams with apply ops.
|
|
//
|
|
|
|
// Verify that the stream returns the expected sequence of changes.
|
|
assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
|
|
|
|
// Single collection change stream should also be invalidated by the drop.
|
|
assertNextChangesEqual({
|
|
cursor: changeStream,
|
|
expectedChanges: [{operationType: "invalidate"}],
|
|
expectInvalidate: true
|
|
});
|
|
|
|
//
|
|
// Test behavior of whole-db change streams with apply ops.
|
|
//
|
|
|
|
// In a sharded cluster, whole-db-or-cluster streams will see a collection drop from each shard.
|
|
for (let i = 1; i < FixtureHelpers.numberOfShardsForCollection(coll); ++i) {
|
|
expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}});
|
|
}
|
|
|
|
// Add an entry for the insert on db.otherColl into expectedChanges.
|
|
expectedChanges.splice(2, 0, {
|
|
documentKey: {_id: 111},
|
|
fullDocument: {_id: 111, a: "Doc on other collection"},
|
|
ns: {db: db.getName(), coll: otherCollName},
|
|
operationType: "insert",
|
|
lsid: session.getSessionId(),
|
|
txnNumber: session.getTxnNumber_forTesting(),
|
|
});
|
|
|
|
// Verify that a whole-db stream returns the expected sequence of changes, including the insert
|
|
// on the other collection but NOT the changes on the other DB or the manual applyOps.
|
|
changeStream = cst.startWatchingChanges({
|
|
pipeline: [{$changeStream: {startAtOperationTime: testStartTime}}, {$project: {"lsid.uid": 0}}],
|
|
collection: 1
|
|
});
|
|
assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
|
|
|
|
//
|
|
// Test behavior of whole-cluster change streams with apply ops.
|
|
//
|
|
|
|
// Add an entry for the insert on otherDb.otherDbColl into expectedChanges.
|
|
expectedChanges.splice(3, 0, {
|
|
documentKey: {_id: 222},
|
|
fullDocument: {_id: 222, a: "Doc on other DB"},
|
|
ns: {db: otherDbName, coll: otherDbCollName},
|
|
operationType: "insert",
|
|
lsid: session.getSessionId(),
|
|
txnNumber: session.getTxnNumber_forTesting(),
|
|
});
|
|
|
|
// Verify that a whole-cluster stream returns the expected sequence of changes, including the
|
|
// inserts on the other collection and the other database, but NOT the manual applyOps.
|
|
cst = new ChangeStreamTest(db.getSiblingDB("admin"));
|
|
changeStream = cst.startWatchingChanges({
|
|
pipeline: [
|
|
{$changeStream: {startAtOperationTime: testStartTime, allChangesForCluster: true}},
|
|
{$project: {"lsid.uid": 0}}
|
|
],
|
|
collection: 1
|
|
});
|
|
assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
|
|
|
|
cst.cleanUp();
|