mongo/jstests/change_streams/apply_ops_resumability.js

259 lines
11 KiB
JavaScript

// Tests that a change stream will correctly unwind applyOps entries generated by a transaction, and
// that we can resume from any point within the transaction.
// @tags: [uses_transactions, requires_snapshot_read, requires_majority_read_concern, resource_intensive]
import {withTxnAndAutoRetryOnMongos} from "jstests/libs/auto_retry_transaction_in_sharding.js";
import {assertDropAndRecreateCollection} from "jstests/libs/collection_drop_recreate.js";
import {FixtureHelpers} from "jstests/libs/fixture_helpers.js";
import {ChangeStreamTest} from "jstests/libs/query/change_stream_util.js";
const coll = assertDropAndRecreateCollection(db, "change_stream_apply_ops");
const otherCollName = "change_stream_apply_ops_2";
assertDropAndRecreateCollection(db, otherCollName);
const otherDbName = "change_stream_apply_ops_db";
const otherDbCollName = "someColl";
assertDropAndRecreateCollection(db.getSiblingDB(otherDbName), otherDbCollName);
let cst = new ChangeStreamTest(db);
let changeStream = cst.startWatchingChanges({
pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}],
collection: coll,
});
// Record the clusterTime at the outset of the test, before any writes are performed.
const testStartTime = db.hello().$clusterTime.clusterTime;
// Do an insert outside of a transaction.
assert.commandWorked(coll.insert({_id: 0, a: 123}));
// Open a session, and perform two writes within a transaction.
const sessionOptions = {
causalConsistency: false,
};
const txnOptions = {
readConcern: {level: "snapshot"},
writeConcern: {w: "majority"},
};
const session = db.getMongo().startSession(sessionOptions);
// Create these variables before starting the transaction. In sharded passthroughs, accessing
// db[collname] may attempt to implicitly shard the collection, which is not allowed in a txn.
const sessionDb = session.getDatabase(db.getName());
const sessionColl = sessionDb[coll.getName()];
const sessionOtherColl = sessionDb[otherCollName];
const sessionOtherDbColl = session.getDatabase(otherDbName)[otherDbCollName];
withTxnAndAutoRetryOnMongos(
session,
() => {
// Two inserts on the main test collection.
assert.commandWorked(sessionColl.insert({_id: 1, a: 0}));
assert.commandWorked(sessionColl.insert({_id: 2, a: 0}));
// One insert on a collection that we're not watching. This should be skipped by the
// single-collection change stream.
assert.commandWorked(sessionOtherColl.insert({_id: 111, a: "Doc on other collection"}));
// One insert on a collection in a different database. This should be skipped by the single
// collection and single-db changestreams.
assert.commandWorked(sessionOtherDbColl.insert({_id: 222, a: "Doc on other DB"}));
assert.commandWorked(sessionColl.updateOne({_id: 1}, {$inc: {a: 1}}));
},
txnOptions,
);
// Now insert another document, not part of a transaction.
assert.commandWorked(coll.insert({_id: 3, a: 123}));
// Drop the collection. This will trigger a "drop" event, which in the case of the single-collection
// stream will be followed by an "invalidate".
assert.commandWorked(db.runCommand({drop: coll.getName()}));
// Define the set of all changes expected to be generated by the operations above.
let expectedChanges = [
{
documentKey: {_id: 0},
fullDocument: {_id: 0, a: 123},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
{
documentKey: {_id: 1},
fullDocument: {_id: 1, a: 0},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
},
{
documentKey: {_id: 2},
fullDocument: {_id: 2, a: 0},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
},
{
documentKey: {_id: 111},
fullDocument: {_id: 111, a: "Doc on other collection"},
ns: {db: db.getName(), coll: otherCollName},
operationType: "insert",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
},
{
documentKey: {_id: 222},
fullDocument: {_id: 222, a: "Doc on other DB"},
ns: {db: otherDbName, coll: otherDbCollName},
operationType: "insert",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
},
{
documentKey: {_id: 1},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "update",
updateDescription: {removedFields: [], updatedFields: {a: 1}, truncatedArrays: []},
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
},
{
documentKey: {_id: 3},
fullDocument: {_id: 3, a: 123},
ns: {db: db.getName(), coll: coll.getName()},
operationType: "insert",
},
{operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}},
];
// Validate that we observe all expected changes in the stream, and replace the'expectedChanges'
// list with the changes returned by ChangeStreamTest. These will include the _id resume tokens for
// each change, so subsequent tests will be able to resume from any point.
(function validateExpectedChangesAndPopulateResumeTokens() {
const wholeClusterCST = new ChangeStreamTest(db.getSiblingDB("admin"));
const wholeClusterCursor = wholeClusterCST.startWatchingChanges({
pipeline: [
{$changeStream: {startAtOperationTime: testStartTime, allChangesForCluster: true}},
{$project: {"lsid.uid": 0}},
],
collection: 1,
});
expectedChanges = wholeClusterCST.assertNextChangesEqualWithDeploymentAwareness({
cursor: wholeClusterCursor,
expectedChanges: expectedChanges,
});
})();
// Helper function to find the first non-transaction event and the first two transaction events in
// the given list of change stream events.
function findMilestoneEvents(eventList) {
const nonTxnIdx = eventList.findIndex((event) => !event.lsid),
firstTxnIdx = eventList.findIndex((event) => event.lsid),
secondTxnIdx = eventList.findIndex((event, idx) => idx > firstTxnIdx && event.lsid);
// Return the array indices of each event, and the events themselves.
return [
nonTxnIdx,
firstTxnIdx,
secondTxnIdx,
eventList[nonTxnIdx],
eventList[firstTxnIdx],
eventList[secondTxnIdx],
];
}
//
// Test behavior of single-collection change streams with apply ops.
//
// Filter out any events that aren't on the main test collection namespace.
const expectedSingleCollChanges = expectedChanges.filter(
(event) => event.ns.db === db.getName() && event.ns.coll === coll.getName(),
);
// Verify that the stream returns the expected sequence of changes.
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedSingleCollChanges});
// Obtain the first non-transaction change and the first two in-transaction changes.
let [nonTxnIdx, firstTxnIdx, secondTxnIdx, nonTxnChange, firstTxnChange, secondTxnChange] =
findMilestoneEvents(expectedSingleCollChanges);
// Resume after the first non-transaction change. Be sure we see the documents from the
// transaction again.
changeStream = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: nonTxnChange._id}}, {$project: {"lsid.uid": 0}}],
collection: coll,
});
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedSingleCollChanges.slice(nonTxnIdx + 1)});
// Resume after the first transaction change. Be sure we see the second change again.
changeStream = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: firstTxnChange._id}}, {$project: {"lsid.uid": 0}}],
collection: coll,
});
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedSingleCollChanges.slice(firstTxnIdx + 1)});
// Try starting another change stream from the second change caused by the transaction. Verify
// that we can see the insert performed after the transaction was committed.
let otherCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}],
collection: coll,
doNotModifyInPassthroughs: true, // A collection drop only invalidates single-collection
// change streams.
});
cst.assertNextChangesEqual({cursor: otherCursor, expectedChanges: expectedSingleCollChanges.slice(secondTxnIdx + 1)});
// Verify that the next event observed by the stream is an invalidate following the collection drop.
const invalidateEvent = cst.getOneChange(otherCursor, true);
assert.eq(invalidateEvent.operationType, "invalidate");
//
// Test behavior of whole-db change streams with apply ops.
//
// In a sharded cluster, whole-db-or-cluster streams will see a collection drop from each shard.
for (let i = 1; i < FixtureHelpers.numberOfShardsForCollection(coll); ++i) {
expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}});
}
// Filter out any events that aren't on the main test database.
const expectedSingleDBChanges = expectedChanges.filter((event) => event.ns.db === db.getName());
// Obtain the first non-transaction change and the first two in-transaction changes.
[nonTxnIdx, firstTxnIdx, secondTxnIdx, nonTxnChange, firstTxnChange, secondTxnChange] =
findMilestoneEvents(expectedSingleDBChanges);
// Verify that a whole-db stream can be resumed from the middle of the transaction, and that it
// will see all subsequent changes including the insert on the other collection but NOT the
// changes on the other DB.
changeStream = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}],
collection: 1,
});
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedSingleDBChanges.slice(secondTxnIdx + 1)});
//
// Test behavior of whole-cluster change streams with apply ops.
//
// Obtain the first non-transaction change and the first two in-transaction changes.
[nonTxnIdx, firstTxnIdx, secondTxnIdx, nonTxnChange, firstTxnChange, secondTxnChange] =
findMilestoneEvents(expectedChanges);
// Verify that a whole-cluster stream can be resumed from the middle of the transaction, and
// that it will see all subsequent changes including the insert on the other collection and the
// changes on the other DB.
cst = new ChangeStreamTest(db.getSiblingDB("admin"));
changeStream = cst.startWatchingChanges({
pipeline: [
{$changeStream: {resumeAfter: secondTxnChange._id, allChangesForCluster: true}},
{$project: {"lsid.uid": 0}},
],
collection: 1,
});
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(secondTxnIdx + 1)});
cst.cleanUp();