mirror of https://github.com/mongodb/mongo
296 lines
12 KiB
JavaScript
296 lines
12 KiB
JavaScript
/**
|
|
* Tests that change stream point-in-time pre-images are replicated to secondaries, recovered during
|
|
* startup recovery, and not written during the logical initial sync of a node.
|
|
*
|
|
* The test relies on a correct change stream pre-image recording on a node in the primary role.
|
|
*
|
|
* @tags: [
|
|
* # 6.2 removes support for atomic applyOps
|
|
* requires_fcv_62,
|
|
* # The test waits for the Checkpointer, but this process runs only for on-disk storage engines.
|
|
* requires_persistence,
|
|
* ]
|
|
*/
|
|
import {PrepareHelpers} from "jstests/core/txns/libs/prepare_helpers.js";
|
|
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
|
import {getPreImages, getPreImagesCollection} from "jstests/libs/query/change_stream_util.js";
|
|
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
|
import {TxnUtil} from "jstests/libs/txns/txn_util.js";
|
|
|
|
const testName = jsTestName();
|
|
const replTest = new ReplSetTest({
|
|
name: testName,
|
|
nodes: [{}, {rsConfig: {priority: 0}}],
|
|
nodeOptions: {
|
|
setParameter: {logComponentVerbosity: tojsononeline({replication: {initialSync: 5}})},
|
|
oplogSize: 1024,
|
|
},
|
|
});
|
|
replTest.startSet();
|
|
replTest.initiate(null, null, {initiateWithDefaultElectionTimeout: true});
|
|
|
|
// Asserts that documents in the pre-images collection on the primary node are the same as on a
|
|
// secondary node.
|
|
function assertPreImagesCollectionOnPrimaryMatchesSecondary() {
|
|
function detailedError() {
|
|
return (
|
|
"pre-images collection on primary " +
|
|
tojson(getPreImages(replTest.getPrimary())) +
|
|
" does not match pre-images collection on secondary " +
|
|
tojson(getPreImages(replTest.getSecondary()))
|
|
);
|
|
}
|
|
const preImagesCollOnPrimary = getPreImagesCollection(replTest.getPrimary());
|
|
const preImagesCollOnSecondary = getPreImagesCollection(replTest.getSecondary());
|
|
assert.eq(preImagesCollOnPrimary.find().itcount(), preImagesCollOnSecondary.find().itcount(), detailedError);
|
|
assert.eq(preImagesCollOnPrimary.hashAllDocs(), preImagesCollOnSecondary.hashAllDocs(), detailedError);
|
|
}
|
|
|
|
for (const [collectionName, collectionOptions] of [
|
|
["collStandard", {}],
|
|
["collClustered", {clusteredIndex: {key: {_id: 1}, unique: true}}],
|
|
]) {
|
|
const testDB = replTest.getPrimary().getDB(testName);
|
|
jsTest.log(`Testing on collection '${collectionName}'`);
|
|
|
|
// Create a collection with change stream pre- and post-images enabled.
|
|
assert.commandWorked(
|
|
testDB.createCollection(
|
|
collectionName,
|
|
Object.assign({changeStreamPreAndPostImages: {enabled: true}}, collectionOptions),
|
|
),
|
|
);
|
|
const coll = testDB[collectionName];
|
|
|
|
function issueRetryableFindAndModifyCommands(testDB) {
|
|
// Open a new session with retryable writes set to on.
|
|
const session = testDB.getMongo().startSession({retryWrites: true});
|
|
const coll = session.getDatabase(testName)[collectionName];
|
|
assert.commandWorked(coll.insert({_id: 5, v: 1}));
|
|
|
|
// Issue "findAndModify" command to return a document version before update.
|
|
assert.docEq({_id: 5, v: 1}, coll.findAndModify({query: {_id: 5}, update: {$inc: {v: 1}}, new: false}));
|
|
|
|
// Issue "findAndModify" command to return a document version after update.
|
|
assert.docEq({_id: 5, v: 3}, coll.findAndModify({query: {_id: 5}, update: {$inc: {v: 1}}, new: true}));
|
|
|
|
// Issue "findAndModify" command to return a document version before deletion.
|
|
assert.docEq(
|
|
{_id: 5, v: 3},
|
|
coll.findAndModify({query: {_id: 5}, new: false, remove: true, writeConcern: {w: 2}}),
|
|
);
|
|
}
|
|
|
|
function issueWriteCommandsInTransaction(testDB) {
|
|
assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 6}}, {_id: {$lte: 10}}]}));
|
|
assert.commandWorked(
|
|
coll.insert([
|
|
{_id: 6, a: 1},
|
|
{_id: 7, a: 1},
|
|
{_id: 8, a: 1},
|
|
]),
|
|
);
|
|
|
|
const transactionOptions = {readConcern: {level: "majority"}, writeConcern: {w: 2}};
|
|
|
|
// Issue commands in a single "applyOps" transaction.
|
|
TxnUtil.runInTransaction(
|
|
testDB,
|
|
() => {},
|
|
function (db, state) {
|
|
const coll = db[collectionName];
|
|
assert.commandWorked(coll.updateOne({_id: 6}, {$inc: {a: 1}}));
|
|
assert.commandWorked(coll.replaceOne({_id: 7}, {a: "Long string"}));
|
|
assert.commandWorked(coll.deleteOne({_id: 8}));
|
|
},
|
|
transactionOptions,
|
|
);
|
|
|
|
// Issue commands in a multiple-"applyOps" transaction.
|
|
assert.commandWorked(coll.insert({_id: 8, a: 1}));
|
|
TxnUtil.runInTransaction(
|
|
testDB,
|
|
() => {},
|
|
function (db, state) {
|
|
const coll = db[collectionName];
|
|
const largeString = "a".repeat(15 * 1024 * 1024);
|
|
assert.commandWorked(coll.updateOne({_id: 6}, {$inc: {a: 1}}));
|
|
assert.commandWorked(coll.insert({_id: 9, a: largeString}));
|
|
assert.commandWorked(coll.insert({_id: 10, a: largeString})); // Should go to the second "applyOps" entry.
|
|
assert.commandWorked(coll.replaceOne({_id: 7}, {a: "String"}));
|
|
assert.commandWorked(coll.deleteOne({_id: 8}));
|
|
},
|
|
transactionOptions,
|
|
);
|
|
|
|
// Issue commands in a transaction that gets prepared before a commit.
|
|
assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 6}}, {_id: {$lte: 10}}]}));
|
|
assert.commandWorked(
|
|
coll.insert([
|
|
{_id: 6, a: 1},
|
|
{_id: 7, a: 1},
|
|
{_id: 8, a: 1},
|
|
]),
|
|
);
|
|
const session = testDB.getMongo().startSession();
|
|
const sessionDb = session.getDatabase(testDB.getName());
|
|
session.startTransaction();
|
|
const collInner = sessionDb[coll.getName()];
|
|
assert.commandWorked(collInner.updateOne({_id: 6}, {$inc: {a: 1}}));
|
|
assert.commandWorked(collInner.replaceOne({_id: 7}, {a: "Long string"}));
|
|
assert.commandWorked(collInner.deleteOne({_id: 8}));
|
|
let prepareTimestamp = PrepareHelpers.prepareTransaction(session);
|
|
assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp));
|
|
}
|
|
|
|
function issueApplyOpsCommand(testDB) {
|
|
assert.commandWorked(coll.deleteMany({$and: [{_id: {$gte: 9}}, {_id: {$lte: 10}}]}));
|
|
assert.commandWorked(
|
|
coll.insert([
|
|
{_id: 9, a: 1},
|
|
{_id: 10, a: 1},
|
|
]),
|
|
);
|
|
assert.commandWorked(
|
|
testDB.runCommand({
|
|
applyOps: [
|
|
{op: "u", ns: coll.getFullName(), o2: {_id: 9}, o: {$v: 2, diff: {u: {a: 2}}}},
|
|
{op: "d", ns: coll.getFullName(), o: {_id: 10}},
|
|
],
|
|
}),
|
|
);
|
|
}
|
|
|
|
(function testSteadyStateReplication() {
|
|
jsTestLog("Testing pre-image replication to secondaries.");
|
|
|
|
// Insert a document.
|
|
assert.commandWorked(coll.insert({_id: 1, v: 1, largeField: "AAAAAAAAAAAAAAAAAAAAAAAA"}));
|
|
|
|
// Update the document by issuing a basic "update" command.
|
|
assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}));
|
|
|
|
// Verify that a related change stream pre-image was replicated to the secondary.
|
|
replTest.awaitReplication();
|
|
assertPreImagesCollectionOnPrimaryMatchesSecondary();
|
|
|
|
// Issue a "delete" command.
|
|
assert.commandWorked(coll.insert({_id: 2, field: "A"}));
|
|
assert.commandWorked(coll.deleteOne({_id: 2}));
|
|
|
|
// Verify that a related change stream pre-image was replicated to the secondary.
|
|
replTest.awaitReplication();
|
|
assertPreImagesCollectionOnPrimaryMatchesSecondary();
|
|
|
|
// Issue retryable "findAndModify" commands.
|
|
issueRetryableFindAndModifyCommands(testDB);
|
|
|
|
// Verify that related change stream pre-images were replicated to the secondary.
|
|
replTest.awaitReplication();
|
|
assertPreImagesCollectionOnPrimaryMatchesSecondary();
|
|
|
|
issueWriteCommandsInTransaction(testDB);
|
|
|
|
// Verify that related change stream pre-images were replicated to the secondary.
|
|
replTest.awaitReplication();
|
|
assertPreImagesCollectionOnPrimaryMatchesSecondary();
|
|
|
|
issueApplyOpsCommand(testDB);
|
|
|
|
// Verify that related change stream pre-images were replicated to the secondary.
|
|
replTest.awaitReplication();
|
|
assertPreImagesCollectionOnPrimaryMatchesSecondary();
|
|
})();
|
|
|
|
(function testInitialSync() {
|
|
jsTestLog("Testing pre-image replication during the logical initial sync.");
|
|
|
|
// Insert a document for deletion test.
|
|
assert.commandWorked(coll.insert({_id: 3, field: "A"}, {writeConcern: {w: 2}}));
|
|
|
|
// Add a new node that will perform an initial sync. Pause the initial sync process (using
|
|
// failpoint "initialSyncHangBeforeCopyingDatabases") before copying the database to perform
|
|
// document modifications to make the collection content more recent and create inconsistent
|
|
// data situation during oplog application.
|
|
const initialSyncNode = replTest.add({
|
|
rsConfig: {priority: 0},
|
|
setParameter: {"failpoint.initialSyncHangBeforeCopyingDatabases": tojson({mode: "alwaysOn"})},
|
|
});
|
|
|
|
// Wait until the new node starts and pauses on the fail point.
|
|
replTest.reInitiate();
|
|
assert.commandWorked(
|
|
initialSyncNode.adminCommand({
|
|
waitForFailPoint: "initialSyncHangBeforeCopyingDatabases",
|
|
timesEntered: 1,
|
|
maxTimeMS: 60000,
|
|
}),
|
|
);
|
|
|
|
// Update the document on the primary node.
|
|
assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}, {writeConcern: {w: 2}}));
|
|
|
|
// Delete the document on the primary node.
|
|
assert.commandWorked(coll.deleteOne({_id: 3}, {writeConcern: {w: 2}}));
|
|
|
|
issueRetryableFindAndModifyCommands(testDB);
|
|
issueApplyOpsCommand(testDB);
|
|
issueWriteCommandsInTransaction(testDB);
|
|
|
|
// Resume the initial sync process.
|
|
assert.commandWorked(
|
|
initialSyncNode.adminCommand({configureFailPoint: "initialSyncHangBeforeCopyingDatabases", mode: "off"}),
|
|
);
|
|
|
|
// Wait until the initial sync process is complete and the new node becomes a fully
|
|
// functioning secondary.
|
|
replTest.awaitSecondaryNodes(null, [initialSyncNode]);
|
|
|
|
// Verify that pre-images were not written during the logical initial sync.
|
|
let preImageDocuments = getPreImages(initialSyncNode);
|
|
assert.eq(preImageDocuments.length, 0, preImageDocuments);
|
|
|
|
// Verify that in the secondary mode, after initial sync is complete, the pre-images are
|
|
// written.
|
|
assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}, {writeConcern: {w: 3}}));
|
|
preImageDocuments = getPreImages(initialSyncNode);
|
|
assert.eq(preImageDocuments.length, 1, preImageDocuments);
|
|
const preImageDocumentsOnPrimary = getPreImages(replTest.getPrimary());
|
|
assert.docEq(preImageDocuments[0], preImageDocumentsOnPrimary.pop());
|
|
|
|
// Remove the initial sync node from the replica set.
|
|
replTest.remove(initialSyncNode);
|
|
})();
|
|
|
|
(function testStartupRecovery() {
|
|
jsTestLog("Testing pre-image writing during startup recovery.");
|
|
|
|
// Pause check-pointing on the primary node to ensure new pre-images are not flushed to the
|
|
// disk.
|
|
const pauseCheckpointThreadFailPoint = configureFailPoint(replTest.getPrimary(), "pauseCheckpointThread");
|
|
pauseCheckpointThreadFailPoint.wait();
|
|
|
|
// Update the document on the primary node.
|
|
assert.commandWorked(coll.updateOne({_id: 1}, {$inc: {v: 1}}, {writeConcern: {w: 2}}));
|
|
|
|
// Insert and delete a document on the primary node.
|
|
assert.commandWorked(coll.insert({_id: 4, field: "A"}));
|
|
assert.commandWorked(coll.deleteOne({_id: 4}, {writeConcern: {w: 2}}));
|
|
|
|
issueRetryableFindAndModifyCommands(testDB);
|
|
issueApplyOpsCommand(testDB);
|
|
issueWriteCommandsInTransaction(testDB);
|
|
|
|
// Do an unclean shutdown of the primary node, and then restart.
|
|
replTest.stop(0, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL});
|
|
replTest.restart(0);
|
|
|
|
// Wait until the primary is up and running.
|
|
replTest.waitForPrimary();
|
|
|
|
// Verify that pre-image documents are the same on the recovered primary and the secondary.
|
|
assertPreImagesCollectionOnPrimaryMatchesSecondary();
|
|
})();
|
|
}
|
|
replTest.stopSet();
|