mirror of https://github.com/mongodb/mongo
2296 lines
90 KiB
JavaScript
2296 lines
90 KiB
JavaScript
import {getTimeseriesCollForDDLOps} from "jstests/core/timeseries/libs/viewless_timeseries_util.js";
|
|
import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js";
|
|
import {Thread} from "jstests/libs/parallelTester.js";
|
|
import {ReplSetTest} from "jstests/libs/replsettest.js";
|
|
|
|
// Symbol used to override the constructor. Please do not use this, it's only meant to aid
|
|
// in migrating the jstest corpus to proper module usage.
|
|
export const kOverrideConstructor = Symbol("overrideConstructor");
|
|
|
|
// Timeout to be used for operations scheduled by the sharding test, which must wait for write
|
|
// concern (5 minutes)
|
|
const kDefaultWTimeoutMs = 5 * 60 * 1000;
|
|
|
|
// Oplog collection name
|
|
const kOplogName = "oplog.rs";
|
|
|
|
export class ShardingTest {
|
|
// ShardingTest API
|
|
getDB(name) {
|
|
return this.s.getDB(name);
|
|
}
|
|
|
|
/**
|
|
* Finds the _id of the primary shard for database 'dbname', e.g., 'test-rs0'
|
|
*/
|
|
getPrimaryShardIdForDatabase(dbname) {
|
|
let x = this.config.databases.findOne({_id: "" + dbname});
|
|
if (x) {
|
|
return x.primary;
|
|
}
|
|
|
|
let countDBsFound = 0;
|
|
this.config.databases.find().forEach(function(db) {
|
|
countDBsFound++;
|
|
jsTest.log.info({db});
|
|
});
|
|
throw Error("couldn't find dbname: " + dbname +
|
|
" in config.databases. Total DBs: " + countDBsFound);
|
|
}
|
|
|
|
getNonPrimaries(dbname) {
|
|
let x = this.config.databases.findOne({_id: dbname});
|
|
if (!x) {
|
|
this.config.databases.find().forEach(jsTest.log.info);
|
|
throw Error("couldn't find dbname: " + dbname +
|
|
" total: " + this.config.databases.count());
|
|
}
|
|
|
|
return this.config.shards.find({_id: {$ne: x.primary}}).map((z) => z._id);
|
|
}
|
|
|
|
printNodes() {
|
|
jsTest.log.info("ShardingTest " + this._testName, {
|
|
config: this._configDB,
|
|
shards: this._connections,
|
|
mongos: this._mongos,
|
|
});
|
|
}
|
|
|
|
getConnNames() {
|
|
let names = [];
|
|
for (let i = 0; i < this._connections.length; i++) {
|
|
names.push(this._connections[i].name);
|
|
}
|
|
return names;
|
|
}
|
|
|
|
/**
|
|
* Find the connection to the primary shard for database 'dbname'.
|
|
*/
|
|
getPrimaryShard(dbname) {
|
|
let dbPrimaryShardId = this.getPrimaryShardIdForDatabase(dbname);
|
|
let primaryShard = this.config.shards.findOne({_id: dbPrimaryShardId});
|
|
|
|
if (primaryShard) {
|
|
const shardConnectionString = primaryShard.host;
|
|
let rsName = shardConnectionString.substring(0, shardConnectionString.indexOf("/"));
|
|
|
|
for (let i = 0; i < this._connections.length; i++) {
|
|
let c = this._connections[i];
|
|
if (connectionURLTheSame(shardConnectionString, c.name) ||
|
|
connectionURLTheSame(rsName, c.name))
|
|
return c;
|
|
}
|
|
}
|
|
|
|
throw Error("can't find server connection for db '" + dbname +
|
|
"'s primary shard: " + tojson(primaryShard));
|
|
}
|
|
|
|
// TODO SERVER-95358 remove once 9.0 becomes last LTS.
|
|
getMergeType(db) {
|
|
if (FeatureFlagUtil.isPresentAndEnabled(db, "AggMongosToRouter")) {
|
|
return "router";
|
|
}
|
|
return "mongos";
|
|
}
|
|
|
|
normalize(x) {
|
|
let z = this.config.shards.findOne({host: x});
|
|
if (z)
|
|
return z._id;
|
|
return x;
|
|
}
|
|
|
|
/**
|
|
* Find a different shard connection than the one given.
|
|
*/
|
|
getOther(one) {
|
|
if (this._connections.length < 2) {
|
|
throw Error("getOther only works with 2 shards");
|
|
}
|
|
|
|
if (one._mongo) {
|
|
one = one._mongo;
|
|
}
|
|
|
|
for (let i = 0; i < this._connections.length; i++) {
|
|
if (this._connections[i] != one) {
|
|
return this._connections[i];
|
|
}
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
getAnother(one) {
|
|
if (this._connections.length < 2) {
|
|
throw Error("getAnother() only works with multiple servers");
|
|
}
|
|
|
|
if (one._mongo) {
|
|
one = one._mongo;
|
|
}
|
|
|
|
for (let i = 0; i < this._connections.length; i++) {
|
|
if (this._connections[i] == one)
|
|
return this._connections[(i + 1) % this._connections.length];
|
|
}
|
|
}
|
|
|
|
stopAllConfigServers(opts, forRestart = undefined) {
|
|
this.configRS.stopSet(undefined, forRestart, opts);
|
|
}
|
|
|
|
stopAllShards(opts = {}, forRestart = undefined) {
|
|
if (isShutdownParallelSupported(this, opts)) {
|
|
const threads = [];
|
|
try {
|
|
for (let {rstArgs} of replicaSetsToTerminate(this, this._rs)) {
|
|
const thread = new Thread(
|
|
async (rstArgs, signal, forRestart, opts) => {
|
|
const {ReplSetTest} = await import("jstests/libs/replsettest.js");
|
|
try {
|
|
const rst = new ReplSetTest({rstArgs});
|
|
rst.stopSet(signal, forRestart, opts);
|
|
return {ok: 1};
|
|
} catch (e) {
|
|
return {
|
|
ok: 0,
|
|
hosts: rstArgs.nodeHosts,
|
|
name: rstArgs.name,
|
|
error: e.toString(),
|
|
stack: e.stack,
|
|
};
|
|
}
|
|
},
|
|
rstArgs,
|
|
15,
|
|
forRestart,
|
|
opts,
|
|
);
|
|
|
|
thread.start();
|
|
threads.push(thread);
|
|
}
|
|
} finally {
|
|
// Wait for each thread to finish. Throw an error if any thread fails.
|
|
const returnData = threads.map((thread) => {
|
|
thread.join();
|
|
return thread.returnData();
|
|
});
|
|
|
|
returnData.forEach((res) => {
|
|
assert.commandWorked(res,
|
|
"terminating shard or config server replica sets failed");
|
|
});
|
|
}
|
|
} else {
|
|
// The replica sets shutting down serially
|
|
this._rs.forEach((rs) => {
|
|
rs.test.stopSet(15, forRestart, opts);
|
|
});
|
|
}
|
|
}
|
|
|
|
stopAllMongos(opts) {
|
|
for (let i = 0; i < this._mongos.length; i++) {
|
|
this.stopMongos(i, opts);
|
|
}
|
|
}
|
|
|
|
awaitMigrations() {
|
|
this.stopBalancer();
|
|
|
|
const numShards = this._rs.length;
|
|
for (let i = 0; i < numShards; i++) {
|
|
assert.commandWorked(this["shard" + i].adminCommand({"_shardsvrJoinMigrations": 1}));
|
|
}
|
|
}
|
|
|
|
isReplicaSetEndpointActive() {
|
|
const numShards = this._rs.length;
|
|
return numShards == 1 && this._rs[0].test.isReplicaSetEndpointActive();
|
|
}
|
|
|
|
stop(opts = {}) {
|
|
this.checkMetadataConsistency();
|
|
this.checkUUIDsConsistentAcrossCluster();
|
|
this.checkIndexesConsistentAcrossCluster();
|
|
this.checkOrphansAreDeleted();
|
|
this.checkRoutingTableConsistency();
|
|
this.checkShardFilteringMetadata();
|
|
|
|
if (jsTestOptions().alwaysUseLogFiles) {
|
|
if (opts.noCleanData === false) {
|
|
throw new Error("Always using log files, but received conflicting option.");
|
|
}
|
|
|
|
opts.noCleanData = true;
|
|
}
|
|
|
|
this.stopAllMongos(opts);
|
|
|
|
if (jsTestOptions().runningWithConfigStepdowns && this.isConfigShardMode) {
|
|
// In case of a cluster with a config shard, the config server replica set is stopped
|
|
// via stopAllShards, which doesn't stop the continuous stepdown stop.
|
|
this.configRS.stopContinuousFailover();
|
|
}
|
|
|
|
let startTime = new Date(); // Measure the execution time of shutting down shards.
|
|
this.stopAllShards(opts);
|
|
jsTest.log.info(
|
|
"ShardingTest stopped all shards, took " + (new Date() - startTime) + "ms for " +
|
|
this._connections.length + " shards.",
|
|
);
|
|
|
|
if (!this.isConfigShardMode) {
|
|
this.stopAllConfigServers(opts);
|
|
}
|
|
|
|
let timeMillis = new Date().getTime() - this._startTime.getTime();
|
|
|
|
jsTest.log.info(
|
|
"*** ShardingTest " + this._testName + " completed successfully in " +
|
|
timeMillis / 1000 + " seconds ***",
|
|
);
|
|
}
|
|
|
|
stopOnFail() {
|
|
try {
|
|
this.stopAllMongos();
|
|
} catch (e) {
|
|
jsTest.log.info("Did not successfully stop all mongos.");
|
|
}
|
|
try {
|
|
this.stopAllShards();
|
|
} catch (e) {
|
|
jsTest.log.info("Did not successfully stop all shards.");
|
|
}
|
|
try {
|
|
this.stopAllConfigServers();
|
|
} catch (e) {
|
|
jsTest.log.info("Did not successfully stop all config servers.");
|
|
}
|
|
}
|
|
|
|
adminCommand(cmd) {
|
|
let res = this.admin.runCommand(cmd);
|
|
if (res && res.ok == 1)
|
|
return true;
|
|
|
|
throw _getErrorWithCode(res, "command " + tojson(cmd) + " failed: " + tojson(res));
|
|
}
|
|
|
|
restartAllConfigServers(opts) {
|
|
this.configRS.startSet(opts, true);
|
|
// We wait until a primary has been chosen since startSet can return without having elected
|
|
// one. This can cause issues that expect a functioning replicaset once this method returns.
|
|
this.configRS.waitForPrimary();
|
|
// We also wait for all secondaries to catch up with the primary, to ensure nodes complete any
|
|
// rollback that may have been triggered after becoming secondary. Rollback causes nodes to
|
|
// close connections, which can interfere with subsequent operations.
|
|
this.configRS.awaitReplication();
|
|
}
|
|
|
|
restartAllShards(opts) {
|
|
this._rs.forEach((rs) => {
|
|
rs.test.startSet(opts, true);
|
|
// We wait until a primary has been chosen since startSet can return without having
|
|
// elected one. This can cause issues that expect a functioning replicaset once this
|
|
// method returns.
|
|
rs.test.waitForPrimary();
|
|
// We also wait for all secondaries to catch up with the primary, to ensure nodes complete any
|
|
// rollback that may have been triggered after becoming secondary. Rollback causes nodes to
|
|
// close connections, which can interfere with subsequent operations.
|
|
rs.test.awaitReplication();
|
|
});
|
|
}
|
|
|
|
forEachConnection(fn) {
|
|
this._connections.forEach(function(conn) {
|
|
fn(conn);
|
|
});
|
|
}
|
|
|
|
forEachMongos(fn) {
|
|
this._mongos.forEach(function(conn) {
|
|
fn(conn);
|
|
});
|
|
}
|
|
|
|
forEachConfigServer(fn) {
|
|
this.configRS.nodes.forEach(function(conn) {
|
|
fn(conn);
|
|
});
|
|
}
|
|
|
|
printChangeLog() {
|
|
this.config.changelog.find().forEach(function(z) {
|
|
let msg = z.server + "\t" + z.time + "\t" + z.what;
|
|
for (let i = z.what.length; i < 15; i++)
|
|
msg += " ";
|
|
|
|
msg += " " + z.ns + "\t";
|
|
if (z.what == "split") {
|
|
msg += _rangeToString(z.details.before) + " -->> (" +
|
|
_rangeToString(z.details.left) + "), (" + _rangeToString(z.details.right) + ")";
|
|
} else if (z.what == "multi-split") {
|
|
msg += _rangeToString(z.details.before) + " -->> (" + z.details.number + "/" +
|
|
z.details.of + " " + _rangeToString(z.details.chunk) + ")";
|
|
} else {
|
|
msg += tojsononeline(z.details);
|
|
}
|
|
|
|
jsTest.log.info("ShardingTest " + msg);
|
|
});
|
|
}
|
|
|
|
getChunksString(ns) {
|
|
if (ns) {
|
|
let query = {};
|
|
let sorting_criteria = {};
|
|
const collection = this.config.collections.findOne({_id: ns});
|
|
if (!collection) {
|
|
return "";
|
|
}
|
|
|
|
if (collection.timestamp) {
|
|
const collectionUUID = collection.uuid;
|
|
assert.neq(collectionUUID, null);
|
|
|
|
query.uuid = collectionUUID;
|
|
sorting_criteria = {uuid: 1, min: 1};
|
|
} else {
|
|
query.ns = ns;
|
|
sorting_criteria = {ns: 1, min: 1};
|
|
}
|
|
|
|
let s = "";
|
|
this.config.chunks.find(query).sort(sorting_criteria).forEach(function(z) {
|
|
s += " \t" + z._id + "\t" + z.lastmod.t + "|" + z.lastmod.i + "\t" + tojson(z.min) +
|
|
" -> " + tojson(z.max) + " " + z.shard + " " + ns + "\n";
|
|
});
|
|
|
|
return s;
|
|
} else {
|
|
// call get chunks String for every namespace in the collections
|
|
let collections_cursor = this.config.collections.find();
|
|
let s = "";
|
|
while (collections_cursor.hasNext()) {
|
|
var ns = collections_cursor.next()._id;
|
|
s += this.getChunksString(ns);
|
|
}
|
|
return s;
|
|
}
|
|
}
|
|
|
|
printChunks(ns) {
|
|
jsTest.log.info("ShardingTest " + this.getChunksString(ns));
|
|
}
|
|
|
|
printShardingStatus(verbose) {
|
|
printShardingStatus(this.config, verbose);
|
|
}
|
|
|
|
printCollectionInfo(ns, msg) {
|
|
let out = "";
|
|
if (msg) {
|
|
out += msg + "\n";
|
|
}
|
|
out += "sharding collection info: " + ns + "\n";
|
|
|
|
for (var i = 0; i < this._connections.length; i++) {
|
|
var c = this._connections[i];
|
|
out += " mongod " + c + " " +
|
|
tojson(c.getCollection(ns).getShardVersion(), " ", true) + "\n";
|
|
}
|
|
|
|
for (var i = 0; i < this._mongos.length; i++) {
|
|
var c = this._mongos[i];
|
|
out += " mongos " + c + " " +
|
|
tojson(c.getCollection(ns).getShardVersion(), " ", true) + "\n";
|
|
}
|
|
|
|
out += this.getChunksString(ns);
|
|
|
|
jsTest.log.info("ShardingTest " + out);
|
|
}
|
|
|
|
/**
|
|
* Returns the number of shards which contain the given dbName.collName collection
|
|
*/
|
|
onNumShards(dbName, collName) {
|
|
return this.shardCounts(dbName, collName)
|
|
.reduce(
|
|
(total, currentValue) => total + (currentValue > 0 ? 1 : 0),
|
|
0,
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Returns an array of the size of numShards where each element is the number of documents on
|
|
* that particular shard
|
|
*/
|
|
shardCounts(dbName, collName) {
|
|
return this._connections.map((connection) =>
|
|
connection.getDB(dbName).getCollection(collName).count());
|
|
}
|
|
|
|
chunkCounts(collName, dbName) {
|
|
dbName = dbName || "test";
|
|
|
|
let sDB = this.s.getDB(dbName);
|
|
let sColl = sDB.getCollection(collName);
|
|
if (sColl.getMetadata()?.type === "timeseries") {
|
|
collName = getTimeseriesCollForDDLOps(sDB, sColl).getName();
|
|
}
|
|
|
|
let x = {};
|
|
this.config.shards.find().forEach(function(z) {
|
|
x[z._id] = 0;
|
|
});
|
|
|
|
let coll = this.config.collections.findOne({_id: dbName + "." + collName});
|
|
let chunksQuery = (function() {
|
|
if (coll.timestamp != null) {
|
|
return {uuid: coll.uuid};
|
|
} else {
|
|
return {ns: dbName + "." + collName};
|
|
}
|
|
})();
|
|
|
|
this.config.chunks.find(chunksQuery).forEach(function(z) {
|
|
if (x[z.shard])
|
|
x[z.shard]++;
|
|
else
|
|
x[z.shard] = 1;
|
|
});
|
|
|
|
return x;
|
|
}
|
|
|
|
chunkDiff(collName, dbName) {
|
|
let c = this.chunkCounts(collName, dbName);
|
|
|
|
let min = Number.MAX_VALUE;
|
|
let max = 0;
|
|
for (let s in c) {
|
|
if (c[s] < min)
|
|
min = c[s];
|
|
if (c[s] > max)
|
|
max = c[s];
|
|
}
|
|
|
|
jsTest.log.info("ShardingTest input", {chunkCounts: c, min, max});
|
|
return max - min;
|
|
}
|
|
|
|
/**
|
|
* Waits up to the specified timeout (with a default of 60s) for the collection to be
|
|
* considered well balanced.
|
|
**/
|
|
awaitBalance(collName, dbName, timeToWait, interval) {
|
|
const coll = this.s.getCollection(dbName + "." + collName);
|
|
this.awaitCollectionBalance(coll, timeToWait, interval);
|
|
}
|
|
|
|
getShard(coll, query, includeEmpty) {
|
|
let shards = this.getShardsForQuery(coll, query, includeEmpty);
|
|
assert.eq(shards.length, 1);
|
|
return shards[0];
|
|
}
|
|
|
|
/**
|
|
* Returns the shards on which documents matching a particular query reside.
|
|
*/
|
|
getShardsForQuery(coll, query, includeEmpty) {
|
|
if (!coll.getDB) {
|
|
coll = this.s.getCollection(coll);
|
|
}
|
|
|
|
let explain = coll.find(query).explain("executionStats");
|
|
let shards = [];
|
|
|
|
let execStages = explain.executionStats.executionStages;
|
|
let plannerShards = explain.queryPlanner.winningPlan.shards;
|
|
|
|
if (execStages.shards) {
|
|
for (var i = 0; i < execStages.shards.length; i++) {
|
|
let hasResults = execStages.shards[i].executionStages.nReturned &&
|
|
execStages.shards[i].executionStages.nReturned > 0;
|
|
if (includeEmpty || hasResults) {
|
|
shards.push(plannerShards[i].connectionString);
|
|
}
|
|
}
|
|
}
|
|
|
|
for (var i = 0; i < shards.length; i++) {
|
|
for (let j = 0; j < this._connections.length; j++) {
|
|
if (connectionURLTheSame(this._connections[j], shards[i])) {
|
|
shards[i] = this._connections[j];
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
return shards;
|
|
}
|
|
|
|
shardColl(collName, key, split, move, dbName, waitForDelete) {
|
|
split = split != false ? split || key : split;
|
|
move = split != false && move != false ? move || split : false;
|
|
|
|
if (collName.getDB)
|
|
dbName = "" + collName.getDB();
|
|
else
|
|
dbName = dbName || "test";
|
|
|
|
let c = dbName + "." + collName;
|
|
if (collName.getDB) {
|
|
c = "" + collName;
|
|
}
|
|
|
|
assert.commandWorked(this.s.adminCommand({enableSharding: dbName}));
|
|
|
|
let result = assert.commandWorked(this.s.adminCommand({shardcollection: c, key: key}));
|
|
|
|
if (split == false) {
|
|
return;
|
|
}
|
|
|
|
result = assert.commandWorked(this.s.adminCommand({split: c, middle: split}));
|
|
|
|
if (move == false) {
|
|
return;
|
|
}
|
|
|
|
for (let i = 0; i < 5; i++) {
|
|
let otherShard = this.getOther(this.getPrimaryShard(dbName)).name;
|
|
let cmd = {movechunk: c, find: move, to: otherShard};
|
|
|
|
if (waitForDelete != null) {
|
|
cmd._waitForDelete = waitForDelete;
|
|
}
|
|
|
|
const result = this.s.adminCommand(cmd);
|
|
if (result.ok)
|
|
break;
|
|
|
|
sleep(5 * 1000);
|
|
}
|
|
|
|
assert.commandWorked(result);
|
|
}
|
|
|
|
/**
|
|
* Wait for sharding to be initialized.
|
|
*/
|
|
waitForShardingInitialized(timeoutMs = 60 * 1000) {
|
|
const getShardVersion = (client, timeout) => {
|
|
assert.soon(
|
|
() => {
|
|
// The choice of namespace (local.fooCollection) does not affect the output.
|
|
let res = client.adminCommand({getShardVersion: "local.fooCollection"});
|
|
return res.ok == 1;
|
|
},
|
|
"timeout waiting for sharding to be initialized on mongod",
|
|
timeout,
|
|
0.1,
|
|
);
|
|
};
|
|
|
|
let start = new Date();
|
|
|
|
for (let i = 0; i < this._rs.length; ++i) {
|
|
let replSet = this._rs[i];
|
|
if (!replSet)
|
|
continue;
|
|
const nodes = replSet.test.nodes;
|
|
const keyFileUsed = replSet.test.keyFile;
|
|
|
|
for (let j = 0; j < nodes.length; ++j) {
|
|
const diff = new Date().getTime() - start.getTime();
|
|
var currNode = nodes[j];
|
|
// Skip arbiters
|
|
if (currNode.getDB("admin")._helloOrLegacyHello().arbiterOnly) {
|
|
continue;
|
|
}
|
|
|
|
const tlsOptions = ["preferTLS", "requireTLS"];
|
|
const sslOptions = ["preferSSL", "requireSSL"];
|
|
const TLSEnabled = currNode.fullOptions &&
|
|
(tlsOptions.includes(currNode.fullOptions.tlsMode) ||
|
|
sslOptions.includes(currNode.fullOptions.sslMode));
|
|
|
|
const x509AuthRequired = this.s.fullOptions && this.s.fullOptions.clusterAuthMode &&
|
|
this.s.fullOptions.clusterAuthMode === "x509";
|
|
|
|
if (keyFileUsed) {
|
|
authutil.asCluster(currNode, keyFileUsed, () => {
|
|
getShardVersion(currNode, timeoutMs - diff);
|
|
});
|
|
} else if (x509AuthRequired && TLSEnabled) {
|
|
const exitCode = _runMongoProgram(
|
|
...["mongo",
|
|
currNode.host,
|
|
"--tls",
|
|
"--tlsAllowInvalidHostnames",
|
|
"--tlsCertificateKeyFile",
|
|
currNode.fullOptions.tlsCertificateKeyFile
|
|
? currNode.fullOptions.tlsCertificateKeyFile
|
|
: currNode.fullOptions.sslPEMKeyFile,
|
|
"--tlsCAFile",
|
|
currNode.fullOptions.tlsCAFile ? currNode.fullOptions.tlsCAFile
|
|
: currNode.fullOptions.sslCAFile,
|
|
"--authenticationDatabase=$external",
|
|
"--authenticationMechanism=MONGODB-X509",
|
|
"--eval",
|
|
`(${getShardVersion.toString()})(db.getMongo(), ` +
|
|
(timeoutMs - diff).toString() + `)`,
|
|
],
|
|
);
|
|
assert.eq(0, exitCode, "parallel shell for x509 auth failed");
|
|
} else {
|
|
getShardVersion(currNode, timeoutMs - diff);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Kills the mongos with index n.
|
|
*
|
|
* @param {boolean} [extraOptions.waitPid=true] if true, we will wait for the process to
|
|
* terminate after stopping it.
|
|
*/
|
|
stopMongos(n, opts, {waitpid: waitpid = true} = {}) {
|
|
if (this._useBridge) {
|
|
MongoRunner.stopMongos(this._unbridgedMongos[n], undefined, opts, waitpid);
|
|
this["s" + n].stop();
|
|
} else {
|
|
let mongos = this["s" + n];
|
|
MongoRunner.stopMongos(mongos, undefined, opts, waitpid);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Kills the config server mongod with index n.
|
|
*/
|
|
stopConfigServer(n, opts) {
|
|
this.configRS.stop(n, undefined, opts);
|
|
}
|
|
|
|
/**
|
|
* Stops and restarts a mongos process.
|
|
*
|
|
* If 'opts' is not specified, starts the mongos with its previous parameters. If 'opts' is
|
|
* specified and 'opts.restart' is false or missing, starts mongos with the parameters specified
|
|
* in 'opts'. If opts is specified and 'opts.restart' is true, merges the previous options
|
|
* with the options specified in 'opts', with the options in 'opts' taking precedence.
|
|
*
|
|
* 'stopOpts' are the options passed to the mongos when it is stopping.
|
|
*
|
|
* Warning: Overwrites the old s (if n = 0) admin, config, and sn member variables.
|
|
*/
|
|
restartMongos(n, opts, stopOpts) {
|
|
let mongos;
|
|
|
|
if (this._useBridge) {
|
|
mongos = this._unbridgedMongos[n];
|
|
} else {
|
|
mongos = this["s" + n];
|
|
}
|
|
|
|
// Make a copy of the start options to prevent changing the original start options
|
|
let startOpts = Object.extend({}, opts || mongos, true);
|
|
startOpts.port = startOpts.port || mongos.port;
|
|
|
|
this.stopMongos(n, stopOpts);
|
|
|
|
if (this._useBridge) {
|
|
const hostName =
|
|
this._otherParams.host === undefined ? getHostName() : this._otherParams.host;
|
|
let bridgeOptions =
|
|
startOpts !== mongos ? startOpts.bridgeOptions : mongos.fullOptions.bridgeOptions;
|
|
bridgeOptions = Object.merge(this._otherParams.bridgeOptions, bridgeOptions || {});
|
|
bridgeOptions = Object.merge(bridgeOptions, {
|
|
hostName: this._otherParams.useHostname ? hostName : "localhost",
|
|
port: this._mongos[n].port,
|
|
// The mongos processes identify themselves to mongobridge as host:port, where the
|
|
// host is the actual hostname of the machine and not localhost.
|
|
dest: hostName + ":" + startOpts.port,
|
|
});
|
|
|
|
this._mongos[n] = new MongoBridge(bridgeOptions);
|
|
}
|
|
|
|
if (startOpts.restart) {
|
|
startOpts = Object.merge(mongos.fullOptions, startOpts);
|
|
}
|
|
|
|
let newConn = MongoRunner.runMongos(startOpts);
|
|
if (!newConn) {
|
|
throw new Error("Failed to restart mongos " + n);
|
|
}
|
|
|
|
if (this._useBridge) {
|
|
this._mongos[n].connectToBridge();
|
|
this._unbridgedMongos[n] = newConn;
|
|
} else {
|
|
this._mongos[n] = newConn;
|
|
}
|
|
|
|
this["s" + n] = this._mongos[n];
|
|
if (n == 0) {
|
|
this.s = this._mongos[n];
|
|
this.admin = this._mongos[n].getDB("admin");
|
|
this.config = this._mongos[n].getDB("config");
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Shuts down and restarts replica set for a given shard and
|
|
* updates shard connection information.
|
|
*
|
|
* @param {string} prevShardName
|
|
* @param {object} replSet The replica set object. Defined in replsettest.js
|
|
*/
|
|
shutdownAndRestartPrimaryOnShard(shardName, replSet) {
|
|
const n = this._shardReplSetToIndex[replSet.name];
|
|
const originalPrimaryConn = replSet.getPrimary();
|
|
|
|
const SIGTERM = 15;
|
|
replSet.restart(originalPrimaryConn, {}, SIGTERM);
|
|
replSet.awaitNodesAgreeOnPrimary();
|
|
replSet.awaitSecondaryNodes();
|
|
|
|
this._connections[n] = new Mongo(replSet.getURL());
|
|
this._connections[n].shardName = shardName;
|
|
this._connections[n].rs = replSet;
|
|
|
|
this["shard" + n] = this._connections[n];
|
|
}
|
|
|
|
/**
|
|
* Kills and restarts replica set for a given shard and
|
|
* updates shard connection information.
|
|
*
|
|
* @param {string} prevShardName
|
|
* @param {object} replSet The replica set object. Defined in replsettest.js
|
|
*/
|
|
killAndRestartPrimaryOnShard(shardName, replSet) {
|
|
const n = this._shardReplSetToIndex[replSet.name];
|
|
const originalPrimaryConn = replSet.getPrimary();
|
|
|
|
const SIGKILL = 9;
|
|
const opts = {allowedExitCode: MongoRunner.EXIT_SIGKILL};
|
|
replSet.restart(originalPrimaryConn, opts, SIGKILL);
|
|
replSet.awaitNodesAgreeOnPrimary();
|
|
|
|
this._connections[n] = new Mongo(replSet.getURL());
|
|
this._connections[n].shardName = shardName;
|
|
this._connections[n].rs = replSet;
|
|
|
|
this["shard" + n] = this._connections[n];
|
|
}
|
|
|
|
/**
|
|
* Restarts each node in a particular shard replica set using the shard's original startup
|
|
* options by default.
|
|
*
|
|
* Option { startClean : true } forces clearing the data directory.
|
|
* Option { auth : Object } object that contains the auth details for admin credentials.
|
|
* Should contain the fields 'user' and 'pwd'
|
|
*
|
|
*
|
|
* @param {int} shard server number (0, 1, 2, ...) to be restarted
|
|
*/
|
|
restartShardRS(n, options, signal, wait) {
|
|
const prevShardName = this._connections[n].shardName;
|
|
for (let i = 0; i < this["rs" + n].nodeList().length; i++) {
|
|
this["rs" + n].restart(i);
|
|
}
|
|
|
|
this["rs" + n].awaitSecondaryNodes();
|
|
this._connections[n] = new Mongo(this["rs" + n].getURL(), undefined, {gRPC: false});
|
|
this._connections[n].shardName = prevShardName;
|
|
this._connections[n].rs = this["rs" + n];
|
|
this["shard" + n] = this._connections[n];
|
|
}
|
|
|
|
/**
|
|
* Stops and restarts a config server mongod process.
|
|
*
|
|
* If opts is specified, the new mongod is started using those options. Otherwise, it is
|
|
* started
|
|
* with its previous parameters.
|
|
*
|
|
* Warning: Overwrites the old cn/confign member variables.
|
|
*/
|
|
restartConfigServer(n, options, signal, wait) {
|
|
this.configRS.restart(n, options, signal, wait);
|
|
this["config" + n] = this.configRS.nodes[n];
|
|
this["c" + n] = this.configRS.nodes[n];
|
|
}
|
|
|
|
/**
|
|
* Returns a document {isMixedVersion: <bool>, oldestBinVersion: <string>}.
|
|
* The 'isMixedVersion' field is true if any settings to ShardingTest or jsTestOptions indicate
|
|
* this is a multiversion cluster.
|
|
* The 'oldestBinVersion' field is set to the oldest binary version used in this cluster, one of
|
|
* 'latest', 'last-continuous' and 'last-lts'.
|
|
* Note: Mixed version cluster with binary versions older than 'last-lts' is not supported. If
|
|
* such binary exists in the cluster, this function assumes this is not a mixed version cluster
|
|
* and returns 'oldestBinVersion' as 'latest'.
|
|
*
|
|
* Checks for bin versions via:
|
|
* jsTestOptions().mongosBinVersion,
|
|
* otherParams.configOptions.binVersion,
|
|
* otherParams.mongosOptions.binVersion
|
|
*/
|
|
getClusterVersionInfo() {
|
|
let hasLastLTS = clusterHasBinVersion(this, "last-lts");
|
|
let hasLastContinuous = clusterHasBinVersion(this, "last-continuous");
|
|
if (lastLTSFCV !== lastContinuousFCV && hasLastLTS && hasLastContinuous) {
|
|
throw new Error("Can only specify one of 'last-lts' and 'last-continuous' " +
|
|
"in binVersion, not both.");
|
|
}
|
|
if (hasLastLTS) {
|
|
return {isMixedVersion: true, oldestBinVersion: "last-lts"};
|
|
} else if (hasLastContinuous) {
|
|
return {isMixedVersion: true, oldestBinVersion: "last-continuous"};
|
|
} else {
|
|
return {isMixedVersion: false, oldestBinVersion: "latest"};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Runs a find on the namespace to force a refresh of the node's catalog cache.
|
|
*/
|
|
refreshCatalogCacheForNs(node, ns) {
|
|
node.getCollection(ns).findOne();
|
|
}
|
|
|
|
/**
|
|
* Waits for all operations to fully replicate on all shards.
|
|
*/
|
|
awaitReplicationOnShards() {
|
|
this._rs.forEach((replSet) => replSet.test.awaitReplication());
|
|
}
|
|
|
|
/**
|
|
* Query the oplog from a given node.
|
|
*/
|
|
findOplog(conn, query, limit) {
|
|
return conn.getDB("local")
|
|
.getCollection(kOplogName)
|
|
.find(query)
|
|
.sort({$natural: -1})
|
|
.limit(limit);
|
|
}
|
|
|
|
/**
|
|
* Returns all nodes in the cluster including shards, config servers and mongoses.
|
|
*/
|
|
getAllNodes() {
|
|
let nodes = [];
|
|
nodes.concat([this._configDB, this._connections, this._mongos]);
|
|
return [...new Set(nodes)];
|
|
}
|
|
|
|
/**
|
|
* Returns all shards in the cluster.
|
|
*/
|
|
getAllShards() {
|
|
return this._rs.map((obj) => obj.test);
|
|
}
|
|
|
|
/**
|
|
* TODO (SERVER-112863) Remove this once the maintenance port is supported on lastLTS.
|
|
*
|
|
* Returns true if:
|
|
* 1. A maintenance port is specified in any of the replica set configurations in the cluster
|
|
* 2. LastLTS FCV is less than the FCV on which featureFlagReplicationUsageOfMaintenancePort is
|
|
* enabled.
|
|
* Note that we compare the FCVs directly rather than checking the feature flag on the replica
|
|
* sets because the FCV isn't known until replSetInitiate.
|
|
*/
|
|
_shouldSkipMaintenancePortDuringInitialization() {
|
|
const mpFeatureFlag = "ReplicationUsageOfMaintenancePort";
|
|
let skipInitiatingWithMaintenancePort = false;
|
|
const containsMaintenancePort = this._rs.some(rs => {
|
|
return bsonUnorderedFieldsCompare(rs.test.getReplSetConfig(true), rs.test.getReplSetConfig(false)) != 0;
|
|
});
|
|
if (containsMaintenancePort) {
|
|
let maintenancePortEnabledFCV;
|
|
let node = this.configRS.nodes[0];
|
|
if (this.configRS.keyFile) {
|
|
authutil.asCluster(node, this.configRS.keyFile, function() {
|
|
maintenancePortEnabledFCV = FeatureFlagUtil.getFeatureFlagDoc(node, mpFeatureFlag).version;
|
|
});
|
|
} else {
|
|
maintenancePortEnabledFCV = FeatureFlagUtil.getFeatureFlagDoc(node, mpFeatureFlag).version;
|
|
}
|
|
skipInitiatingWithMaintenancePort = MongoRunner.compareBinVersions(lastLTSFCV, maintenancePortEnabledFCV) == -1;
|
|
}
|
|
return skipInitiatingWithMaintenancePort;
|
|
}
|
|
|
|
/**
|
|
* @typedef {Object} ShardingTestOtherParams
|
|
* @property {Object} [rs] Same `rs` parameter to ShardingTest constructor
|
|
* @property {number} [chunkSize] Same as chunkSize parameter to ShardingTest constructor
|
|
* @property {string} [keyFile] The location of the keyFile
|
|
* Can be used to specify options that are common all shards.
|
|
* @property {Object} [configOptions] Same as the `config` parameter to ShardingTest
|
|
* constructor. Can be used to specify options that are common all config servers.
|
|
* @property {Object} [mongosOptions] Same as the `mongos` parameter to ShardingTest
|
|
* constructor. Can be used to specify options that are common all mongos.
|
|
* @property {boolean} [enableBalancer] If true, enable the balancer enableBalancer setting
|
|
* @property {boolean} [manualAddShard] Shards will not be added if true.
|
|
* @property {number} [migrationLockAcquisitionMaxWaitMS] Number of milliseconds to acquire the
|
|
* migration lock.
|
|
* @property {boolean} [useBridge=false] If true, then a mongobridge process is started for each
|
|
* node in the sharded cluster.
|
|
* @property {boolean} [causallyConsistent=false] Specifies whether the connections to the
|
|
* replica set nodes should be created with the 'causal consistency' flag enabled, which means
|
|
* they will gossip the cluster time and add readConcern afterClusterTime where applicable.
|
|
* @property {Object} [bridgeOptions={}] Options to apply to all mongobridge processes.
|
|
* @property {Object} [rsOptions] Same as the `rs` parameter to ShardingTest constructor. Can be
|
|
* used to specify options that are common all replica members.
|
|
* @property {boolean} [useMaintenancePorts=false] If true, then a maintenance port will be
|
|
* specified for each node in the cluster.
|
|
*
|
|
* // replica Set only:
|
|
* @property {boolean} [useHostname] if true, use hostname of machine, otherwise use localhost
|
|
* @property {number} [numReplicas]
|
|
* @property {boolean} [configShard] Add the config server as a shard if true.
|
|
* @property {boolean} [initiateWithDefaultElectionTimeout] Set the electionTimeoutMillis to its
|
|
* default value when initiating all replica sets for both config and non-config shards. If not
|
|
* set, 'ReplSetTest.initiate' defaults to a very high election timeout value (24 hours).
|
|
* @property {boolean} [allNodesAuthorizedToRunRSGetStatus] Informs `ReplSetTest.initiate`
|
|
* whether all nodes in the replica set are authorized to run `replSetGetStatus`.
|
|
* @property {boolean} [useAutoBootstrapProcedure] Use the auto-bootstrapping procedure on every
|
|
* shard and config server if set to true.
|
|
* @property {boolean} [alwaysUseTestNameForShardName] Always use the testname as the name of
|
|
* the shard.
|
|
*/
|
|
|
|
/**
|
|
* @typedef {Object} ReplSetConfig
|
|
* @property {number} [nodes=3] Number of replica members.
|
|
* @property {number} [protocolVersion] Protocol version of replset used by the replset
|
|
* initiation. For other options, @see ReplSetTest#initiate
|
|
*/
|
|
|
|
/**
|
|
* Starts up a sharded cluster with the given specifications. The cluster will be fully
|
|
* operational after the execution of this constructor function.
|
|
*
|
|
* In addition to its own methods, ShardingTest inherits all the functions from the 'sh' utility
|
|
* with the db set as the first mongos instance in the test (i.e. s0).
|
|
*
|
|
* @param {Object} params Contains the key-value pairs for the cluster configuration.
|
|
* @param {string} [params.name] Name for this test
|
|
* @param {boolean} [params.shouldFailInit] If set, assert that this will fail initialization
|
|
* @param {number} [params.verbose] The verbosity for the mongos
|
|
* @param {number} [params.chunkSize] The chunk size to use as configuration for the cluster
|
|
* @param {number|Object|Array.<Object>} [params.mongos] Number of mongos or mongos
|
|
* configuration object(s)(*). see MongoRunner.runMongos
|
|
* @param {ReplSetConfig|Array.<ReplSetConfig>} [params.rs] Replica set configuration object.
|
|
* @param {number|Object|Array.<Object>} [params.shards] Number of shards or shard configuration
|
|
* object(s)(*). see MongoRunner.runMongod
|
|
* @param {number|Object|Array.<Object>} [params.config] Number of config server or config
|
|
* server configuration object(s)(*). see MongoRunner.runMongod
|
|
* (*) There are two ways For multiple configuration objects.
|
|
* (1) Using the object format. Example:
|
|
* { d0: { verbose: 5 }, d1: { auth: '' }, rs2: { oplogsize: 10 }}
|
|
* In this format, d0 = shard0, s = mongos & c = config servers
|
|
*
|
|
* (2) Using the array format. Example:
|
|
* [{ verbose: 5 }, { auth: '' }]
|
|
*
|
|
* Note: you can only have single server shards for array format.
|
|
* Note: A special "bridgeOptions" property can be specified in both the object and array
|
|
* formats to configure the options for the mongobridge corresponding to that node.
|
|
* These options are merged with the params.bridgeOptions options, where the node-specific
|
|
* options take precedence.
|
|
* @param {ShardingTestOtherParams} [params.other] Other parameters
|
|
*
|
|
* Member variables:
|
|
* s {Mongo} - connection to the first mongos
|
|
* s0, s1, ... {Mongo} - connection to different mongos
|
|
* rs0, rs1, ... {ReplSetTest} - test objects to replica sets
|
|
* shard0, shard1, ... {Mongo} - connection to shards
|
|
* config0, config1, ... {Mongo} - connection to config servers
|
|
* c0, c1, ... {Mongo} - same as config0, config1, ...
|
|
* configRS - the config ReplSetTest object
|
|
*/
|
|
constructor(params) {
|
|
if (!(this instanceof ShardingTest)) {
|
|
return new ShardingTest(params);
|
|
}
|
|
|
|
if (this.constructor === ReplSetTest && this.constructor[kOverrideConstructor]) {
|
|
return new this.constructor[kOverrideConstructor]();
|
|
}
|
|
|
|
// Ensure we don't mutate the passed-in parameters.
|
|
params = Object.extend({}, params, true);
|
|
|
|
// Used for counting the test duration
|
|
this._startTime = new Date();
|
|
|
|
assert(isObject(params), "ShardingTest configuration must be a JSON object");
|
|
|
|
var testName = params.name || jsTest.name();
|
|
let otherParams = Object.deepMerge(params, params.other || {});
|
|
let numShards = otherParams.hasOwnProperty("shards") ? otherParams.shards : 2;
|
|
let mongosVerboseLevel = otherParams.hasOwnProperty("verbose") ? otherParams.verbose : 1;
|
|
let numMongos = otherParams.hasOwnProperty("mongos") ? otherParams.mongos : 1;
|
|
const usedDefaultNumConfigs =
|
|
!otherParams.hasOwnProperty("config") || otherParams.config === undefined;
|
|
let numConfigs = otherParams.hasOwnProperty("config") ? otherParams.config : 3;
|
|
|
|
let useAutoBootstrapProcedure = otherParams.hasOwnProperty("useAutoBootstrapProcedure")
|
|
? otherParams.useAutoBootstrapProcedure
|
|
: false;
|
|
useAutoBootstrapProcedure =
|
|
useAutoBootstrapProcedure || jsTestOptions().useAutoBootstrapProcedure;
|
|
let alwaysUseTestNameForShardName =
|
|
otherParams.hasOwnProperty("alwaysUseTestNameForShardName")
|
|
? otherParams.alwaysUseTestNameForShardName
|
|
: false;
|
|
|
|
let isConfigShardMode =
|
|
otherParams.hasOwnProperty("configShard") ? otherParams.configShard : false;
|
|
isConfigShardMode =
|
|
isConfigShardMode || jsTestOptions().configShard || useAutoBootstrapProcedure;
|
|
Object.defineProperty(this, "isConfigShardMode", {
|
|
value: isConfigShardMode,
|
|
writable: false,
|
|
enumerable: true,
|
|
configurable: false,
|
|
});
|
|
|
|
if ("shardAsReplicaSet" in otherParams) {
|
|
throw new Error("Use of deprecated option 'shardAsReplicaSet'");
|
|
}
|
|
|
|
// Default enableBalancer to false.
|
|
otherParams.enableBalancer =
|
|
"enableBalancer" in otherParams && otherParams.enableBalancer === true;
|
|
|
|
// Allow specifying mixed-type options like this:
|
|
// { mongos : [ { bind_ip : "localhost" } ],
|
|
// shards : { rs : true, d : true } }
|
|
if (Array.isArray(numShards)) {
|
|
for (var i = 0; i < numShards.length; i++) {
|
|
otherParams["d" + i] = numShards[i];
|
|
}
|
|
|
|
numShards = numShards.length;
|
|
} else if (isObject(numShards)) {
|
|
let tempCount = 0;
|
|
for (var i in numShards) {
|
|
otherParams[i] = numShards[i];
|
|
|
|
tempCount++;
|
|
}
|
|
|
|
numShards = tempCount;
|
|
}
|
|
defineReadOnlyProperty(this, "_numShards", numShards);
|
|
|
|
if (isConfigShardMode) {
|
|
assert(numShards > 0, "Config shard mode requires at least one shard");
|
|
}
|
|
|
|
if (Array.isArray(numMongos)) {
|
|
for (var i = 0; i < numMongos.length; i++) {
|
|
otherParams["s" + i] = numMongos[i];
|
|
}
|
|
|
|
numMongos = numMongos.length;
|
|
} else if (isObject(numMongos)) {
|
|
let tempCount = 0;
|
|
for (var i in numMongos) {
|
|
otherParams[i] = numMongos[i];
|
|
tempCount++;
|
|
}
|
|
|
|
numMongos = tempCount;
|
|
}
|
|
defineReadOnlyProperty(this, "_numMongos", numMongos);
|
|
|
|
if (Array.isArray(numConfigs)) {
|
|
assert(!usedDefaultNumConfigs);
|
|
for (let i = 0; i < numConfigs.length; i++) {
|
|
otherParams["c" + i] = numConfigs[i];
|
|
}
|
|
|
|
numConfigs = numConfigs.length;
|
|
} else if (isObject(numConfigs)) {
|
|
assert(!usedDefaultNumConfigs);
|
|
let tempCount = 0;
|
|
for (var i in numConfigs) {
|
|
otherParams[i] = numConfigs[i];
|
|
tempCount++;
|
|
}
|
|
|
|
numConfigs = tempCount;
|
|
}
|
|
defineReadOnlyProperty(this, "_numConfigs", numConfigs);
|
|
|
|
otherParams.useHostname =
|
|
otherParams.useHostname == undefined ? true : otherParams.useHostname;
|
|
otherParams.useBridge = otherParams.useBridge || false;
|
|
otherParams.bridgeOptions = otherParams.bridgeOptions || {};
|
|
otherParams.causallyConsistent = otherParams.causallyConsistent || false;
|
|
otherParams.useMaintenancePorts = otherParams.useMaintenancePorts ?? false;
|
|
|
|
if (jsTestOptions().networkMessageCompressors) {
|
|
otherParams.bridgeOptions["networkMessageCompressors"] =
|
|
jsTestOptions().networkMessageCompressors;
|
|
}
|
|
|
|
this.keyFile = otherParams.keyFile;
|
|
const hostName = otherParams.host === undefined ? getHostName() : otherParams.host;
|
|
|
|
this._testName = testName;
|
|
this._otherParams = otherParams;
|
|
|
|
let pathOpts = {testName: testName};
|
|
|
|
this._connections = [];
|
|
this._rs = [];
|
|
this._rsObjects = [];
|
|
this._shardReplSetToIndex = {};
|
|
|
|
this._useBridge = otherParams.useBridge;
|
|
if (this._useBridge) {
|
|
assert(
|
|
!jsTestOptions().tlsMode,
|
|
"useBridge cannot be true when using TLS. Add the requires_mongobridge tag to the test to ensure it will be skipped on variants that use TLS.",
|
|
);
|
|
}
|
|
this._useMaintenancePorts = otherParams.useMaintenancePorts;
|
|
|
|
this._unbridgedMongos = [];
|
|
let _allocatePortForMongos;
|
|
let _allocatePortForBridgeForMongos;
|
|
|
|
if (this._useBridge) {
|
|
this._unbridgedMongos = [];
|
|
|
|
const _makeAllocatePortFn = (preallocatedPorts, errorMessage) => {
|
|
let idxNextNodePort = 0;
|
|
|
|
return function() {
|
|
if (idxNextNodePort >= preallocatedPorts.length) {
|
|
throw new Error(errorMessage(preallocatedPorts.length));
|
|
}
|
|
|
|
const nextPort = preallocatedPorts[idxNextNodePort];
|
|
++idxNextNodePort;
|
|
return nextPort;
|
|
};
|
|
};
|
|
|
|
const errorMessage = (length) =>
|
|
"Cannot use more than " + length + " mongos processes when useBridge=true";
|
|
_allocatePortForBridgeForMongos = _makeAllocatePortFn(
|
|
allocatePorts(MongoBridge.kBridgeOffset),
|
|
errorMessage,
|
|
);
|
|
_allocatePortForMongos =
|
|
_makeAllocatePortFn(allocatePorts(MongoBridge.kBridgeOffset), errorMessage);
|
|
} else {
|
|
_allocatePortForBridgeForMongos = function() {
|
|
throw new Error("Using mongobridge isn't enabled for this sharded cluster");
|
|
};
|
|
_allocatePortForMongos = allocatePort;
|
|
}
|
|
|
|
otherParams.migrationLockAcquisitionMaxWaitMS =
|
|
otherParams.migrationLockAcquisitionMaxWaitMS || 30000;
|
|
|
|
let randomSeedAlreadySet = false;
|
|
|
|
if (jsTest.options().useRandomBinVersionsWithinReplicaSet) {
|
|
// We avoid setting the random seed unequivocally to avoid unexpected behavior in tests
|
|
// that already make use of Random.setRandomSeed(). This conditional can be removed if
|
|
// it becomes the standard to always be generating the seed through ShardingTest.
|
|
Random.setRandomFixtureSeed();
|
|
randomSeedAlreadySet = true;
|
|
}
|
|
|
|
jsTest.options().setParameters = jsTest.options().setParameters || {};
|
|
let setDefaultTransactionLockTimeout = false;
|
|
if (jsTest.options().setParameters.maxTransactionLockRequestTimeoutMillis === undefined) {
|
|
// Set a higher maxTransactionLockRequestTimeoutMillis. Tests written with ShardingTest
|
|
// are generally single threaded and often don't expect lock timeouts, so a higher
|
|
// timeout avoids spurious failures on slow machines.
|
|
//
|
|
// TODO SERVER-98408: Ideally this would be passed as a default setParameter to
|
|
// ReplSetTest, but the rules for passing default options to ReplSetTest via
|
|
// ShardingTest are finnicky and tests rely on the current behaviors. Once this is
|
|
// refactored, we should be able to avoid using TestData.
|
|
jsTest.options().setParameters.maxTransactionLockRequestTimeoutMillis = 5 * 60 * 1000;
|
|
setDefaultTransactionLockTimeout = true;
|
|
}
|
|
|
|
try {
|
|
const clusterVersionInfo = this.getClusterVersionInfo();
|
|
|
|
let startTime = new Date(); // Measure the execution time of startup and initiate.
|
|
if (!isConfigShardMode) {
|
|
//
|
|
// Start up the config server replica set.
|
|
//
|
|
|
|
let rstOptions = {
|
|
useHostName: otherParams.useHostname,
|
|
host: hostName,
|
|
useBridge: otherParams.useBridge,
|
|
bridgeOptions: otherParams.bridgeOptions,
|
|
useMaintenancePorts: otherParams.useMaintenancePorts,
|
|
keyFile: this.keyFile,
|
|
waitForKeys: false,
|
|
name: testName + "-configRS",
|
|
seedRandomNumberGenerator: !randomSeedAlreadySet,
|
|
isConfigServer: true,
|
|
};
|
|
|
|
// always use wiredTiger as the storage engine for CSRS
|
|
let startOptions = {
|
|
pathOpts: pathOpts,
|
|
// Ensure that journaling is always enabled for config servers.
|
|
configsvr: "",
|
|
storageEngine: "wiredTiger",
|
|
};
|
|
|
|
if (otherParams.configOptions && otherParams.configOptions.binVersion) {
|
|
otherParams.configOptions.binVersion = MongoRunner.versionIterator(
|
|
otherParams.configOptions.binVersion,
|
|
);
|
|
}
|
|
|
|
startOptions = Object.merge(startOptions, otherParams.configOptions);
|
|
|
|
const clusterVersionInfo = this.getClusterVersionInfo();
|
|
if (jsTestOptions().otelTraceDirectory && !clusterVersionInfo.isMixedVersion &&
|
|
MongoRunner.compareBinVersions(MongoRunner.getBinVersionFor(startOptions.binVersion ?? "latest"), MongoRunner.getBinVersionFor("8.3.0")) >= 0) {
|
|
startOptions.setParameter = startOptions.setParameter ?? {};
|
|
startOptions.setParameter.opentelemetryTraceDirectory = jsTestOptions().otelTraceDirectory;
|
|
}
|
|
rstOptions = Object.merge(rstOptions, otherParams.configReplSetTestOptions);
|
|
|
|
let nodeOptions = [];
|
|
for (var i = 0; i < numConfigs; ++i) {
|
|
nodeOptions.push(otherParams["c" + i] || {});
|
|
}
|
|
|
|
rstOptions.nodes = nodeOptions;
|
|
|
|
// Start the config server's replica set without waiting for it to complete. This
|
|
// allows it to proceed in parallel with the startup of each shard.
|
|
this.configRS = new ReplSetTest(rstOptions);
|
|
this.configRS.startSetAsync(startOptions);
|
|
}
|
|
|
|
//
|
|
// Start each shard replica set.
|
|
//
|
|
for (var i = 0; i < numShards; i++) {
|
|
let setName = testName + "-rs" + i;
|
|
|
|
let rsDefaults = {
|
|
useHostname: otherParams.useHostname,
|
|
oplogSize: 16,
|
|
pathOpts: Object.merge(pathOpts, {shard: i}),
|
|
};
|
|
|
|
let setIsConfigSvr = false;
|
|
|
|
if (isConfigShardMode && i == 0) {
|
|
otherParams.configOptions = Object.merge(otherParams.configOptions, {
|
|
configsvr: "",
|
|
storageEngine: "wiredTiger",
|
|
});
|
|
|
|
rsDefaults = Object.merge(rsDefaults, otherParams.configOptions);
|
|
setIsConfigSvr = true;
|
|
} else {
|
|
rsDefaults.shardsvr = "";
|
|
}
|
|
|
|
if (otherParams.rs || otherParams["rs" + i] || otherParams.rsOptions) {
|
|
if (otherParams.rs) {
|
|
rsDefaults = Object.merge(rsDefaults, otherParams.rs);
|
|
}
|
|
if (otherParams["rs" + i]) {
|
|
rsDefaults = Object.merge(rsDefaults, otherParams["rs" + i]);
|
|
}
|
|
if (otherParams.rsOptions) {
|
|
rsDefaults = Object.merge(rsDefaults, otherParams.rsOptions);
|
|
}
|
|
|
|
rsDefaults.nodes = rsDefaults.nodes || otherParams.numReplicas;
|
|
} else {
|
|
rsDefaults = Object.merge(rsDefaults, otherParams["d" + i]);
|
|
}
|
|
|
|
// TODO SERVER-98408: Passing setParameter via rsDefaults will always override any
|
|
// setParameters passed via replSetTestOpts. Instead the options should be merged.
|
|
rsDefaults.setParameter = rsDefaults.setParameter || {};
|
|
|
|
if (typeof rsDefaults.setParameter === "string") {
|
|
let eqIdx = rsDefaults.setParameter.indexOf("=");
|
|
if (eqIdx != -1) {
|
|
let param = rsDefaults.setParameter.substring(0, eqIdx);
|
|
let value = rsDefaults.setParameter.substring(eqIdx + 1);
|
|
rsDefaults.setParameter = {};
|
|
rsDefaults.setParameter[param] = value;
|
|
}
|
|
}
|
|
|
|
rsDefaults.setParameter.migrationLockAcquisitionMaxWaitMS =
|
|
otherParams.migrationLockAcquisitionMaxWaitMS;
|
|
|
|
const clusterVersionInfo = this.getClusterVersionInfo();
|
|
if (jsTestOptions().otelTraceDirectory && !clusterVersionInfo.isMixedVersion &&
|
|
MongoRunner.compareBinVersions(MongoRunner.getBinVersionFor(rsDefaults.binVersion || "latest"), MongoRunner.getBinVersionFor("8.3.0")) >= 0) {
|
|
rsDefaults.setParameter.opentelemetryTraceDirectory = jsTestOptions().otelTraceDirectory;
|
|
}
|
|
|
|
let rsSettings = rsDefaults.settings;
|
|
delete rsDefaults.settings;
|
|
|
|
// The number of nodes in the rs field will take priority.
|
|
let numReplicas = 1; /* default */
|
|
if (rsDefaults.nodes) {
|
|
numReplicas = rsDefaults.nodes;
|
|
} else if (otherParams.rs || otherParams["rs" + i]) {
|
|
numReplicas = 3;
|
|
}
|
|
|
|
// Unless explicitly given a number of config servers, a config shard uses the
|
|
// shard's number of nodes to increase odds of compatibility with test assertions.
|
|
if (isConfigShardMode && i == 0 && !usedDefaultNumConfigs) {
|
|
numReplicas = numConfigs;
|
|
}
|
|
|
|
delete rsDefaults.nodes;
|
|
|
|
let protocolVersion = rsDefaults.protocolVersion;
|
|
delete rsDefaults.protocolVersion;
|
|
|
|
let replSetTestOpts = {
|
|
name: setName,
|
|
nodes: numReplicas,
|
|
host: hostName,
|
|
useHostName: otherParams.useHostname,
|
|
useBridge: otherParams.useBridge,
|
|
bridgeOptions: otherParams.bridgeOptions,
|
|
useMaintenancePorts: otherParams.useMaintenancePorts,
|
|
keyFile: this.keyFile,
|
|
protocolVersion: protocolVersion,
|
|
waitForKeys: false,
|
|
settings: rsSettings,
|
|
seedRandomNumberGenerator: !randomSeedAlreadySet,
|
|
isConfigServer: setIsConfigSvr,
|
|
useAutoBootstrapProcedure: useAutoBootstrapProcedure,
|
|
};
|
|
|
|
const rs = new ReplSetTest(replSetTestOpts);
|
|
|
|
jsTest.log.info("ShardingTest starting replica set for shard: " + setName);
|
|
|
|
// Start up the replica set but don't wait for it to complete. This allows the
|
|
// startup of each shard to proceed in parallel.
|
|
this._rs[i] = {
|
|
setName: setName,
|
|
test: rs,
|
|
nodes: rs.startSetAsync(rsDefaults),
|
|
url: rs.getURL(),
|
|
};
|
|
if (i == 0 && isConfigShardMode) {
|
|
this.configRS = this._rs[0].test;
|
|
}
|
|
}
|
|
|
|
//
|
|
// Wait for each shard replica set to finish starting up.
|
|
//
|
|
for (let i = 0; i < numShards; i++) {
|
|
jsTest.log.info("Waiting for shard " + this._rs[i].setName +
|
|
" to finish starting up.");
|
|
if (isConfigShardMode && i == 0) {
|
|
continue;
|
|
}
|
|
this._rs[i].test.startSetAwait();
|
|
}
|
|
|
|
//
|
|
// Wait for the config server to finish starting up.
|
|
//
|
|
jsTest.log.info("Waiting for the config server to finish starting up.");
|
|
this.configRS.startSetAwait();
|
|
let config = this.configRS.getReplSetConfig();
|
|
config.configsvr = true;
|
|
config.settings = config.settings || {};
|
|
|
|
jsTest.log.info(
|
|
"ShardingTest startup for all nodes took " + (new Date() - startTime) + "ms with " +
|
|
this.configRS.nodeList().length + " config server nodes and " +
|
|
totalNumShardNodes(this) + " total shard nodes.",
|
|
);
|
|
|
|
if (setDefaultTransactionLockTimeout) {
|
|
// Clean up TestData.setParameters to avoid affecting other tests.
|
|
delete jsTest.options().setParameters.maxTransactionLockRequestTimeoutMillis;
|
|
}
|
|
|
|
let skipInitiatingWithMaintenancePort = this._shouldSkipMaintenancePortDuringInitialization();
|
|
|
|
//
|
|
// Initiate each shard replica set and wait for replication. Also initiate the config
|
|
// replica set. Whenever possible, in parallel.
|
|
//
|
|
const shardsRS = this._rs.map((obj) => obj.test);
|
|
let replSetToIntiateArr = [];
|
|
if (isConfigShardMode) {
|
|
replSetToIntiateArr = [...shardsRS];
|
|
} else {
|
|
replSetToIntiateArr = [...shardsRS, this.configRS];
|
|
}
|
|
|
|
const replicaSetsToInitiate = replSetToIntiateArr.map((rst) => {
|
|
const rstConfig = rst.getReplSetConfig(skipInitiatingWithMaintenancePort);
|
|
|
|
// The mongo shell cannot authenticate as the internal __system user in tests that
|
|
// use x509 for cluster authentication. Choosing the default value for
|
|
// wcMajorityJournalDefault in ReplSetTest cannot be done automatically without the
|
|
// shell performing such authentication, so allow tests to pass the value in.
|
|
if (otherParams.hasOwnProperty("writeConcernMajorityJournalDefault")) {
|
|
rstConfig.writeConcernMajorityJournalDefault =
|
|
otherParams.writeConcernMajorityJournalDefault;
|
|
}
|
|
|
|
if (rst === this.configRS) {
|
|
rstConfig.configsvr = true;
|
|
rstConfig.writeConcernMajorityJournalDefault = true;
|
|
}
|
|
|
|
let rstInitiateArgs = {
|
|
allNodesAuthorizedToRunRSGetStatus: true,
|
|
initiateWithDefaultElectionTimeout: false,
|
|
};
|
|
|
|
if (otherParams.hasOwnProperty("allNodesAuthorizedToRunRSGetStatus") &&
|
|
otherParams.allNodesAuthorizedToRunRSGetStatus == false) {
|
|
rstInitiateArgs.allNodesAuthorizedToRunRSGetStatus = false;
|
|
}
|
|
|
|
if (otherParams.hasOwnProperty("initiateWithDefaultElectionTimeout") &&
|
|
otherParams.initiateWithDefaultElectionTimeout == true) {
|
|
rstInitiateArgs.initiateWithDefaultElectionTimeout = true;
|
|
}
|
|
|
|
const makeNodeHost = (node) => {
|
|
const [_, port] = node.name.split(":");
|
|
return `127.0.0.1:${port}`;
|
|
};
|
|
|
|
return {
|
|
rst,
|
|
// Arguments for creating instances of each replica set within parallel threads.
|
|
rstArgs: {
|
|
name: rst.name,
|
|
nodeHosts: rst.nodes.map((node) => makeNodeHost(node)),
|
|
nodeOptions: rst.nodeOptions,
|
|
// Mixed-mode SSL tests may specify a keyFile per replica set rather than
|
|
// one for the whole cluster.
|
|
keyFile: rst.keyFile ? rst.keyFile : this.keyFile,
|
|
host: otherParams.useHostname ? hostName : "localhost",
|
|
waitForKeys: false,
|
|
useAutoBootstrapProcedure: useAutoBootstrapProcedure,
|
|
},
|
|
// Replica set configuration for initiating the replica set.
|
|
rstConfig,
|
|
|
|
// Args to be sent to rst.initiate
|
|
rstInitiateArgs,
|
|
};
|
|
});
|
|
|
|
const initiateReplicaSet = (rst, rstConfig, rstInitiateArgs) => {
|
|
rst.initiate(rstConfig, null, rstInitiateArgs);
|
|
|
|
// Do replication.
|
|
rst.awaitNodesAgreeOnPrimary();
|
|
if (rst.keyFile) {
|
|
authutil.asCluster(rst.nodes, rst.keyFile, function() {
|
|
rst.awaitReplication();
|
|
});
|
|
}
|
|
rst.awaitSecondaryNodes();
|
|
};
|
|
|
|
const isParallelSupported = (() => {
|
|
for (let {rst} of replicaSetsToInitiate) {
|
|
if (rst.startOptions && rst.startOptions.clusterAuthMode === "x509") {
|
|
// The mongo shell performing X.509 authentication as a cluster member
|
|
// requires starting a parallel shell and using the server's (not the
|
|
// client's) certificate. The ReplSetTest instance constructed in a Thread
|
|
// wouldn't have copied the path to the server's certificate. We therefore
|
|
// fall back to initiating the CSRS and replica set shards sequentially when
|
|
// X.509 authentication is being used.
|
|
return false;
|
|
}
|
|
|
|
for (let n of Object.keys(rst.nodeOptions)) {
|
|
const nodeOptions = rst.nodeOptions[n];
|
|
if (nodeOptions && nodeOptions.clusterAuthMode === "x509") {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
|
|
return true;
|
|
})();
|
|
|
|
if (isParallelSupported) {
|
|
const threads = [];
|
|
try {
|
|
for (let {rstArgs, rstConfig, rstInitiateArgs} of replicaSetsToInitiate) {
|
|
const thread = new Thread(
|
|
async (rstArgs, rstConfig, rstInitiateArgs, initiateReplicaSet) => {
|
|
const {ReplSetTest} = await import("jstests/libs/replsettest.js");
|
|
try {
|
|
const rst = new ReplSetTest({rstArgs});
|
|
initiateReplicaSet(rst, rstConfig, rstInitiateArgs);
|
|
return {ok: 1};
|
|
} catch (e) {
|
|
return {
|
|
ok: 0,
|
|
hosts: rstArgs.nodeHosts,
|
|
name: rstArgs.name,
|
|
error: e.toString(),
|
|
stack: e.stack,
|
|
};
|
|
}
|
|
},
|
|
rstArgs,
|
|
rstConfig,
|
|
rstInitiateArgs,
|
|
initiateReplicaSet,
|
|
);
|
|
thread.start();
|
|
threads.push(thread);
|
|
}
|
|
} finally {
|
|
// Wait for each thread to finish. Throw an error if any thread fails.
|
|
const returnData = threads.map((thread) => {
|
|
thread.join();
|
|
return thread.returnData();
|
|
});
|
|
|
|
returnData.forEach((res) => {
|
|
assert.commandWorked(
|
|
res, "Initiating shard or config servers as a replica set failed");
|
|
});
|
|
}
|
|
} else {
|
|
for (let {rst, rstConfig, rstInitiateArgs} of replicaSetsToInitiate) {
|
|
initiateReplicaSet(rst, rstConfig, rstInitiateArgs);
|
|
}
|
|
}
|
|
|
|
for (let i = 0; i < numShards; i++) {
|
|
let rs = this._rs[i].test;
|
|
|
|
this._shardReplSetToIndex[rs.name] = i;
|
|
this["rs" + i] = rs;
|
|
this._rsObjects[i] = rs;
|
|
|
|
this._connections.push(null);
|
|
|
|
let rsConn = new Mongo(rs.getURL(), undefined, {gRPC: false});
|
|
rsConn.name = rs.getURL();
|
|
|
|
this._connections[i] = rsConn;
|
|
this["shard" + i] = rsConn;
|
|
rsConn.rs = rs;
|
|
}
|
|
|
|
// Wait for master to be elected before starting mongos
|
|
this.configRS.awaitNodesAgreeOnPrimary();
|
|
let csrsPrimary = this.configRS.getPrimary();
|
|
|
|
// TODO: SERVER-80010 Remove assert.soon.
|
|
if (useAutoBootstrapProcedure) {
|
|
assert.soonNoExcept(() => {
|
|
function isShardingReady() {
|
|
return csrsPrimary.adminCommand({getShardingReady: 1}).isReady;
|
|
}
|
|
return this.keyFile
|
|
? authutil.asCluster(csrsPrimary, this.keyFile, isShardingReady)
|
|
: isShardingReady();
|
|
});
|
|
}
|
|
|
|
jsTest.log.info(
|
|
"ShardingTest startup and initiation for all nodes took " +
|
|
(new Date() - startTime) + "ms with " + this.configRS.nodeList().length +
|
|
" config server nodes and " + totalNumShardNodes(this) + " total shard nodes.",
|
|
);
|
|
|
|
// If 'otherParams.mongosOptions.binVersion' is an array value, then we'll end up
|
|
// constructing a version iterator.
|
|
const mongosOptions = [];
|
|
for (var i = 0; i < numMongos; ++i) {
|
|
let options = {
|
|
useHostname: otherParams.useHostname,
|
|
pathOpts: Object.merge(pathOpts, {mongos: i}),
|
|
verbose: mongosVerboseLevel,
|
|
keyFile: this.keyFile,
|
|
};
|
|
|
|
if (otherParams.mongosOptions && otherParams.mongosOptions.binVersion) {
|
|
otherParams.mongosOptions.binVersion = MongoRunner.versionIterator(
|
|
otherParams.mongosOptions.binVersion,
|
|
);
|
|
}
|
|
|
|
options = Object.merge(options, otherParams.mongosOptions);
|
|
options = Object.merge(options, otherParams["s" + i]);
|
|
|
|
// The default time for mongos quiesce mode in response to SIGTERM is 15 seconds.
|
|
// Reduce this to 0 for faster shutdown.
|
|
options.setParameter = options.setParameter || {};
|
|
options.setParameter.mongosShutdownTimeoutMillisForSignaledShutdown =
|
|
options.setParameter.mongosShutdownTimeoutMillisForSignaledShutdown || 0;
|
|
|
|
const clusterVersionInfo = this.getClusterVersionInfo();
|
|
if (jsTestOptions().otelTraceDirectory && !clusterVersionInfo.isMixedVersion &&
|
|
MongoRunner.compareBinVersions(MongoRunner.getBinVersionFor(options.binVersion ?? "latest"), MongoRunner.getBinVersionFor("8.3.0")) >= 0) {
|
|
options.setParameter.opentelemetryTraceDirectory = jsTestOptions().otelTraceDirectory;
|
|
}
|
|
|
|
options.port = options.port || _allocatePortForMongos();
|
|
if (this._useMaintenancePorts || options.hasOwnProperty("maintenancePort")) {
|
|
options.maintenancePort = options.hasOwnProperty("maintenancePort") ? options.maintenancePort : _allocatePortForMongos();
|
|
}
|
|
if (jsTestOptions().shellGRPC) {
|
|
options.grpcPort = options.grpcPort || _allocatePortForMongos();
|
|
}
|
|
|
|
mongosOptions.push(options);
|
|
}
|
|
|
|
const configRS = this.configRS;
|
|
if (_hasNewFeatureCompatibilityVersion() && clusterVersionInfo.isMixedVersion) {
|
|
const fcv = binVersionToFCV(clusterVersionInfo.oldestBinVersion);
|
|
function setFeatureCompatibilityVersion() {
|
|
assert.commandWorked(
|
|
csrsPrimary.adminCommand({
|
|
setFeatureCompatibilityVersion: fcv,
|
|
confirm: true,
|
|
fromConfigServer: true,
|
|
}),
|
|
);
|
|
|
|
// Wait for the new featureCompatibilityVersion to propagate to all nodes in the
|
|
// CSRS to ensure that older versions of mongos can successfully connect.
|
|
configRS.awaitReplication();
|
|
}
|
|
|
|
if (this.keyFile) {
|
|
authutil.asCluster(
|
|
this.configRS.nodes, this.keyFile, setFeatureCompatibilityVersion);
|
|
} else {
|
|
setFeatureCompatibilityVersion();
|
|
}
|
|
}
|
|
|
|
// If chunkSize has been requested for this test, write the configuration
|
|
if (otherParams.chunkSize) {
|
|
function setChunkSize() {
|
|
assert.commandWorked(
|
|
csrsPrimary.getDB("config").settings.update(
|
|
{_id: "chunksize"},
|
|
{$set: {value: otherParams.chunkSize}},
|
|
{
|
|
upsert: true,
|
|
writeConcern: {w: "majority", wtimeout: kDefaultWTimeoutMs},
|
|
},
|
|
),
|
|
);
|
|
|
|
configRS.awaitLastOpCommitted();
|
|
}
|
|
|
|
if (this.keyFile) {
|
|
authutil.asCluster(csrsPrimary, this.keyFile, setChunkSize);
|
|
} else {
|
|
setChunkSize();
|
|
}
|
|
}
|
|
|
|
this._configDB = this.configRS.getURL();
|
|
for (var i = 0; i < numConfigs; ++i) {
|
|
var conn = this.configRS.nodes[i];
|
|
this["config" + i] = conn;
|
|
this["c" + i] = conn;
|
|
}
|
|
|
|
jsTest.log.info("Config servers", {configDB: this._configDB});
|
|
|
|
this.printNodes();
|
|
|
|
this._mongos = [];
|
|
|
|
// Start and connect to the MongoS servers if needed
|
|
for (var i = 0; i < numMongos; i++) {
|
|
const options = mongosOptions[i];
|
|
options.configdb = this._configDB;
|
|
|
|
if (otherParams.useBridge) {
|
|
let bridgeOptions =
|
|
Object.merge(otherParams.bridgeOptions, options.bridgeOptions || {});
|
|
bridgeOptions = Object.merge(bridgeOptions, {
|
|
hostName: otherParams.useHostname ? hostName : "localhost",
|
|
port: _allocatePortForBridgeForMongos(),
|
|
// The mongos processes identify themselves to mongobridge as host:port,
|
|
// where the host is the actual hostname of the machine and not
|
|
// localhost.
|
|
dest: hostName + ":" + options.port,
|
|
});
|
|
|
|
var bridge = new MongoBridge(bridgeOptions);
|
|
}
|
|
|
|
var conn = MongoRunner.runMongos(options, clusterVersionInfo.isMixedVersion);
|
|
if (!conn) {
|
|
throw new Error("Failed to start mongos " + i);
|
|
}
|
|
|
|
if (otherParams.causallyConsistent) {
|
|
conn.setCausalConsistency(true);
|
|
}
|
|
|
|
if (otherParams.useBridge) {
|
|
bridge.connectToBridge();
|
|
this._mongos.push(bridge);
|
|
this._unbridgedMongos.push(conn);
|
|
} else {
|
|
this._mongos.push(conn);
|
|
}
|
|
|
|
if (i === 0) {
|
|
this.s = this._mongos[i];
|
|
this.admin = this._mongos[i].getDB("admin");
|
|
this.config = this._mongos[i].getDB("config");
|
|
}
|
|
|
|
this["s" + i] = this._mongos[i];
|
|
}
|
|
|
|
_extendWithShMethods(this);
|
|
|
|
// If auth is enabled for the test, login the mongos connections as system in order to
|
|
// configure the instances and then log them out again.
|
|
if (this.keyFile) {
|
|
authutil.asCluster(this._mongos, this.keyFile, () => _configureCluster(this));
|
|
} else if (mongosOptions[0] && mongosOptions[0].keyFile) {
|
|
authutil.asCluster(
|
|
this._mongos, mongosOptions[0].keyFile, () => _configureCluster(this));
|
|
} else {
|
|
_configureCluster(this);
|
|
// Ensure that all config server nodes are up to date with any changes made to
|
|
// balancer settings before adding shards to the cluster. This prevents shards,
|
|
// which read config.settings with readPreference 'nearest', from accidentally
|
|
// fetching stale values from secondaries that aren't up-to-date.
|
|
this.configRS.awaitLastOpCommitted();
|
|
}
|
|
|
|
try {
|
|
if (!otherParams.manualAddShard) {
|
|
var testName = this._testName;
|
|
let admin = this.admin;
|
|
let keyFile = this.keyFile;
|
|
|
|
this._connections.forEach(function(z, idx) {
|
|
let n = z.name || z.host || z;
|
|
|
|
let name;
|
|
if (isConfigShardMode && idx == 0) {
|
|
name = "config";
|
|
|
|
if (!useAutoBootstrapProcedure) {
|
|
jsTest.log.info("ShardingTest " + testName +
|
|
" transitioning to config shard");
|
|
|
|
function transitionFromDedicatedConfigServer() {
|
|
return assert.commandWorked(
|
|
admin.runCommand({transitionFromDedicatedConfigServer: 1}),
|
|
);
|
|
}
|
|
|
|
if (keyFile) {
|
|
authutil.asCluster(admin.getMongo(),
|
|
keyFile,
|
|
transitionFromDedicatedConfigServer);
|
|
} else if (mongosOptions[0] && mongosOptions[0].keyFile) {
|
|
authutil.asCluster(
|
|
admin.getMongo(),
|
|
mongosOptions[0].keyFile,
|
|
transitionFromDedicatedConfigServer,
|
|
);
|
|
} else {
|
|
transitionFromDedicatedConfigServer();
|
|
}
|
|
}
|
|
|
|
z.shardName = name;
|
|
} else {
|
|
jsTest.log.info("ShardingTest " + testName +
|
|
" going to add shard : " + n);
|
|
|
|
let addShardCmd = {addShard: n};
|
|
if (alwaysUseTestNameForShardName) {
|
|
addShardCmd.name = `${testName}-${idx}`;
|
|
}
|
|
|
|
let result = assert.commandWorked(
|
|
admin.runCommand(addShardCmd),
|
|
"Failed to add shard " + n,
|
|
);
|
|
z.shardName = result.shardAdded;
|
|
}
|
|
});
|
|
}
|
|
} catch (e) {
|
|
// Clean up the running procceses on failure
|
|
jsTest.log.info("Failed to add shards, stopping cluster.");
|
|
this.stop();
|
|
throw e;
|
|
}
|
|
|
|
// TODO (SERVER-112863) Remove this once the maintenance port is supported on lastLTS.
|
|
this._rs.forEach((rs) => {
|
|
if (skipInitiatingWithMaintenancePort) {
|
|
rs.test.reInitiate();
|
|
}
|
|
});
|
|
|
|
|
|
// Ensure that the sessions collection exists so jstests can run things with
|
|
// logical sessions and test them. We do this by forcing an immediate cache refresh
|
|
// on the config server, which auto-shards the collection for the cluster.
|
|
this.configRS.getPrimary().getDB("admin").runCommand(
|
|
{refreshLogicalSessionCacheNow: 1});
|
|
|
|
// Ensure that all CSRS nodes are up to date. This is strictly needed for tests that use
|
|
// multiple mongoses. In those cases, the first mongos initializes the contents of the
|
|
// 'config' database, but without waiting for those writes to replicate to all the
|
|
// config servers then the secondary mongoses risk reading from a stale config server
|
|
// and seeing an empty config database.
|
|
this.configRS.awaitLastOpCommitted();
|
|
|
|
if (useAutoBootstrapProcedure) {
|
|
// This is needed because auto-bootstrapping will initially create a config.shards
|
|
// entry for the config shard where the host field does not contain all the nodes in
|
|
// the replica set.
|
|
assert.soonNoExcept(() => {
|
|
function getConfigShardDoc() {
|
|
return csrsPrimary.getDB("config").shards.findOne({_id: "config"});
|
|
}
|
|
const configShardDoc = this.keyFile
|
|
? authutil.asCluster(csrsPrimary, this.keyFile, getConfigShardDoc)
|
|
: getConfigShardDoc();
|
|
|
|
// TODO SERVER-89498: This check is flaky and should be fixed before re-enabling
|
|
// the autobootstrap procedure. See BF-31879 for more details.
|
|
return configShardDoc.host == this.configRS.getURL();
|
|
});
|
|
}
|
|
|
|
if (jsTestOptions().keyFile) {
|
|
jsTest.authenticateNodes(this._mongos);
|
|
}
|
|
|
|
// Flushes the routing table cache on connection 'conn'. If 'keyFileLocal' is defined,
|
|
// authenticates the keyfile user.
|
|
const flushRT = function flushRoutingTableAndHandleAuth(conn, keyFileLocal) {
|
|
// Invokes the actual execution of cache refresh.
|
|
const execFlushRT = (conn) => {
|
|
assert.commandWorked(
|
|
conn.getDB("admin").runCommand(
|
|
{_flushRoutingTableCacheUpdates: "config.system.sessions"}),
|
|
);
|
|
};
|
|
|
|
const x509AuthRequired = conn.fullOptions && conn.fullOptions.clusterAuthMode &&
|
|
conn.fullOptions.clusterAuthMode === "x509";
|
|
|
|
if (keyFileLocal) {
|
|
authutil.asCluster(conn, keyFileLocal, () => execFlushRT(conn));
|
|
} else if (x509AuthRequired) {
|
|
const exitCode = _runMongoProgram(
|
|
...["mongo",
|
|
conn.host,
|
|
"--tls",
|
|
"--tlsAllowInvalidHostnames",
|
|
"--tlsCertificateKeyFile",
|
|
conn.fullOptions.tlsCertificateKeyFile
|
|
? conn.fullOptions.tlsCertificateKeyFile
|
|
: conn.fullOptions.sslPEMKeyFile,
|
|
"--tlsCAFile",
|
|
conn.fullOptions.tlsCAFile ? conn.fullOptions.tlsCAFile
|
|
: conn.fullOptions.sslCAFile,
|
|
"--authenticationDatabase=$external",
|
|
"--authenticationMechanism=MONGODB-X509",
|
|
"--eval",
|
|
`(${execFlushRT.toString()})(db.getMongo())`,
|
|
],
|
|
);
|
|
assert.eq(0, exitCode, "parallel shell for x509 auth failed");
|
|
} else {
|
|
execFlushRT(conn);
|
|
}
|
|
};
|
|
|
|
if (!otherParams.manualAddShard) {
|
|
for (let i = 0; i < numShards; i++) {
|
|
const keyFileLocal =
|
|
otherParams.shards && otherParams.shards[i] && otherParams.shards[i].keyFile
|
|
? otherParams.shards[i].keyFile
|
|
: this.keyFile;
|
|
|
|
const rs = this._rs[i].test;
|
|
flushRT(rs.getPrimary(), keyFileLocal);
|
|
}
|
|
|
|
this.waitForShardingInitialized();
|
|
}
|
|
} catch (e) {
|
|
// this was expected to fail, so clean up appropriately
|
|
if (params.shouldFailInit === true) {
|
|
this.stopOnFail();
|
|
}
|
|
throw e;
|
|
}
|
|
// This initialization was expected to fail, but it did not.
|
|
assert.neq(true,
|
|
params.shouldFailInit,
|
|
"This was expected to fail initialization, but it did not");
|
|
}
|
|
}
|
|
|
|
// Stub for a hook to check that the cluster-wide metadata is consistent.
|
|
ShardingTest.prototype.checkMetadataConsistency = function() {
|
|
jsTest.log.info("Unhooked checkMetadataConsistency function");
|
|
};
|
|
|
|
// Stub for a hook to check that collection UUIDs are consistent across shards and the config
|
|
// server.
|
|
ShardingTest.prototype.checkUUIDsConsistentAcrossCluster = function() {};
|
|
|
|
// Stub for a hook to check that indexes are consistent across shards.
|
|
ShardingTest.prototype.checkIndexesConsistentAcrossCluster = function() {};
|
|
|
|
ShardingTest.prototype.checkOrphansAreDeleted = function() {
|
|
jsTest.log.info("Unhooked function");
|
|
};
|
|
|
|
ShardingTest.prototype.checkRoutingTableConsistency = function() {
|
|
jsTest.log.info("Unhooked checkRoutingTableConsistency function");
|
|
};
|
|
|
|
ShardingTest.prototype.checkShardFilteringMetadata = function() {
|
|
jsTest.log.info("Unhooked checkShardFilteringMetadata function");
|
|
};
|
|
|
|
/**
|
|
* Constructs a human-readable string representing a chunk's range.
|
|
*/
|
|
function _rangeToString(r) {
|
|
return tojsononeline(r.min) + " -> " + tojsononeline(r.max);
|
|
}
|
|
|
|
/**
|
|
* Extends the ShardingTest class with the methods exposed by the sh utility class.
|
|
*/
|
|
function _extendWithShMethods(st) {
|
|
Object.keys(sh).forEach(function(fn) {
|
|
if (typeof sh[fn] !== "function") {
|
|
return;
|
|
}
|
|
|
|
assert.eq(
|
|
undefined,
|
|
st[fn],
|
|
"ShardingTest contains a method " + fn +
|
|
" which duplicates a method with the same name on sh. " +
|
|
"Please select a different function name.",
|
|
);
|
|
|
|
st[fn] = function() {
|
|
const oldDb = globalThis.db;
|
|
globalThis.db = st.getDB("test");
|
|
|
|
try {
|
|
return sh[fn].apply(sh, arguments);
|
|
} finally {
|
|
globalThis.db = oldDb;
|
|
}
|
|
};
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Configures the cluster based on the specified parameters (balancer state, etc).
|
|
*/
|
|
function _configureCluster(st) {
|
|
if (!st._otherParams.enableBalancer) {
|
|
st.stopBalancer();
|
|
}
|
|
}
|
|
|
|
function connectionURLTheSame(a, b) {
|
|
if (a == b)
|
|
return true;
|
|
|
|
if (!a || !b)
|
|
return false;
|
|
|
|
if (a.name)
|
|
return connectionURLTheSame(a.name, b);
|
|
if (b.name)
|
|
return connectionURLTheSame(a, b.name);
|
|
|
|
if (a.host)
|
|
return connectionURLTheSame(a.host, b);
|
|
if (b.host)
|
|
return connectionURLTheSame(a, b.host);
|
|
|
|
if (a.indexOf("/") < 0 && b.indexOf("/") < 0) {
|
|
a = a.split(":");
|
|
b = b.split(":");
|
|
|
|
if (a.length != b.length)
|
|
return false;
|
|
|
|
if (a.length == 2 && a[1] != b[1])
|
|
return false;
|
|
|
|
if (a[0] == "localhost" || a[0] == "127.0.0.1")
|
|
a[0] = getHostName();
|
|
if (b[0] == "localhost" || b[0] == "127.0.0.1")
|
|
b[0] = getHostName();
|
|
|
|
return a[0] == b[0];
|
|
} else {
|
|
let a0 = a.split("/")[0];
|
|
let b0 = b.split("/")[0];
|
|
return a0 == b0;
|
|
}
|
|
}
|
|
|
|
// These appear to be unit tests for `connectionURLTheSame`
|
|
assert(connectionURLTheSame("foo", "foo"));
|
|
assert(!connectionURLTheSame("foo", "bar"));
|
|
assert(connectionURLTheSame("foo/a,b", "foo/b,a"));
|
|
assert(!connectionURLTheSame("foo/a,b", "bar/a,b"));
|
|
|
|
/**
|
|
* Returns boolean for whether the sharding test is compatible to shutdown in parallel.
|
|
*/
|
|
function isShutdownParallelSupported(st, opts = {}) {
|
|
if (st._useBridge) {
|
|
// Keep the current behavior of shutting down each replica set shard and the
|
|
// CSRS individually when otherParams.useBridge === true. There appear to only
|
|
// be 8 instances of {useBridge: true} with ShardingTest and the implementation
|
|
// complexity is too high
|
|
return false;
|
|
}
|
|
|
|
if (st._otherParams.configOptions && st._otherParams.configOptions.clusterAuthMode === "x509") {
|
|
// The mongo shell performing X.509 authentication as a cluster member requires
|
|
// starting a parallel shell and using the server's (not the client's)
|
|
// certificate. The ReplSetTest instance constructed in a Thread wouldn't have
|
|
// copied the path to the server's certificate. We therefore fall back to
|
|
// initiating the CSRS and replica set shards sequentially when X.509
|
|
// authentication is being used.
|
|
return false;
|
|
}
|
|
|
|
if (st._otherParams.configOptions && st._otherParams.configOptions.tlsMode === "preferTLS") {
|
|
return false;
|
|
}
|
|
|
|
if (st._otherParams.configOptions && st._otherParams.configOptions.sslMode === "requireSSL") {
|
|
return false;
|
|
}
|
|
|
|
if (opts.parallelSupported !== undefined && opts.parallelSupported === false) {
|
|
// The test has chosen to opt out of parallel shutdown
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Returns the replica sets args for sets that are to be terminated in parallel threads.
|
|
*/
|
|
function replicaSetsToTerminate(st, shardRS) {
|
|
const replicaSetsToTerminate = [];
|
|
[...shardRS.map((obj) => obj.test)].forEach((rst) => {
|
|
// Generating a list of live nodes in the replica set
|
|
const liveNodes = [];
|
|
const pidValues = [];
|
|
rst.nodes.forEach(function(node) {
|
|
try {
|
|
node.getDB("admin")._helloOrLegacyHello();
|
|
liveNodes.push(node);
|
|
} catch (err) {
|
|
// Ignore since the node is not live
|
|
jsTest.log.info("ShardingTest replicaSetsToTerminate ignoring: " + node.host);
|
|
return;
|
|
}
|
|
|
|
if (!node.pid) {
|
|
// Getting the pid for the node
|
|
let serverStatus;
|
|
rst.keyFile = rst.keyFile ? rst.keyFile : st.keyFile;
|
|
if (rst.keyFile) {
|
|
serverStatus = authutil.asCluster(node, rst.keyFile, () => {
|
|
return node.getDB("admin").serverStatus();
|
|
});
|
|
} else {
|
|
serverStatus = node.getDB("admin").serverStatus();
|
|
}
|
|
|
|
if (serverStatus["pid"]) {
|
|
node.pid = serverStatus["pid"];
|
|
} else {
|
|
// Shutdown requires PID values for every node. The case we are
|
|
// unable to obtain a PID value is rare, however, should it
|
|
// occur, the code will throw this error.
|
|
throw "Could not obtain node PID value. Shutdown failed.";
|
|
}
|
|
}
|
|
pidValues.push(node.pid.valueOf());
|
|
});
|
|
|
|
if (pidValues.length > 0) {
|
|
// the number of livenodes must match the number of pidvalues being passed in
|
|
// rst.Args to ensure the replica set is constructed correctly
|
|
assert(liveNodes.length == pidValues.length);
|
|
|
|
const hostName =
|
|
st._otherParams.host === undefined ? getHostName() : st._otherParams.host;
|
|
replicaSetsToTerminate.push({
|
|
// Arguments for each replica set within parallel threads.
|
|
rstArgs: {
|
|
name: rst.name,
|
|
nodeHosts: liveNodes.map((node) => `${node.name}`),
|
|
nodeOptions: rst.nodeOptions,
|
|
// Mixed-mode SSL tests may specify a keyFile per replica set rather
|
|
// than one for the whole cluster.
|
|
keyFile: rst.keyFile ? rst.keyFile : st.keyFile,
|
|
host: st._otherParams.useHostname ? hostName : "localhost",
|
|
waitForKeys: false,
|
|
pidValue: pidValues,
|
|
},
|
|
});
|
|
}
|
|
});
|
|
return replicaSetsToTerminate;
|
|
}
|
|
|
|
/**
|
|
* Returns if there is a new feature compatibility version for the "latest" version. This must
|
|
* be manually changed if and when there is a new feature compatibility version.
|
|
*/
|
|
function _hasNewFeatureCompatibilityVersion() {
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Returns the total number of mongod nodes across all shards, excluding config server nodes.
|
|
* Used only for diagnostic logging.
|
|
*/
|
|
function totalNumShardNodes(st) {
|
|
const numNodesPerReplSet = st._rs.map((r) => r.test.nodes.length);
|
|
return numNodesPerReplSet.reduce((a, b) => a + b, 0);
|
|
}
|
|
|
|
function clusterHasBinVersion(st, version) {
|
|
const binVersion = MongoRunner.getBinVersionFor(version);
|
|
const hasBinVersionInParams = (params) => {
|
|
return (params && params.binVersion &&
|
|
MongoRunner.areBinVersionsTheSame(binVersion,
|
|
MongoRunner.getBinVersionFor(params.binVersion)));
|
|
};
|
|
|
|
// Must check mongosBinVersion because it does not update mongosOptions.binVersion.
|
|
const isMixedVersionMongos = jsTestOptions().mongosBinVersion &&
|
|
MongoRunner.areBinVersionsTheSame(binVersion, jsTestOptions().mongosBinVersion);
|
|
|
|
if (isMixedVersionMongos) {
|
|
return true;
|
|
}
|
|
|
|
// Check for config servers.
|
|
if (hasBinVersionInParams(st._otherParams.configOptions)) {
|
|
return true;
|
|
}
|
|
|
|
const numConfigs = st._numConfigs;
|
|
for (let i = 0; i < numConfigs; ++i) {
|
|
if (hasBinVersionInParams(st._otherParams["c" + i])) {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
if (hasBinVersionInParams(st._otherParams.rs)) {
|
|
return true;
|
|
}
|
|
|
|
const numShards = st._numShards;
|
|
for (let i = 0; i < numShards; ++i) {
|
|
if (hasBinVersionInParams(st._otherParams["d" + i])) {
|
|
return true;
|
|
}
|
|
if (hasBinVersionInParams(st._otherParams["rs" + i])) {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
// Check for mongos servers.
|
|
if (hasBinVersionInParams(st._otherParams.mongosOptions)) {
|
|
return true;
|
|
}
|
|
|
|
const numMongos = st._numMongos;
|
|
for (let i = 0; i < numMongos; ++i) {
|
|
if (hasBinVersionInParams(st._otherParams["s" + i])) {
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
function defineReadOnlyProperty(st, name, value) {
|
|
Object.defineProperty(
|
|
st, name, {value: value, writable: false, enumerable: true, configurable: false});
|
|
}
|