/*
 * Test that when featureFlagReshardingSkipCloningAndApplying is enabled, a recipient shard that
 * is not going to own any chunks for the collection after resharding would only clone the options
 * and indexes for the collection, i.e. skip cloning documents and fetching/applying oplog entries
 * for it. In addition, test that after failover the skip is respected and the
 * 'approxDocumentsToCopy' and 'approxBytesToCopy' are restored correctly.
 * @tags: [
 *   featureFlagReshardingSkipCloningAndApplying,
 *   requires_fcv_81,
 *   # TODO (SERVER-103522): Re-enable this test.
 *   DISABLED_TEMPORARILY_DUE_TO_FCV_UPGRADE,
 *   # TODO (SERVER-104862): Re-enable this test in aubsan variants.
 *   incompatible_aubsan,
 * ]
 */

import {configureFailPoint} from "jstests/libs/fail_point_util.js";
import {Thread} from "jstests/libs/parallelTester.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {extractUUIDFromObject} from "jstests/libs/uuid_util.js";

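// Returns the value at the given dotted field path (e.g. "a.b.c") in the given document.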
function getDottedField(doc, fieldName) {
    let val = doc;
    const fieldNames = fieldName.split(".");
    for (let i = 0; i < fieldNames.length; i++) {
        val = val[fieldNames[i]];
    }
    return val;
}

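// Returns the UUID of the collection with the given name in the given database.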
function getCollectionUuid(db, collName) {
    const listCollectionRes =
        assert.commandWorked(db.runCommand({listCollections: 1, filter: {name: collName}}));
    return listCollectionRes.cursor.firstBatch[0].info.uuid;
}

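// Asserts that the collection with the given namespace on the given node has the expected
// options and indexes.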
function checkCollectionOptionsAndIndexes(conn, ns, expectedOptions, expectedIndexes) {
    const coll = conn.getCollection(ns);
    const listCollectionsDoc = coll.exists();
    for (let fieldName in expectedOptions) {
        const actual = getDottedField(listCollectionsDoc.options, fieldName);
        const expected = expectedOptions[fieldName];
        assert.eq(bsonUnorderedFieldsCompare(actual, expected), 0, {fieldName, actual, expected});
    }

    const actualIndexes = coll.getIndexes();
    expectedIndexes.forEach((expectedIndex) => {
        assert(
            actualIndexes.some((actualIndex) => bsonWoCompare(actualIndex.key, expectedIndex) == 0),
            {actualIndexes, expectedIndex},
        );
    });
}

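// Waits until both recipients report the "cloning" state in $currentOp, then asserts that the
// recipient on the primary shard reports zero documents/bytes to copy while the destination
// shard reports all of them.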
function checkMoveCollectionCloningMetrics(st, ns, numDocs, numBytes, primaryShardName, toShardName) {
    assert.neq(primaryShardName, toShardName);
    let currentOps;
    assert.soon(
        () => {
            currentOps = st.s
                .getDB("admin")
                .aggregate([
                    {$currentOp: {allUsers: true, localOps: false}},
                    {
                        $match: {
                            type: "op",
                            "originatingCommand.reshardCollection": ns,
                            recipientState: {$exists: true},
                        },
                    },
                ])
                .toArray();
            if (currentOps.length < 2) {
                return false;
            }
            for (let op of currentOps) {
                if (op.recipientState != "cloning") {
                    return false;
                }
            }
            return true;
        },
        () => tojson(currentOps),
    );

    assert.eq(currentOps.length, 2, currentOps);
    currentOps.forEach((op) => {
        if (op.shard == primaryShardName) {
            assert.eq(op.approxDocumentsToCopy, 0, {op});
            assert.eq(op.approxBytesToCopy, 0, {op});
        } else if (op.shard == toShardName) {
            assert.eq(op.approxDocumentsToCopy, numDocs, {op});
            assert.eq(op.approxBytesToCopy, numBytes, {op});
        } else {
            throw Error("Unexpected shard name " + tojson(op));
        }
    });
}

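// Asserts that the collection with the given namespace does or does not exist on the given node.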
function checkCollectionExistence(conn, ns, exists) {
    const coll = conn.getCollection(ns);
    if (exists) {
        assert(coll.exists());
    } else {
        assert(!coll.exists());
    }
}

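// Asserts that the resharding oplog buffer and conflict stash collections for the given donor
// shard do or do not exist on the given recipient node.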
function checkOplogBufferAndConflictStashCollections(conn, collUuid, donorShardName, exists) {
    const oplogBufferNs = "config.localReshardingOplogBuffer." + collUuid + "." + donorShardName;
    checkCollectionExistence(conn, oplogBufferNs, exists);
    const conflictStashNs =
        "config.localReshardingConflictStash." + collUuid + "." + donorShardName;
    checkCollectionExistence(conn, conflictStashNs, exists);
}

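// Runs a moveCollection test with the given feature flag settings and verifies the expected
// cloning and oplog-application behavior on the recipient that will not own any chunks.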
function runTest(testOptions) {
    jsTest.log("Testing with " + tojson(testOptions));
    const st = new ShardingTest({
        mongos: 1,
        shards: 2,
        rs: {
            nodes: 2,
            setParameter: testOptions.setParameters,
        },
        // By default, our test infrastructure sets the election timeout to a very high value (24
        // hours). For this test, we need a shorter election timeout because it relies on nodes
        // running an election when they do not detect an active primary. Therefore, we are setting
        // the electionTimeoutMillis to its default value.
        initiateWithDefaultElectionTimeout: true,
    });

    // Create an unsharded collection on shard0 (primary shard) and move the collection from
    // shard0 to shard1.
    const dbName = "testDb";
    const collName = "testColl";
    const ns = dbName + "." + collName;
    const db = st.s.getDB(dbName);
    const coll = db.getCollection(collName);
    const options = {validator: {x: {$gte: 0}}};

    assert.commandWorked(
        st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
    assert.commandWorked(db.runCommand(Object.assign({create: collName}, options)));

    const numDocs = 100;
    const docs = [];
    for (let i = 0; i < numDocs; i++) {
        const doc = {_id: i, x: i};
        docs.push(doc);
    }
    const numBytes = numDocs * Object.bsonsize({_id: 0, x: 0});
    assert.commandWorked(coll.insert(docs));
    const collUuid = extractUUIDFromObject(getCollectionUuid(db, collName));

    const indexes = [{x: 1}];
    indexes.forEach((index) => assert.commandWorked(coll.createIndex(index)));

    const oldShard0Primary = st.rs0.getPrimary();
    const oldShard1Primary = st.rs1.getPrimary();
    checkCollectionOptionsAndIndexes(oldShard0Primary, ns, options, indexes);

    const shard0Indexes = st.rs0.getPrimary().getCollection(ns).getIndexes();
    assert(
        shard0Indexes.some((index) => bsonWoCompare(index.key, {x: 1}) == 0),
        shard0Indexes,
    );

    // Pause resharding recipients (both shard0 and shard1) at the "cloning" state.
    const shard0CloningFps = st.rs0.nodes.map((node) =>
        configureFailPoint(node, "reshardingPauseRecipientBeforeCloning"),
    );
    const shard1CloningFps = st.rs1.nodes.map((node) =>
        configureFailPoint(node, "reshardingPauseRecipientBeforeCloning"),
    );

    // Also pause resharding coordinator before it starts committing.
    const beforePersistingDecisionFps = st.configRS.nodes.map((node) =>
        configureFailPoint(node, "reshardingPauseCoordinatorBeforeDecisionPersisted"),
    );

    const thread = new Thread(
        (host, ns, toShard) => {
            const mongos = new Mongo(host);
            assert.soonRetryOnAcceptableErrors(() => {
                assert.commandWorked(mongos.adminCommand({moveCollection: ns, toShard}));
                return true;
            }, [ErrorCodes.FailedToSatisfyReadPreference, ErrorCodes.NetworkInterfaceExceededTimeLimit]);
        },
        st.s.host,
        ns,
        st.shard1.shardName,
    );
    thread.start();

    checkMoveCollectionCloningMetrics(
        st,
        ns,
        numDocs,
        numBytes,
        st.shard0.shardName /* primaryShard */,
        st.shard1.shardName /* toShard */,
    );

    // Trigger a failover on shard0.
    assert.commandWorked(
        oldShard0Primary.adminCommand({replSetStepDown: ReplSetTest.kForeverSecs, force: true}));
    assert.commandWorked(oldShard0Primary.adminCommand({replSetFreeze: 0}));
    const newShard0Primary = st.rs0.waitForPrimary();

    // Trigger a failover on shard1.
    assert.commandWorked(
        oldShard1Primary.adminCommand({replSetStepDown: ReplSetTest.kForeverSecs, force: true}));
    assert.commandWorked(oldShard1Primary.adminCommand({replSetFreeze: 0}));
    const newShard1Primary = st.rs1.waitForPrimary();

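    // Verify that the cloning metrics are restored correctly after the failovers.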
    checkMoveCollectionCloningMetrics(
        st,
        ns,
        numDocs,
        numBytes,
        st.shard0.shardName /* primaryShard */,
        st.shard1.shardName /* toShard */,
    );

    shard0CloningFps.forEach((fp) => fp.off());
    shard1CloningFps.forEach((fp) => fp.off());

    // Verify that shard0, which is a recipient that is not going to own any chunks for the
    // collection after resharding, skipped fetching/applying oplog entries.
    beforePersistingDecisionFps.forEach((fp) => {
        if (fp.conn == st.configRS.getPrimary()) {
            fp.wait();
        }
    });
    checkOplogBufferAndConflictStashCollections(
        newShard0Primary,
        collUuid,
        st.shard0.shardName,
        testOptions.shouldBufferCollectionExistsInNoChunkRecipient,
    );
    checkOplogBufferAndConflictStashCollections(
        newShard1Primary, collUuid, st.shard0.shardName, true /* exists */);

    beforePersistingDecisionFps.forEach((fp) => fp.off());
    thread.join();

    checkCollectionOptionsAndIndexes(newShard1Primary, ns, options, indexes);
    // Verify that shard0 cloned the options and indexes for the collection.
    checkCollectionOptionsAndIndexes(newShard0Primary, ns, options, indexes);

    assert(coll.drop());
    st.stop();
}

const testCases = [
    {
        // Note: both feature flags need to be specified every time so that the feature flag
        // suites do not override them.
        setParameters: {
            featureFlagReshardingSkipCloningAndApplyingIfApplicable: false,
            featureFlagReshardingSkipCloningIfApplicable: false,
        },
        shouldBufferCollectionExistsInNoChunkRecipient: true,
    },
    {
        setParameters: {
            featureFlagReshardingSkipCloningAndApplyingIfApplicable: false,
            featureFlagReshardingSkipCloningIfApplicable: true,
        },
        shouldBufferCollectionExistsInNoChunkRecipient: true,
    },
    {
        setParameters: {
            featureFlagReshardingSkipCloningAndApplyingIfApplicable: true,
            featureFlagReshardingSkipCloningIfApplicable: false,
        },
        shouldBufferCollectionExistsInNoChunkRecipient: false,
    },
    {
        setParameters: {
            featureFlagReshardingSkipCloningAndApplyingIfApplicable: true,
            featureFlagReshardingSkipCloningIfApplicable: true,
        },
        shouldBufferCollectionExistsInNoChunkRecipient: false,
    },
];

testCases.forEach((testCase) => {
    runTest(testCase);
});