mongo/jstests/concurrency/fsm_workload_helpers/chunks.js

273 lines
12 KiB
JavaScript

/**
* Provides wrapper functions that perform exponential backoff and allow for
* acceptable errors to be returned from mergeChunks, moveChunk, and splitChunk
* commands.
*
* Also provides functions to help perform assertions about the state of chunks.
*
* Intended for use by workloads testing sharding (i.e., workloads starting with 'sharded_').
*/
import {isMongod, isMongodConfigsvr, isMongos} from "jstests/concurrency/fsm_workload_helpers/server_types.js";
import {findChunksUtil} from "jstests/sharding/libs/find_chunks_util.js";
export var ChunkHelper = (function () {
// exponential backoff
function getNextBackoffSleep(curSleep) {
const MAX_BACKOFF_SLEEP = 5000; // milliseconds
curSleep *= 2;
return Math.min(curSleep, MAX_BACKOFF_SLEEP);
}
function runCommandWithRetries(db, cmd, didAcceptableErrorOccurFn) {
const INITIAL_BACKOFF_SLEEP = 500; // milliseconds
const MAX_RETRIES = 5;
let res;
let retries = 0;
let backoffSleep = INITIAL_BACKOFF_SLEEP;
while (retries < MAX_RETRIES) {
retries++;
res = db.adminCommand(cmd);
// If the command worked, exit the loop early.
if (res.ok) {
return res;
}
// Assert command worked or acceptable error occurred.
if (didAcceptableErrorOccurFn(res)) {
// When an acceptable error occurs, sleep and then retry.
sleep(backoffSleep);
backoffSleep = getNextBackoffSleep(backoffSleep);
continue;
}
// Throw an exception if the command errored for any other reason.
assert.commandWorked(res, cmd);
}
return res;
}
function splitChunkAt(db, collName, middle) {
let cmd = {split: db[collName].getFullName(), middle: middle};
return runCommandWithRetries(db, cmd, (res) => res.code === ErrorCodes.LockBusy);
}
function splitChunkAtPoint(db, collName, splitPoint) {
return splitChunkAt(db, collName, {_id: splitPoint});
}
function splitChunkWithBounds(db, collName, bounds) {
let cmd = {split: db[collName].getFullName(), bounds: bounds};
return runCommandWithRetries(db, cmd, (res) => res.code === ErrorCodes.LockBusy);
}
function moveChunk(db, collName, bounds, toShard, waitForDelete, secondaryThrottle) {
let cmd = {
moveChunk: db[collName].getFullName(),
bounds: bounds,
to: toShard,
};
if (waitForDelete != null) {
cmd._waitForDelete = waitForDelete;
}
// Using _secondaryThrottle adds coverage for additional waits for write concern on the
// recipient during cloning.
if (secondaryThrottle != null) {
cmd._secondaryThrottle = secondaryThrottle;
cmd.writeConcern = {w: "majority"}; // _secondaryThrottle requires a write concern.
}
const runningWithStepdowns = TestData.runningWithConfigStepdowns || TestData.runningWithShardStepdowns;
const isSlowBuild = () => {
// Consider this a slow build on TSAN or if we're fuzzing mongod configs, which
// can intentionally slow the server.
return TestData.fuzzMongodConfigs || db.getServerBuildInfo().isThreadSanitizerActive();
};
return runCommandWithRetries(
db,
cmd,
(res) =>
res.code === ErrorCodes.ConflictingOperationInProgress ||
res.code === ErrorCodes.ChunkRangeCleanupPending ||
res.code === ErrorCodes.LockTimeout ||
// The chunk migration has surely been aborted if the startCommit of the
// procedure was interrupted by a stepdown.
((runningWithStepdowns || isSlowBuild()) &&
res.code === ErrorCodes.CommandFailed &&
res.errmsg.includes("startCommit")) ||
// The chunk migration has surely been aborted if the recipient shard didn't
// believe there was an active chunk migration.
(runningWithStepdowns &&
res.code === ErrorCodes.OperationFailed &&
res.errmsg.includes("NotYetInitialized")) ||
// The chunk migration has surely been aborted if there was another active
// chunk migration on the donor.
(runningWithStepdowns &&
res.code === ErrorCodes.OperationFailed &&
res.errmsg.includes("does not match active session id")) ||
// A stepdown may cause the collection's lock to become temporarily unreleasable
// and cause the chunk migration to timeout. The migration may still succeed
// after the lock's lease expires.
(runningWithStepdowns && res.code === ErrorCodes.LockBusy) ||
// Slow builds can result in a failure when waiting for write concern in certain
// migration phases.
(isSlowBuild() &&
res.code === ErrorCodes.OperationFailed &&
res.errmsg.includes("WriteConcernTimeout")),
);
}
function mergeChunks(db, collName, bounds) {
let cmd = {mergeChunks: db[collName].getFullName(), bounds: bounds};
return runCommandWithRetries(db, cmd, (res) => res.code === ErrorCodes.LockBusy);
}
// Take a set of connections to a shard (replica set or standalone mongod),
// or a set of connections to the config servers, and return a connection
// to any node in the set for which isWritablePrimary is true.
function getPrimary(connArr) {
const kDefaultTimeoutMS = 10 * 60 * 1000; // 10 minutes.
assert(Array.isArray(connArr), "Expected an array but got " + tojson(connArr));
let primary = null;
assert.soon(
() => {
for (let conn of connArr) {
assert(isMongod(conn.getDB("admin")), tojson(conn) + " is not to a mongod");
let res = conn.adminCommand({hello: 1});
assert.commandWorked(res);
if (res.isWritablePrimary) {
primary = conn;
return primary;
}
}
},
"Finding primary timed out",
kDefaultTimeoutMS,
);
return primary;
}
// Take a set of mongos connections to a sharded cluster and return a
// random connection.
function getRandomMongos(connArr) {
assert(Array.isArray(connArr), "Expected an array but got " + tojson(connArr));
let conn = connArr[Random.randInt(connArr.length)];
assert(isMongos(conn.getDB("admin")), tojson(conn) + " is not to a mongos");
return conn;
}
// Intended for use on mongos connections only.
// Return all shards containing documents in [lower, upper).
function getShardsForRange(conn, collName, lower, upper) {
assert(isMongos(conn.getDB("admin")), tojson(conn) + " is not to a mongos");
let adminDB = conn.getDB("admin");
let shardVersion = adminDB.runCommand({getShardVersion: collName, fullMetadata: true});
assert.commandWorked(shardVersion);
// As noted in SERVER-20768, doing a range query with { $lt : X }, where
// X is the _upper bound_ of a chunk, incorrectly targets the shard whose
// _lower bound_ is X. Therefore, if upper !== MaxKey, we use a workaround
// to ensure that only the shard whose lower bound = X is targeted.
let query;
if (upper === MaxKey) {
query = {$and: [{_id: {$gte: lower}}, {_id: {$lt: upper}}]};
} else {
query = {$and: [{_id: {$gte: lower}}, {_id: {$lte: upper - 1}}]};
}
let res = conn.getCollection(collName).find(query).explain();
assert.commandWorked(res);
assert.gt(res.queryPlanner.winningPlan.shards.length, 0, "Explain did not have shards key.");
let shards = res.queryPlanner.winningPlan.shards.map((shard) => shard.shardName);
return {shards: shards, explain: res, query: query, shardVersion: shardVersion};
}
function itcount(collection, query) {
// We set a large batch size and project out all of the fields in order to greatly reduce
// the likelihood a cursor would actually be returned. This is acceptable because we're only
// interested in how many documents there were and not any of their contents. The
// network_error_and_txn_override.js override would throw an exception if we attempted to
// use the getMore command.
return collection.find(query, {_id: 0, nonExistingField: 1}).batchSize(1e6).itcount();
}
// Return the number of docs in [lower, upper) as seen by conn.
// Note it will ignore those documents whose shardKey value type does not match the type of
// lower/upper.
function getNumDocs(conn, collName, lower, upper) {
let coll = conn.getCollection(collName);
let query = {$and: [{_id: {$gte: lower}}, {_id: {$lt: upper}}]};
return itcount(coll, query);
}
// Intended for use on config or mongos connections only.
// Get number of chunks containing values in [lower, upper). The upper bound on a chunk is
// exclusive, but to capture the chunk we must provide it with less than or equal to 'upper'.
function getNumChunks(conn, ns, lower, upper) {
assert(
isMongos(conn.getDB("admin")) || isMongodConfigsvr(conn.getDB("admin")),
tojson(conn) + " is not to a mongos or a mongod config server",
);
assert(
isString(ns) && ns.indexOf(".") !== -1 && !ns.startsWith(".") && !ns.endsWith("."),
ns + " is not a valid namespace",
);
return findChunksUtil
.findChunksByNs(conn.getDB("config"), ns, {"min._id": {$gte: lower}, "max._id": {$lte: upper}})
.itcount();
}
// Intended for use on config or mongos connections only.
// For getting chunks containing values in [lower, upper). The upper bound on a chunk is
// exclusive, but to capture the chunk we must provide it with less than or equal to 'upper'.
function getChunks(conn, ns, lower, upper) {
assert(
isMongos(conn.getDB("admin")) || isMongodConfigsvr(conn.getDB("admin")),
tojson(conn) + " is not to a mongos or a mongod config server",
);
assert(
isString(ns) && ns.indexOf(".") !== -1 && !ns.startsWith(".") && !ns.endsWith("."),
ns + " is not a valid namespace",
);
return findChunksUtil
.findChunksByNs(conn.getDB("config"), ns, {"min._id": {$gte: lower}, "max._id": {$lte: upper}})
.sort({"min._id": 1})
.toArray();
}
// Intended for use on config or mongos connections only.
// For debug printing chunks containing values in [lower, upper). The upper bound on a chunk is
// exclusive, but to capture the chunk we must provide it with less than or equal to 'upper'.
function stringifyChunks(conn, lower, upper) {
assert(
isMongos(conn.getDB("admin")) || isMongodConfigsvr(conn.getDB("admin")),
tojson(conn) + " is not to a mongos or a mongod config server",
);
return getChunks(conn, lower, upper)
.map((chunk) => tojson(chunk))
.join("\n");
}
return {
splitChunkAt: splitChunkAt,
splitChunkAtPoint: splitChunkAtPoint,
splitChunkWithBounds: splitChunkWithBounds,
moveChunk: moveChunk,
mergeChunks: mergeChunks,
getPrimary: getPrimary,
getRandomMongos: getRandomMongos,
getShardsForRange: getShardsForRange,
getNumDocs: getNumDocs,
getNumChunks: getNumChunks,
getChunks: getChunks,
stringifyChunks: stringifyChunks,
};
})();