mirror of https://github.com/mongodb/mongo
181 lines
6.8 KiB
JavaScript
181 lines
6.8 KiB
JavaScript
/**
|
|
* Tests that resharding can handle the case where there is replication lag on donor shards. That
|
|
* is, when the recipient is approaching strict consistency, the resharding oplog fetcher starts
|
|
* targeting the primary node of the donor shard instead of the nearest node to prepare for the
|
|
* critical section.
|
|
*/
|
|
import {configureFailPoint} from "jstests/libs/fail_point_util.js";
|
|
import {Thread} from "jstests/libs/parallelTester.js";
|
|
import {ShardingTest} from "jstests/libs/shardingtest.js";
|
|
import {restartServerReplication, stopServerReplication} from "jstests/libs/write_concern_util.js";
|
|
|
|
/**
|
|
* Returns profiler entries for the resharding oplog fetcher aggregate command.
|
|
*/
|
|
function getProfilerEntries(conn) {
|
|
return conn
|
|
.getDB("local")
|
|
.system.profile.find({
|
|
op: "command",
|
|
ns: "local.oplog.rs",
|
|
"command.aggregate": "oplog.rs",
|
|
})
|
|
.toArray()
|
|
.filter((entry) => {
|
|
for (let stage of entry.command.pipeline) {
|
|
if (stage.hasOwnProperty("$_internalReshardingIterateTransaction")) {
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
});
|
|
}
|
|
|
|
function resetProfilerCollection(db) {
|
|
db.setProfilingLevel(0);
|
|
assert(db.system.profile.drop());
|
|
db.createCollection("system.profile", {capped: true, size: 100 * 1024 * 1024});
|
|
db.setProfilingLevel(2);
|
|
}
|
|
|
|
/**
|
|
* Returns the read preference used by the command with the given profiler entry.
|
|
*/
|
|
function getReadPreference(profilerEntry) {
|
|
assert(profilerEntry.hasOwnProperty("command"), profilerEntry);
|
|
assert(profilerEntry.command.hasOwnProperty("$queryOptions"), profilerEntry);
|
|
assert(profilerEntry.command["$queryOptions"].hasOwnProperty("$readPreference"), profilerEntry);
|
|
return profilerEntry.command["$queryOptions"]["$readPreference"].mode;
|
|
}
|
|
|
|
const st = new ShardingTest({
|
|
shards: 2,
|
|
rs: {
|
|
nodes: 3,
|
|
// Disallow chaining to force both secondaries to sync from the primary. This disables
|
|
// replication on one of the secondaries, with chaining that would effectively disable
|
|
// replication on both secondaries, causing the test setup to be wrong since writeConcern of
|
|
// w: majority is unsatisfiable.
|
|
settings: {chainingAllowed: false},
|
|
},
|
|
});
|
|
|
|
const dbName = "testDb";
|
|
const collName = "testColl";
|
|
const ns = dbName + "." + collName;
|
|
|
|
const testColl = st.s.getCollection(ns);
|
|
|
|
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
|
|
assert.commandWorked(testColl.insert([{x: -1}, {x: 0}, {x: 1}]));
|
|
|
|
const configPrimary = st.configRS.getPrimary();
|
|
const donorPrimary = st.rs0.getPrimary();
|
|
|
|
function runMoveCollection(host, ns, toShard) {
|
|
const mongos = new Mongo(host);
|
|
return mongos.adminCommand({moveCollection: ns, toShard});
|
|
}
|
|
|
|
// Turn on profiling on all nodes on the donor shard.
|
|
st.rs0.nodes.forEach((node) => resetProfilerCollection(node.getDB("local")));
|
|
|
|
const beforeQueryingRemainingTimeFp = configureFailPoint(configPrimary, "hangBeforeQueryingRecipients");
|
|
const beforeCriticalSectionFp = configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeBlockingWrites");
|
|
const moveThread = new Thread(runMoveCollection, st.s.host, ns, st.shard1.shardName);
|
|
moveThread.start();
|
|
|
|
beforeQueryingRemainingTimeFp.wait();
|
|
|
|
// Verify that the resharding oplog fetcher initially fetches from the nearest node.
|
|
let numEntriesNearestReadPref = 0;
|
|
st.rs0.nodes.forEach((node) => {
|
|
const profilerEntries = getProfilerEntries(node);
|
|
profilerEntries.forEach((profilerEntry) => {
|
|
const readPref = getReadPreference(profilerEntry);
|
|
assert.eq(readPref, "nearest");
|
|
numEntriesNearestReadPref++;
|
|
});
|
|
});
|
|
assert.gt(
|
|
numEntriesNearestReadPref,
|
|
0,
|
|
"Expected to find resharding oplog fetcher profiler entries with read preference 'nearest'",
|
|
);
|
|
|
|
beforeQueryingRemainingTimeFp.off();
|
|
beforeCriticalSectionFp.wait();
|
|
|
|
// Verify that the resharding oplog fetcher eventually starts fetching from the primary now that the
|
|
// recipient is approaching strict consistency to prepare for the critical section.
|
|
let numEntriesPrimaryReadPrefBefore = 0;
|
|
assert.soon(() => {
|
|
numEntriesPrimaryReadPrefBefore = 0;
|
|
st.rs0.nodes.forEach((node) => {
|
|
const profilerEntries = getProfilerEntries(node);
|
|
if (node == donorPrimary) {
|
|
profilerEntries.forEach((profilerEntry) => {
|
|
const readPref = getReadPreference(profilerEntry);
|
|
assert(readPref == "primary" || readPref == "nearest", readPref);
|
|
if (readPref == "primary") {
|
|
numEntriesPrimaryReadPrefBefore++;
|
|
}
|
|
});
|
|
} else {
|
|
profilerEntries.forEach((profilerEntry) => {
|
|
const readPref = getReadPreference(profilerEntry);
|
|
assert.eq(readPref, "nearest");
|
|
});
|
|
// Drop the profiler entries on the secondaries so that we can verify later that the
|
|
// resharding oplog fetcher was still fetching from the primary after the critical
|
|
// section had started.
|
|
resetProfilerCollection(node.getDB("local"));
|
|
}
|
|
});
|
|
return numEntriesPrimaryReadPrefBefore > 0;
|
|
}, "Expected to find resharding oplog fetcher profiler entries with read preference 'primary'");
|
|
|
|
// Pause replication on one of the secondaries so that if the resharding oplog fetcher targets
|
|
// nearest node instead of the primary at any point after this, the resharding operation could get
|
|
// stuck in the critical section.
|
|
stopServerReplication(st.rs0.getSecondaries()[1]);
|
|
|
|
beforeCriticalSectionFp.off();
|
|
assert.commandWorked(moveThread.returnData());
|
|
|
|
// Verify that the resharding oplog fetcher was still fetching from the primary after the critical
|
|
// section had started.
|
|
let numEntriesPrimaryReadPrefAfter = 0;
|
|
st.rs0.nodes.forEach((node) => {
|
|
const profilerEntries = getProfilerEntries(node);
|
|
if (node == donorPrimary) {
|
|
profilerEntries.forEach((profilerEntry) => {
|
|
const readPref = getReadPreference(profilerEntry);
|
|
assert(readPref == "primary" || readPref == "nearest", readPref);
|
|
if (readPref == "primary") {
|
|
numEntriesPrimaryReadPrefAfter++;
|
|
}
|
|
});
|
|
} else {
|
|
assert.eq(profilerEntries.length, 0, {profilerEntries});
|
|
}
|
|
});
|
|
assert.gte(
|
|
numEntriesPrimaryReadPrefAfter,
|
|
numEntriesPrimaryReadPrefBefore,
|
|
"Expected to find at least as many resharding oplog fetcher profiler entries with read " +
|
|
"preference 'primary' before and after the critical section",
|
|
);
|
|
|
|
jsTest.log(
|
|
"Profiler entry counts: " +
|
|
tojson({
|
|
numEntriesNearestReadPref,
|
|
numEntriesPrimaryReadPrefBefore,
|
|
numEntriesPrimaryReadPrefAfter,
|
|
}),
|
|
);
|
|
|
|
restartServerReplication(st.rs0.getSecondaries()[1]);
|
|
st.stop();
|