/**
 * Tests that the Initial Sync Test Fixture properly pauses initial sync.
 *
 * The test checks that both the collection cloning and oplog application stages of initial sync
 * pause after exactly one command is run when the test fixture's step() function is called. The
 * test issues the same listDatabases and listCollections commands that collection cloning does, so
 * we know all the commands that will be run on the sync source and can verify that only one is run
 * per call to step(). Similarly, for oplog application, we can check the log messages to make sure
 * that the batches being applied are of the expected size and that only one batch is applied per
 * step() call.
 *
 * @tags: [
 *   uses_prepare_transaction,
 *   uses_transactions,
 * ]
 */

import {PrepareHelpers} from "jstests/core/txns/libs/prepare_helpers.js";
import {InitialSyncTest} from "jstests/replsets/libs/initial_sync_test.js";

/**
 * Helper function to check that a specific message appeared or did not appear in the logs.
 */
function checkLogForMsg(node, msg, contains) {
    if (contains) {
        jsTest.log("Check for presence of message (" + node.port + "): |" + msg + "|");
        assert(checkLog.checkContainsOnce(node, msg));
    } else {
        jsTest.log("Check for absence of message (" + node.port + "): |" + msg + "|");
        assert(!checkLog.checkContainsOnce(node, msg));
    }
}

/**
 * Helper function to check that a specific timestamp message appeared or did not appear in the
 * logs. If we expect the log message to appear, its presence shows that the node paused after
 * getting the specified timestamp.
 */
function checkLogForGetTimestampMsg(node, timestampName, timestamp, contains) {
    let msg = "Initial Syncer got the " + timestampName + ": { ts: " + tojson(timestamp);

    checkLogForMsg(node, msg, contains);
}
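
// For illustration (hypothetical timestamp value): for timestampName "beginFetchingTimestamp" and
// a timestamp of Timestamp(1570000000, 1), the helper above looks for the log fragment:
//     Initial Syncer got the beginFetchingTimestamp: { ts: Timestamp(1570000000, 1)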

/**
 * Helper function to check that a specific collection cloner message appeared or did not appear in
 * the logs. If the command was listIndexes and we expect the message to appear, we also add the
 * collection UUID to make sure that it corresponds to the expected collection.
 */
function checkLogForCollectionClonerMsg(node, commandName, dbname, contains, collUUID) {
    let msg = 'Collection Cloner scheduled a remote command","attr":{"stage":"' + dbname +
        " db: { " + commandName;

    if (commandName === "listIndexes" && contains) {
        msg += ": " + collUUID;
        msg = msg.replace('("', '(\\"').replace('")', '\\")');
    }

    checkLogForMsg(node, msg, contains);
}
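
// For illustration: for dbname "test" and commandName "listCollections", the fragment matched by
// the helper above is (as it appears in the structured JSON log line):
//     Collection Cloner scheduled a remote command","attr":{"stage":"test db: { listCollections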

/**
 * Helper function to check that the specific message appeared exactly once in the logs and that
 * there is no other message saying that the next batch is about to be applied. This shows that
 * oplog application is paused.
 */
function checkLogForOplogApplicationMsg(node, size) {
    let msg = "Initial Syncer is about to apply the next oplog batch of size: ";
    checkLog.containsWithCount(node, msg, 1, 5 * 1000);

    msg += size;
    assert(checkLog.checkContainsOnce(node, msg));
}
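
// Note: the containsWithCount() call above waits up to 5 seconds for the batch message to appear
// exactly once, regardless of batch size; the second check then confirms that the single batch has
// the expected size.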

// Set up Initial Sync Test.
const rst = new ReplSetTest({
    name: "InitialSyncTest",
    nodes: [
        {
            // Each PrimaryOnlyService rebuilds its instances on stepup, and that may involve
            // doing writes. So we need to disable PrimaryOnlyService rebuilds to make the
            // oplog batch count checks below work reliably.
            setParameter: {
                "failpoint.PrimaryOnlyServiceSkipRebuildingInstances": tojson({mode: "alwaysOn"}),
            }
        },
        {rsConfig: {priority: 0, votes: 0}}
    ]
});
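
// Note: setParameter keys of the form "failpoint.<name>" enable the named failpoint at server
// startup, so the failpoint is active before the node does any work.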

rst.startSet();
rst.initiate();

const initialSyncTest = new InitialSyncTest("InitialSyncTest", rst);
try {
    // If the test fails, the initial syncing node may be left with an engaged failpoint that
    // instructs it to hang. This `try` block guarantees we call `initialSyncTest.fail()`, which
    // allows the test to exit gracefully with an error.

    const primary = initialSyncTest.getPrimary();
    let secondary = initialSyncTest.getSecondary();
    const db = primary.getDB("test");
    let maxLargeStringsInBatch = 9;
    // If we can fit exactly 9+1=10 large strings in a batch, the small overhead for each oplog
    // entry means we expect only 9 oplog entries per batch.
    let largeStringSize = initialSyncTest.replBatchLimitBytes / (maxLargeStringsInBatch + 1);
    const largeString = 'z'.repeat(largeStringSize);
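
    // Worked example, under the assumption that replBatchLimitBytes is 100 MB: largeStringSize
    // would be 10 MB, so nine large documents (~90 MB) plus per-entry overhead fill a batch, while
    // a tenth large document would push the batch over the limit.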

    assert.commandWorked(db.foo.insert({a: 1}));
    assert.commandWorked(db.bar.insert({b: 1}));

    // Prepare a transaction so that we know that step() can restart the secondary even if there is
    // a prepared transaction. The prepareTimestamp will be used as the beginFetchingTimestamp and
    // beginApplyingTimestamp during initial sync.
    const session = primary.startSession({causalConsistency: false});
    const sessionDB = session.getDatabase("test");
    const sessionColl = sessionDB.getCollection("foo");
    session.startTransaction();
    assert.commandWorked(sessionColl.insert({c: 1}));
    let prepareTimestamp = PrepareHelpers.prepareTransaction(session);

    // This step call restarts the secondary and causes it to go into initial sync. It will pause
    // initial sync after the node has fetched the defaultBeginFetchingTimestamp.
    assert(!initialSyncTest.step());

    secondary = initialSyncTest.getSecondary();
    secondary.setSecondaryOk();

    // Make sure that we cannot read from this node yet.
    assert.commandFailedWithCode(secondary.getDB("test").runCommand({count: "foo"}),
                                 ErrorCodes.NotPrimaryOrSecondary);

    // Make sure that we see that the node got the defaultBeginFetchingTimestamp, but hasn't gotten
    // the beginFetchingTimestamp yet.
    checkLogForGetTimestampMsg(secondary, "defaultBeginFetchingTimestamp", prepareTimestamp, true);
    checkLogForGetTimestampMsg(secondary, "beginFetchingTimestamp", prepareTimestamp, false);
    checkLogForGetTimestampMsg(secondary, "beginApplyingTimestamp", prepareTimestamp, false);

    // This step call will resume initial sync and pause it again after the node gets the
    // beginFetchingTimestamp from its sync source.
    assert(!initialSyncTest.step());

    // Make sure that we cannot read from this node yet.
    assert.commandFailedWithCode(secondary.getDB("test").runCommand({count: "foo"}),
                                 ErrorCodes.NotPrimaryOrSecondary);

    // Make sure that we see that the node got the beginFetchingTimestamp, but hasn't gotten the
    // beginApplyingTimestamp yet.
    checkLogForGetTimestampMsg(secondary, "defaultBeginFetchingTimestamp", prepareTimestamp, false);
    checkLogForGetTimestampMsg(secondary, "beginFetchingTimestamp", prepareTimestamp, true);
    checkLogForGetTimestampMsg(secondary, "beginApplyingTimestamp", prepareTimestamp, false);

    // This step call will resume initial sync and pause it again after the node gets the
    // beginApplyingTimestamp from its sync source.
    assert(!initialSyncTest.step());

    // Make sure that we cannot read from this node yet.
    assert.commandFailedWithCode(secondary.getDB("test").runCommand({count: "foo"}),
                                 ErrorCodes.NotPrimaryOrSecondary);

    // Make sure that we see that the node got the beginApplyingTimestamp, but that we don't see
    // the listDatabases call yet.
    checkLogForGetTimestampMsg(secondary, "defaultBeginFetchingTimestamp", prepareTimestamp, false);
    checkLogForGetTimestampMsg(secondary, "beginFetchingTimestamp", prepareTimestamp, false);
    checkLogForGetTimestampMsg(secondary, "beginApplyingTimestamp", prepareTimestamp, true);
    checkLogForCollectionClonerMsg(secondary, "listDatabases", "admin", false);

    // This step call will resume initial sync and pause it again after the node gets the
    // listDatabases result from its sync source.
    assert(!initialSyncTest.step());

    // Make sure that we cannot read from this node yet.
    assert.commandFailedWithCode(secondary.getDB("test").runCommand({count: "foo"}),
                                 ErrorCodes.NotPrimaryOrSecondary);

    // Make sure that we saw the listDatabases call in the log messages, but didn't see any
    // listCollections or listIndexes calls.
    checkLogForCollectionClonerMsg(secondary, "listDatabases", "admin", true);
    checkLogForCollectionClonerMsg(secondary, "listCollections", "admin", false);
    checkLogForCollectionClonerMsg(secondary, "listIndexes", "admin", false);

    // Do the same listDatabases command as the CollectionCloner.
    const databases =
        assert.commandWorked(primary.adminCommand({listDatabases: 1, nameOnly: true})).databases;
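
    // With nameOnly: true, each entry in 'databases' carries only the database name, which is all
    // the iteration below needs.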

    // Iterate over the databases and collections in the same order that the test fixture would so
    // that we can check the log messages to make sure initial sync is paused as expected.
    for (let dbObj of databases) {
        const dbname = dbObj.name;

        // We skip the local db during the collection cloning phase of initial sync.
        if (dbname === "local") {
            continue;
        }

        const database = primary.getDB(dbname);

        // Do the same listCollections command as the CollectionCloner.
        const res = assert.commandWorked(database.runCommand(
            {listCollections: 1, filter: {$or: [{type: "collection"}, {type: {$exists: false}}]}}));
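
        // The $or filter matches regular collections as well as catalog entries that omit the
        // type field, and thereby excludes views.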

        // Make sure that there is only one batch.
        assert.eq(NumberLong(0), res.cursor.id, res);

        const collectionsCursor = res.cursor;

        // For each database, CollectionCloner will first call listCollections.
        assert(!initialSyncTest.step());

        // Make sure that we cannot read from this node yet.
        assert.commandFailedWithCode(secondary.getDB("test").runCommand({count: "foo"}),
                                     ErrorCodes.NotPrimaryOrSecondary);

        // Make sure that we saw the listCollections call in the log messages, but didn't see a
        // listIndexes call.
        checkLogForCollectionClonerMsg(secondary, "listCollections", dbname, true);
        checkLogForCollectionClonerMsg(secondary, "listIndexes", "admin", false);

        for (let collectionObj of collectionsCursor.firstBatch) {
            assert(collectionObj.info, collectionObj);
            const collUUID = collectionObj.info.uuid;

            // For each collection, CollectionCloner will call listIndexes.
            assert(!initialSyncTest.step());

            // Make sure that we cannot read from this node yet.
            assert.commandFailedWithCode(secondary.getDB("test").runCommand({count: "foo"}),
                                         ErrorCodes.NotPrimaryOrSecondary);

            // Make sure that we saw the listIndexes call in the log messages, but didn't see a
            // listCollections call.
            checkLogForCollectionClonerMsg(secondary, "listIndexes", dbname, true, collUUID);
            checkLogForCollectionClonerMsg(secondary, "listCollections", "admin", false);
        }
    }

    // Perform large operations during collection cloning so that we will need multiple batches
    // during oplog application. For simplicity, guarantee we will create only two batches during
    // the oplog application phase of initial sync.
    const docsToInsertPerCollectionDuringOplogApplication = maxLargeStringsInBatch - 1;
    const totalDocsInserted = 2 * docsToInsertPerCollectionDuringOplogApplication;
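
    // With maxLargeStringsInBatch = 9, this inserts 8 large documents into each of foo and bar
    // (16 in total), so after the single-entry 'prepare' batch we expect one batch of 9 entries
    // followed by one batch of 16 - 9 = 7 entries, matching the checks below.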

    for (let count = 0; count < docsToInsertPerCollectionDuringOplogApplication; ++count) {
        assert.commandWorked(db.foo.insert({d: largeString}));
        assert.commandWorked(db.bar.insert({e: largeString}));
    }

    // Check that we see the expected number of batches during oplog application.

    // This batch should correspond to the 'prepare' op.
    assert(!initialSyncTest.step());
    checkLogForOplogApplicationMsg(secondary, 1);
    assert(!initialSyncTest.step());
    checkLogForOplogApplicationMsg(secondary, maxLargeStringsInBatch);
    assert(!initialSyncTest.step());
    checkLogForOplogApplicationMsg(secondary, totalDocsInserted - maxLargeStringsInBatch);

    assert(initialSyncTest.step(), "Expected initial sync to have completed, but it did not");

    // Abort the transaction so that the data consistency checks in stop() can run.
    assert.commandWorked(session.abortTransaction_forTesting());

    // Issue a w:2 write to make sure the secondary has replicated the abortTransaction oplog
    // entry.
    assert.commandWorked(primary.getDB("otherDB").otherColl.insert({x: 1}, {writeConcern: {w: 2}}));

    // Confirm that the node can be read from and that it has the inserts that were made while it
    // was in initial sync: the `docsToInsertPerCollectionDuringOplogApplication` documents written
    // to each of `foo` and `bar` during the oplog application phase, plus the 1 document inserted
    // into each collection before initial sync began.
    assert.eq(secondary.getDB("test").foo.find().count(),
              docsToInsertPerCollectionDuringOplogApplication + 1);
    assert.eq(secondary.getDB("test").bar.find().count(),
              docsToInsertPerCollectionDuringOplogApplication + 1);
    assert.eq(secondary.getDB("test").foo.find().itcount(),
              docsToInsertPerCollectionDuringOplogApplication + 1);
    assert.eq(secondary.getDB("test").bar.find().itcount(),
              docsToInsertPerCollectionDuringOplogApplication + 1);

    // Do data consistency checks at the end.
    initialSyncTest.stop();
} catch (errorDuringTest) {
    initialSyncTest.fail();
    throw errorDuringTest;
}