/**
 * Tests the cloning portion of a resharding operation in isolation.
 *
 * @tags: [
 *   uses_atclustertime,
 * ]
 */
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {extractUUIDFromObject, getUUIDFromListCollections} from "jstests/libs/uuid_util.js";
import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";

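// Configure the shards so reads at an arbitrary cluster time stay valid: the
// WTPreserveSnapshotHistoryIndefinitely failpoint stops WiredTiger from truncating snapshot
// history, keeping the atClusterTime captured below readable for the duration of the test. The
// increased log verbosity is only to aid debugging of the resharding cloner.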
const st = new ShardingTest({
    mongos: 1,
    config: 1,
    shards: 2,
    rs: {nodes: 3},
    rsOptions: {
        setParameter: {
            "failpoint.WTPreserveSnapshotHistoryIndefinitely": tojson({mode: "alwaysOn"}),
            logComponentVerbosity: tojson({sharding: {reshard: 2}}),
        },
    },
});

const inputCollection = st.s.getCollection("reshardingDb.coll");

CreateShardedCollectionUtil.shardCollectionWithChunks(inputCollection, {oldKey: 1}, [
    {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: st.shard0.shardName},
    {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: st.shard1.shardName},
]);

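// Record the source collection's UUID; it is used both to name the temporary resharding
// collection below and as the `uuid` argument to the testReshardCloneCollection command.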
const inputCollectionUUID =
    getUUIDFromListCollections(inputCollection.getDB(), inputCollection.getName());
const inputCollectionUUIDString = extractUUIDFromObject(inputCollectionUUID);

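// The cloner writes into a temporary collection named "system.resharding.<source collection
// UUID>", matching the namespace a real resharding operation uses for its output.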
const temporaryReshardingCollection =
    st.s.getCollection(`reshardingDb.system.resharding.${inputCollectionUUIDString}`);

CreateShardedCollectionUtil.shardCollectionWithChunks(temporaryReshardingCollection, {newKey: 1}, [
    {min: {newKey: MinKey}, max: {newKey: 0}, shard: st.shard0.shardName},
    {min: {newKey: 0}, max: {newKey: MaxKey}, shard: st.shard1.shardName},
]);

// The shardCollection command doesn't wait for the config.cache.chunks entries to have been
// written on the primary shard for the database. We manually run the
// _flushRoutingTableCacheUpdates command to guarantee they have been written and are visible with
// the atClusterTime used by the testReshardCloneCollection command.
for (const shard of [st.shard0, st.shard1]) {
    assert.commandWorked(shard.rs.getPrimary().adminCommand(
        {_flushRoutingTableCacheUpdates: temporaryReshardingCollection.getFullName()}));
}

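// Insert documents before capturing the cluster time below so they are visible to the cloner's
// snapshot read and should be copied to the recipient shards.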
assert.commandWorked(inputCollection.insert([
    {_id: "stays on shard0", oldKey: -10, newKey: -10},
    {_id: "moves to shard0", oldKey: 10, newKey: -10},
    {_id: "moves to shard1", oldKey: -10, newKey: 10},
    {_id: "stays on shard1", oldKey: 10, newKey: 10},
]));

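// Capture the operation time of the inserts above; the cloner is told to read at this timestamp.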
const atClusterTime = inputCollection.getDB().getSession().getOperationTime();

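// These documents are inserted after `atClusterTime` was captured, so the cloner must not copy
// them even though they fall into the same chunk ranges.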
assert.commandWorked(inputCollection.insert([
    {_id: "not visible, but would stay on shard0", oldKey: -10, newKey: -10},
    {_id: "not visible, but would move to shard0", oldKey: 10, newKey: -10},
    {_id: "not visible, but would move to shard1", oldKey: -10, newKey: 10},
    {_id: "not visible, but would stay on shard1", oldKey: 10, newKey: 10},
]));

// We wait for the "not visible" inserts to become majority-committed on all members of the
// replica set shards. This isn't necessary for the test's correctness but makes it more likely
// that the test would fail if ReshardingCollectionCloner wasn't specifying atClusterTime in its
// read concern.
st.shard0.rs.awaitLastOpCommitted();
st.shard1.rs.awaitLastOpCommitted();

const isMultiversion = Boolean(jsTest.options().useRandomBinVersionsWithinReplicaSet);
const reshardingCollectionClonerMaxStalenessSeconds = 100;

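// Skip the server parameter checks below when running with mixed binary versions, since
// reshardingCollectionClonerMaxStalenessSeconds may not be settable on older binaries.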
if (!isMultiversion) {
    // Verify that the ReshardingCollectionCloner's maxStalenessSeconds can be set at runtime.
    const shard0Primary = st.rs0.getPrimary();
    const shard1Primary = st.rs1.getPrimary();

    assert.commandWorked(shard0Primary.adminCommand(
        {setParameter: 1, reshardingCollectionClonerMaxStalenessSeconds}));
    assert.commandWorked(shard1Primary.adminCommand(
        {setParameter: 1, reshardingCollectionClonerMaxStalenessSeconds}));

    // Verify that setting the ReshardingCollectionCloner's maxStalenessSeconds to 0, or to a
    // value lower than the minimum, is disallowed.
    assert.commandFailedWithCode(
        shard0Primary.adminCommand({setParameter: 1, reshardingCollectionClonerMaxStalenessSeconds: 0}),
        ErrorCodes.BadValue);
    assert.commandFailedWithCode(
        shard0Primary.adminCommand({setParameter: 1, reshardingCollectionClonerMaxStalenessSeconds: 1}),
        ErrorCodes.MaxStalenessOutOfRange);
}

function testReshardCloneCollection(shard, expectedDocs) {
    const dbName = inputCollection.getDB().getName();
    const allNodes = [...st.shard0.rs.nodes, ...st.shard1.rs.nodes];

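    // Profile every operation on all shard nodes so the cloner's aggregation requests can be
    // inspected after the cloning completes.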
    for (const node of allNodes) {
        node.getDB(dbName).setProfilingLevel(2);
    }

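    // Run the collection cloner in isolation on `shard` via the test-only
    // testReshardCloneCollection command, cloning at the previously captured cluster time.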
    const reshardCmd = {
        testReshardCloneCollection: inputCollection.getFullName(),
        shardKey: {newKey: 1},
        uuid: inputCollectionUUID,
        shardId: shard.shardName,
        atClusterTime: atClusterTime,
        outputNs: temporaryReshardingCollection.getFullName(),
    };
    jsTestLog({"Node": shard.rs.getPrimary(), "ReshardingCmd": reshardCmd});
    assert.commandWorked(shard.rs.getPrimary().adminCommand(reshardCmd));

    for (const node of allNodes) {
        node.getDB(dbName).setProfilingLevel(0);
    }

    // We sort by oldKey so the order of `expectedDocs` can be deterministic.
    assert.eq(expectedDocs,
              shard.rs.getPrimary()
                  .getCollection(temporaryReshardingCollection.getFullName())
                  .find()
                  .sort({oldKey: 1})
                  .toArray());

    // Verify the ReshardingCollectionCloner is sending its aggregation requests with a logical
    // session ID to prevent idle cursors from being timed out by the CursorManager.
    for (const donorShard of [st.shard0, st.shard1]) {
        const profilerEntries = donorShard.rs.nodes
            .map((node) => node.getDB(dbName).system.profile.findOne({
                op: "command",
                ns: inputCollection.getFullName(),
                "command.aggregate": inputCollection.getName(),
                "command.collectionUUID": inputCollectionUUID,
            }))
            .filter((entry) => entry !== null);

        assert.neq(
            [],
            profilerEntries,
            `expected to find collection cloning aggregation in profiler output of some ${donorShard.shardName} node`);

        for (const entry of profilerEntries) {
            assert(entry.command.hasOwnProperty("lsid"),
                   "expected profiler entry for collection cloning aggregation to have a logical" +
                       ` session ID: ${tojson(entry)}`);

            if (!isMultiversion) {
                // Verify that the aggregation requests have readPreference "nearest" with the
                // configured maxStalenessSeconds.
                assert(entry.command.hasOwnProperty("$queryOptions") &&
                           entry.command["$queryOptions"].hasOwnProperty("$readPreference"),
                       "expected profiler entry for collection cloning aggregation to have a" +
                           ` readPreference: ${tojson(entry)}`);
                const readPreference = entry.command["$queryOptions"]["$readPreference"];
                assert.eq(readPreference.mode, "nearest", entry);
                assert.eq(readPreference.maxStalenessSeconds,
                          reshardingCollectionClonerMaxStalenessSeconds,
                          entry);
            }
        }
    }
}

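// Under the new shard key, shard0 owns {newKey: [MinKey, 0)} and shard1 owns {newKey: [0, MaxKey]},
// so each recipient should end up with exactly the documents listed below.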
testReshardCloneCollection(st.shard0, [
    {_id: "stays on shard0", oldKey: -10, newKey: -10},
    {_id: "moves to shard0", oldKey: 10, newKey: -10},
]);

testReshardCloneCollection(st.shard1, [
    {_id: "moves to shard1", oldKey: -10, newKey: 10},
    {_id: "stays on shard1", oldKey: 10, newKey: 10},
]);

// The temporary resharding collection must be dropped before checking metadata integrity.
assert(temporaryReshardingCollection.drop());

st.stop();