/**
 * Tests that change stream point-in-time pre-images are replicated to secondaries, recovered during
 * startup recovery, and not written during the logical initial sync of a node.
 *
 * The test relies on a correct change stream pre-image recording on a node in the primary role.
 *
 * @tags: [
 *   # 6.2 removes support for atomic applyOps
 *   requires_fcv_62,
 *   # The test waits for the Checkpointer, but this process runs only for on-disk storage engines.
 *   requires_persistence,
 * ]
 */
import {PrepareHelpers} from "jstests/core/txns/libs/prepare_helpers.js";
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {getPreImages, getPreImagesCollection} from "jstests/libs/query/change_stream_util.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {TxnUtil} from "jstests/libs/txns/txn_util.js";

const testName = jsTestName();

// Log verbosity applied to every node: level 5 for the "replication.initialSync" component.
const initialSyncLogVerbosity = tojsononeline({replication: {initialSync: 5}});

// Two-node replica set; the second node is configured with priority 0.
const replTest = new ReplSetTest({
    name: testName,
    nodes: [{}, {rsConfig: {priority: 0}}],
    nodeOptions: {
        setParameter: {logComponentVerbosity: initialSyncLogVerbosity},
        oplogSize: 1024,
    },
});
replTest.startSet();
replTest.initiate(null, null, {initiateWithDefaultElectionTimeout: true});

// Asserts that documents in the pre-images collection on the primary node are the same as on a
// secondary node.
function assertPreImagesCollectionOnPrimaryMatchesSecondary() {
    // Failure message; handed to assert.eq() as a function rather than as a pre-built string.
    const describeMismatch = () =>
        `pre-images collection on primary ${tojson(getPreImages(replTest.getPrimary()))}` +
        ` does not match pre-images collection on secondary ${tojson(getPreImages(replTest.getSecondary()))}`;

    const primaryPreImages = getPreImagesCollection(replTest.getPrimary());
    const secondaryPreImages = getPreImagesCollection(replTest.getSecondary());

    // First compare the document counts...
    const primaryCount = primaryPreImages.find().itcount();
    const secondaryCount = secondaryPreImages.find().itcount();
    assert.eq(primaryCount, secondaryCount, describeMismatch);

    // ...then compare the hashes of all documents.
    const primaryHash = primaryPreImages.hashAllDocs();
    const secondaryHash = secondaryPreImages.hashAllDocs();
    assert.eq(primaryHash, secondaryHash, describeMismatch);
}

// Every scenario below runs once per entry: [collection name, extra createCollection options].
const collectionVariants = [
    ["collStandard", {}],
    ["collClustered", {clusteredIndex: {key: {_id: 1}, unique: true}}],
];

for (const [collectionName, collectionOptions] of collectionVariants) {
    const testDB = replTest.getPrimary().getDB(testName);
    jsTest.log(`Testing on collection '${collectionName}'`);

    // The collection under test has change stream pre- and post-images enabled.
    const createOptions = {changeStreamPreAndPostImages: {enabled: true}, ...collectionOptions};
    assert.commandWorked(testDB.createCollection(collectionName, createOptions));
    const coll = testDB[collectionName];

    /**
     * Runs an insert followed by three "findAndModify" commands (two updates and one removal) on
     * document {_id: 5} through a session that has retryable writes enabled.
     */
    const issueRetryableFindAndModifyCommands = (testDB) => {
        const retryableSession = testDB.getMongo().startSession({retryWrites: true});
        const sessionColl = retryableSession.getDatabase(testName)[collectionName];
        assert.commandWorked(sessionColl.insert({_id: 5, v: 1}));

        // Update that returns the document as it was before the update.
        const beforeUpdate = sessionColl.findAndModify({query: {_id: 5}, update: {$inc: {v: 1}}, new: false});
        assert.docEq({_id: 5, v: 1}, beforeUpdate);

        // Update that returns the document as it is after the update.
        const afterUpdate = sessionColl.findAndModify({query: {_id: 5}, update: {$inc: {v: 1}}, new: true});
        assert.docEq({_id: 5, v: 3}, afterUpdate);

        // Removal that returns the document as it was before the deletion.
        const beforeRemoval = sessionColl.findAndModify({
            query: {_id: 5},
            new: false,
            remove: true,
            writeConcern: {w: 2},
        });
        assert.docEq({_id: 5, v: 3}, beforeRemoval);
    };

    /**
     * Modifies documents 6..10 in three transactions: a single-"applyOps" transaction, a
     * multiple-"applyOps" transaction, and a transaction that is prepared before it is committed.
     */
    const issueWriteCommandsInTransaction = (testDB) => {
        // Deletes every document with 6 <= _id <= 10 and inserts fresh documents 6, 7 and 8.
        const resetTransactionDocuments = () => {
            assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 6}}, {_id: {$lte: 10}}]}));
            assert.commandWorked(
                coll.insert([
                    {_id: 6, a: 1},
                    {_id: 7, a: 1},
                    {_id: 8, a: 1},
                ]),
            );
        };

        // Updates document 6, replaces document 7 and deletes document 8 through 'targetColl'.
        const updateReplaceAndDelete = (targetColl) => {
            assert.commandWorked(targetColl.updateOne({_id: 6}, {$inc: {a: 1}}));
            assert.commandWorked(targetColl.replaceOne({_id: 7}, {a: "Long string"}));
            assert.commandWorked(targetColl.deleteOne({_id: 8}));
        };

        resetTransactionDocuments();
        const transactionOptions = {readConcern: {level: "majority"}, writeConcern: {w: 2}};

        // Transaction whose commands fit in a single "applyOps" entry.
        TxnUtil.runInTransaction(
            testDB,
            () => {},
            (db, state) => {
                updateReplaceAndDelete(db[collectionName]);
            },
            transactionOptions,
        );

        // Transaction that spans multiple "applyOps" entries because of the two large inserts.
        assert.commandWorked(coll.insert({_id: 8, a: 1}));
        TxnUtil.runInTransaction(
            testDB,
            () => {},
            (db, state) => {
                const txnColl = db[collectionName];
                const largeString = "a".repeat(15 * 1024 * 1024);
                assert.commandWorked(txnColl.updateOne({_id: 6}, {$inc: {a: 1}}));
                assert.commandWorked(txnColl.insert({_id: 9, a: largeString}));
                // Should go to the second "applyOps" entry.
                assert.commandWorked(txnColl.insert({_id: 10, a: largeString}));
                assert.commandWorked(txnColl.replaceOne({_id: 7}, {a: "String"}));
                assert.commandWorked(txnColl.deleteOne({_id: 8}));
            },
            transactionOptions,
        );

        // Transaction that is prepared first and only then committed.
        resetTransactionDocuments();
        const preparedSession = testDB.getMongo().startSession();
        const preparedSessionDb = preparedSession.getDatabase(testDB.getName());
        preparedSession.startTransaction();
        updateReplaceAndDelete(preparedSessionDb[coll.getName()]);
        const prepareTimestamp = PrepareHelpers.prepareTransaction(preparedSession);
        assert.commandWorked(PrepareHelpers.commitTransaction(preparedSession, prepareTimestamp));
    };

    /**
     * Re-creates documents 9 and 10, then updates document 9 and deletes document 10 with a single
     * "applyOps" command.
     */
    const issueApplyOpsCommand = (testDB) => {
        assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 9}}, {_id: {$lte: 10}}]}));
        assert.commandWorked(
            coll.insert([
                {_id: 9, a: 1},
                {_id: 10, a: 1},
            ]),
        );

        const updateOp = {op: "u", ns: coll.getFullName(), o2: {_id: 9}, o: {$v: 2, diff: {u: {a: 2}}}};
        const deleteOp = {op: "d", ns: coll.getFullName(), o: {_id: 10}};
        assert.commandWorked(testDB.runCommand({applyOps: [updateOp, deleteOp]}));
    };

    (function testSteadyStateReplication() {
        jsTestLog("Testing pre-image replication to secondaries.");

        // Waits for replication and then checks that the secondary holds the same pre-images as
        // the primary.
        const verifyPreImagesReplicated = () => {
            replTest.awaitReplication();
            assertPreImagesCollectionOnPrimaryMatchesSecondary();
        };

        // Basic "update" of a freshly inserted document.
        assert.commandWorked(coll.insert({_id: 1, v: 1, largeField: "AAAAAAAAAAAAAAAAAAAAAAAA"}));
        assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}));
        verifyPreImagesReplicated();

        // "delete" of a freshly inserted document.
        assert.commandWorked(coll.insert({_id: 2, field: "A"}));
        assert.commandWorked(coll.deleteOne({_id: 2}));
        verifyPreImagesReplicated();

        // Retryable "findAndModify" commands.
        issueRetryableFindAndModifyCommands(testDB);
        verifyPreImagesReplicated();

        // Writes performed in transactions.
        issueWriteCommandsInTransaction(testDB);
        verifyPreImagesReplicated();

        // Writes performed through the "applyOps" command.
        issueApplyOpsCommand(testDB);
        verifyPreImagesReplicated();
    })();

    (function testInitialSync() {
        jsTestLog("Testing pre-image replication during the logical initial sync.");

        // Document used by the deletion step below.
        assert.commandWorked(coll.insert({_id: 3, field: "A"}, {writeConcern: {w: 2}}));

        // Add a node that will perform an initial sync. The fail point
        // "initialSyncHangBeforeCopyingDatabases" pauses the sync before the databases are copied,
        // so the modifications below make the collection content more recent and create an
        // inconsistent data situation during oplog application.
        const initialSyncNode = replTest.add({
            rsConfig: {priority: 0},
            setParameter: {"failpoint.initialSyncHangBeforeCopyingDatabases": tojson({mode: "alwaysOn"})},
        });

        // Reconfigure the set and wait for the new node to pause on the fail point.
        replTest.reInitiate();
        assert.commandWorked(
            initialSyncNode.adminCommand({
                waitForFailPoint: "initialSyncHangBeforeCopyingDatabases",
                timesEntered: 1,
                maxTimeMS: 60000,
            }),
        );

        // Modify data on the primary while the initial sync is paused.
        assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}, {writeConcern: {w: 2}}));
        assert.commandWorked(coll.deleteOne({_id: 3}, {writeConcern: {w: 2}}));
        issueRetryableFindAndModifyCommands(testDB);
        issueApplyOpsCommand(testDB);
        issueWriteCommandsInTransaction(testDB);

        // Let the initial sync continue.
        assert.commandWorked(
            initialSyncNode.adminCommand({configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"}),
        );

        // Wait until the initial sync is complete and the new node is a fully functioning
        // secondary.
        replTest.awaitSecondaryNodes(null, [initialSyncNode]);

        // No pre-images may have been written during the logical initial sync.
        const preImagesAfterInitialSync = getPreImages(initialSyncNode);
        assert.eq(preImagesAfterInitialSync.length, 0, preImagesAfterInitialSync);

        // Once the node is a secondary, pre-images are written again: a single update acknowledged
        // by three nodes must produce exactly one pre-image that equals the primary's latest one.
        assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}, {writeConcern: {w: 3}}));
        const preImagesAfterUpdate = getPreImages(initialSyncNode);
        assert.eq(preImagesAfterUpdate.length, 1, preImagesAfterUpdate);
        const preImageDocumentsOnPrimary = getPreImages(replTest.getPrimary());
        assert.docEq(preImagesAfterUpdate[0], preImageDocumentsOnPrimary.pop());

        // The initial sync node is no longer needed.
        replTest.remove(initialSyncNode);
    })();

    (function testStartupRecovery() {
        jsTestLog("Testing pre-image writing during startup recovery.");

        // Pause check-pointing on the primary so that new pre-images are not flushed to the disk.
        const pauseCheckpointThreadFailPoint = configureFailPoint(replTest.getPrimary(), "pauseCheckpointThread");
        pauseCheckpointThreadFailPoint.wait();

        // Update a document on the primary.
        assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}, {writeConcern: {w: 2}}));

        // Insert and then delete a document on the primary.
        assert.commandWorked(coll.insert({_id: 4, field: "A"}));
        assert.commandWorked(coll.deleteOne({_id: 4}, {writeConcern: {w: 2}}));

        issueRetryableFindAndModifyCommands(testDB);
        issueApplyOpsCommand(testDB);
        issueWriteCommandsInTransaction(testDB);

        // Unclean shutdown (signal 9) of node 0, followed by a restart.
        replTest.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL});
        replTest.restart(0);

        // Wait until a primary is up and running.
        replTest.waitForPrimary();

        // The recovered primary and the secondary must hold the same pre-image documents.
        assertPreImagesCollectionOnPrimaryMatchesSecondary();
    })();
}

replTest.stopSet();