import {getTimeseriesCollForDDLOps} from "jstests/core/timeseries/libs/viewless_timeseries_util.js"; import {FeatureFlagUtil} from "jstests/libs/feature_flag_util.js"; import {Thread} from "jstests/libs/parallelTester.js"; import {ReplSetTest} from "jstests/libs/replsettest.js"; // Symbol used to override the constructor. Please do not use this, it's only meant to aid // in migrating the jstest corpus to proper module usage. export const kOverrideConstructor = Symbol("overrideConstructor"); // Timeout to be used for operations scheduled by the sharding test, which must wait for write // concern (5 minutes) const kDefaultWTimeoutMs = 5 * 60 * 1000; // Oplog collection name const kOplogName = "oplog.rs"; export class ShardingTest { // ShardingTest API getDB(name) { return this.s.getDB(name); } /** * Finds the _id of the primary shard for database 'dbname', e.g., 'test-rs0' */ getPrimaryShardIdForDatabase(dbname) { let x = this.config.databases.findOne({_id: "" + dbname}); if (x) { return x.primary; } let countDBsFound = 0; this.config.databases.find().forEach(function(db) { countDBsFound++; jsTest.log.info({db}); }); throw Error("couldn't find dbname: " + dbname + " in config.databases. Total DBs: " + countDBsFound); } getNonPrimaries(dbname) { let x = this.config.databases.findOne({_id: dbname}); if (!x) { this.config.databases.find().forEach(jsTest.log.info); throw Error("couldn't find dbname: " + dbname + " total: " + this.config.databases.count()); } return this.config.shards.find({_id: {$ne: x.primary}}).map((z) => z._id); } printNodes() { jsTest.log.info("ShardingTest " + this._testName, { config: this._configDB, shards: this._connections, mongos: this._mongos, }); } getConnNames() { let names = []; for (let i = 0; i < this._connections.length; i++) { names.push(this._connections[i].name); } return names; } /** * Find the connection to the primary shard for database 'dbname'. */ getPrimaryShard(dbname) { let dbPrimaryShardId = this.getPrimaryShardIdForDatabase(dbname); let primaryShard = this.config.shards.findOne({_id: dbPrimaryShardId}); if (primaryShard) { const shardConnectionString = primaryShard.host; let rsName = shardConnectionString.substring(0, shardConnectionString.indexOf("/")); for (let i = 0; i < this._connections.length; i++) { let c = this._connections[i]; if (connectionURLTheSame(shardConnectionString, c.name) || connectionURLTheSame(rsName, c.name)) return c; } } throw Error("can't find server connection for db '" + dbname + "'s primary shard: " + tojson(primaryShard)); } // TODO SERVER-95358 remove once 9.0 becomes last LTS. getMergeType(db) { if (FeatureFlagUtil.isPresentAndEnabled(db, "AggMongosToRouter")) { return "router"; } return "mongos"; } normalize(x) { let z = this.config.shards.findOne({host: x}); if (z) return z._id; return x; } /** * Find a different shard connection than the one given. 
*/ getOther(one) { if (this._connections.length < 2) { throw Error("getOther only works with 2 shards"); } if (one._mongo) { one = one._mongo; } for (let i = 0; i < this._connections.length; i++) { if (this._connections[i] != one) { return this._connections[i]; } } return null; } getAnother(one) { if (this._connections.length < 2) { throw Error("getAnother() only works with multiple servers"); } if (one._mongo) { one = one._mongo; } for (let i = 0; i < this._connections.length; i++) { if (this._connections[i] == one) return this._connections[(i + 1) % this._connections.length]; } } stopAllConfigServers(opts, forRestart = undefined) { this.configRS.stopSet(undefined, forRestart, opts); } stopAllShards(opts = {}, forRestart = undefined) { if (isShutdownParallelSupported(this, opts)) { const threads = []; try { for (let {rstArgs} of replicaSetsToTerminate(this, this._rs)) { const thread = new Thread( async (rstArgs, signal, forRestart, opts) => { const {ReplSetTest} = await import("jstests/libs/replsettest.js"); try { const rst = new ReplSetTest({rstArgs}); rst.stopSet(signal, forRestart, opts); return {ok: 1}; } catch (e) { return { ok: 0, hosts: rstArgs.nodeHosts, name: rstArgs.name, error: e.toString(), stack: e.stack, }; } }, rstArgs, 15, forRestart, opts, ); thread.start(); threads.push(thread); } } finally { // Wait for each thread to finish. Throw an error if any thread fails. const returnData = threads.map((thread) => { thread.join(); return thread.returnData(); }); returnData.forEach((res) => { assert.commandWorked(res, "terminating shard or config server replica sets failed"); }); } } else { // Shut down the replica sets serially. this._rs.forEach((rs) => { rs.test.stopSet(15, forRestart, opts); }); } } stopAllMongos(opts) { for (let i = 0; i < this._mongos.length; i++) { this.stopMongos(i, opts); } } awaitMigrations() { this.stopBalancer(); const numShards = this._rs.length; for (let i = 0; i < numShards; i++) { assert.commandWorked(this["shard" + i].adminCommand({"_shardsvrJoinMigrations": 1})); } } isReplicaSetEndpointActive() { const numShards = this._rs.length; return numShards == 1 && this._rs[0].test.isReplicaSetEndpointActive(); } stop(opts = {}) { this.checkMetadataConsistency(); this.checkUUIDsConsistentAcrossCluster(); this.checkIndexesConsistentAcrossCluster(); this.checkOrphansAreDeleted(); this.checkRoutingTableConsistency(); this.checkShardFilteringMetadata(); if (jsTestOptions().alwaysUseLogFiles) { if (opts.noCleanData === false) { throw new Error("Always using log files, but received conflicting option."); } opts.noCleanData = true; } this.stopAllMongos(opts); if (jsTestOptions().runningWithConfigStepdowns && this.isConfigShardMode) { // In case of a cluster with a config shard, the config server replica set is stopped // via stopAllShards, which doesn't stop the continuous stepdown thread. this.configRS.stopContinuousFailover(); } let startTime = new Date(); // Measure the execution time of shutting down shards.
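// Note: in config shard mode the CSRS is embedded in shard0, so the stopAllShards() call below also stops the config server nodes; the explicit stopAllConfigServers() call further down is skipped in that case.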
this.stopAllShards(opts); jsTest.log.info( "ShardingTest stopped all shards, took " + (new Date() - startTime) + "ms for " + this._connections.length + " shards.", ); if (!this.isConfigShardMode) { this.stopAllConfigServers(opts); } let timeMillis = new Date().getTime() - this._startTime.getTime(); jsTest.log.info( "*** ShardingTest " + this._testName + " completed successfully in " + timeMillis / 1000 + " seconds ***", ); } stopOnFail() { try { this.stopAllMongos(); } catch (e) { jsTest.log.info("Did not successfully stop all mongos."); } try { this.stopAllShards(); } catch (e) { jsTest.log.info("Did not successfully stop all shards."); } try { this.stopAllConfigServers(); } catch (e) { jsTest.log.info("Did not successfully stop all config servers."); } } adminCommand(cmd) { let res = this.admin.runCommand(cmd); if (res && res.ok == 1) return true; throw _getErrorWithCode(res, "command " + tojson(cmd) + " failed: " + tojson(res)); } restartAllConfigServers(opts) { this.configRS.startSet(opts, true); // We wait until a primary has been chosen since startSet can return without having elected // one. This can cause issues for tests that expect a functioning replica set once this method returns. this.configRS.waitForPrimary(); // We also wait for all secondaries to catch up with the primary, to ensure nodes complete any // rollback that may have been triggered after becoming secondary. Rollback causes nodes to // close connections, which can interfere with subsequent operations. this.configRS.awaitReplication(); } restartAllShards(opts) { this._rs.forEach((rs) => { rs.test.startSet(opts, true); // We wait until a primary has been chosen since startSet can return without having // elected one. This can cause issues for tests that expect a functioning replica set once this // method returns. rs.test.waitForPrimary(); // We also wait for all secondaries to catch up with the primary, to ensure nodes complete any // rollback that may have been triggered after becoming secondary. Rollback causes nodes to // close connections, which can interfere with subsequent operations.
rs.test.awaitReplication(); }); } forEachConnection(fn) { this._connections.forEach(function(conn) { fn(conn); }); } forEachMongos(fn) { this._mongos.forEach(function(conn) { fn(conn); }); } forEachConfigServer(fn) { this.configRS.nodes.forEach(function(conn) { fn(conn); }); } printChangeLog() { this.config.changelog.find().forEach(function(z) { let msg = z.server + "\t" + z.time + "\t" + z.what; for (let i = z.what.length; i < 15; i++) msg += " "; msg += " " + z.ns + "\t"; if (z.what == "split") { msg += _rangeToString(z.details.before) + " -->> (" + _rangeToString(z.details.left) + "), (" + _rangeToString(z.details.right) + ")"; } else if (z.what == "multi-split") { msg += _rangeToString(z.details.before) + " -->> (" + z.details.number + "/" + z.details.of + " " + _rangeToString(z.details.chunk) + ")"; } else { msg += tojsononeline(z.details); } jsTest.log.info("ShardingTest " + msg); }); } getChunksString(ns) { if (ns) { let query = {}; let sorting_criteria = {}; const collection = this.config.collections.findOne({_id: ns}); if (!collection) { return ""; } if (collection.timestamp) { const collectionUUID = collection.uuid; assert.neq(collectionUUID, null); query.uuid = collectionUUID; sorting_criteria = {uuid: 1, min: 1}; } else { query.ns = ns; sorting_criteria = {ns: 1, min: 1}; } let s = ""; this.config.chunks.find(query).sort(sorting_criteria).forEach(function(z) { s += " \t" + z._id + "\t" + z.lastmod.t + "|" + z.lastmod.i + "\t" + tojson(z.min) + " -> " + tojson(z.max) + " " + z.shard + " " + ns + "\n"; }); return s; } else { // Call getChunksString for every namespace in config.collections. let collections_cursor = this.config.collections.find(); let s = ""; while (collections_cursor.hasNext()) { var ns = collections_cursor.next()._id; s += this.getChunksString(ns); } return s; } } printChunks(ns) { jsTest.log.info("ShardingTest " + this.getChunksString(ns)); } printShardingStatus(verbose) { printShardingStatus(this.config, verbose); } printCollectionInfo(ns, msg) { let out = ""; if (msg) { out += msg + "\n"; } out += "sharding collection info: " + ns + "\n"; for (var i = 0; i < this._connections.length; i++) { var c = this._connections[i]; out += " mongod " + c + " " + tojson(c.getCollection(ns).getShardVersion(), " ", true) + "\n"; } for (var i = 0; i < this._mongos.length; i++) { var c = this._mongos[i]; out += " mongos " + c + " " + tojson(c.getCollection(ns).getShardVersion(), " ", true) + "\n"; } out += this.getChunksString(ns); jsTest.log.info("ShardingTest " + out); } /** * Returns the number of shards which contain the given dbName.collName collection */ onNumShards(dbName, collName) { return this.shardCounts(dbName, collName) .reduce( (total, currentValue) => total + (currentValue > 0 ? 1 : 0), 0, ); } /** * Returns an array of size numShards where each element is the number of documents on * that particular shard */ shardCounts(dbName, collName) { return this._connections.map((connection) => connection.getDB(dbName).getCollection(collName).count()); } chunkCounts(collName, dbName) { dbName = dbName || "test"; let sDB = this.s.getDB(dbName); let sColl = sDB.getCollection(collName); if (sColl.getMetadata()?.type === "timeseries") { collName = getTimeseriesCollForDDLOps(sDB, sColl).getName(); } let x = {}; this.config.shards.find().forEach(function(z) { x[z._id] = 0; }); let coll = this.config.collections.findOne({_id: dbName + "." + collName}); let chunksQuery = (function() { if (coll.timestamp != null) { return {uuid: coll.uuid}; } else { return {ns: dbName + "."
+ collName}; } })(); this.config.chunks.find(chunksQuery).forEach(function(z) { if (x[z.shard]) x[z.shard]++; else x[z.shard] = 1; }); return x; } chunkDiff(collName, dbName) { let c = this.chunkCounts(collName, dbName); let min = Number.MAX_VALUE; let max = 0; for (let s in c) { if (c[s] < min) min = c[s]; if (c[s] > max) max = c[s]; } jsTest.log.info("ShardingTest input", {chunkCounts: c, min, max}); return max - min; } /** * Waits up to the specified timeout (with a default of 60s) for the collection to be * considered well balanced. **/ awaitBalance(collName, dbName, timeToWait, interval) { const coll = this.s.getCollection(dbName + "." + collName); this.awaitCollectionBalance(coll, timeToWait, interval); } getShard(coll, query, includeEmpty) { let shards = this.getShardsForQuery(coll, query, includeEmpty); assert.eq(shards.length, 1); return shards[0]; } /** * Returns the shards on which documents matching a particular query reside. */ getShardsForQuery(coll, query, includeEmpty) { if (!coll.getDB) { coll = this.s.getCollection(coll); } let explain = coll.find(query).explain("executionStats"); let shards = []; let execStages = explain.executionStats.executionStages; let plannerShards = explain.queryPlanner.winningPlan.shards; if (execStages.shards) { for (var i = 0; i < execStages.shards.length; i++) { let hasResults = execStages.shards[i].executionStages.nReturned && execStages.shards[i].executionStages.nReturned > 0; if (includeEmpty || hasResults) { shards.push(plannerShards[i].connectionString); } } } for (var i = 0; i < shards.length; i++) { for (let j = 0; j < this._connections.length; j++) { if (connectionURLTheSame(this._connections[j], shards[i])) { shards[i] = this._connections[j]; break; } } } return shards; } shardColl(collName, key, split, move, dbName, waitForDelete) { split = split != false ? split || key : split; move = split != false && move != false ? move || split : false; if (collName.getDB) dbName = "" + collName.getDB(); else dbName = dbName || "test"; let c = dbName + "." + collName; if (collName.getDB) { c = "" + collName; } assert.commandWorked(this.s.adminCommand({enableSharding: dbName})); let result = assert.commandWorked(this.s.adminCommand({shardcollection: c, key: key})); if (split == false) { return; } result = assert.commandWorked(this.s.adminCommand({split: c, middle: split})); if (move == false) { return; } for (let i = 0; i < 5; i++) { let otherShard = this.getOther(this.getPrimaryShard(dbName)).name; let cmd = {movechunk: c, find: move, to: otherShard}; if (waitForDelete != null) { cmd._waitForDelete = waitForDelete; } result = this.s.adminCommand(cmd); if (result.ok) break; sleep(5 * 1000); } assert.commandWorked(result); } /** * Wait for sharding to be initialized. */ waitForShardingInitialized(timeoutMs = 60 * 1000) { const getShardVersion = (client, timeout) => { assert.soon( () => { // The choice of namespace (local.fooCollection) does not affect the output.
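// getShardVersion only returns ok:1 once the mongod's sharding state has been initialized, so polling it doubles as a readiness check.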
let res = client.adminCommand({getShardVersion: "local.fooCollection"}); return res.ok == 1; }, "timeout waiting for sharding to be initialized on mongod", timeout, 0.1, ); }; let start = new Date(); for (let i = 0; i < this._rs.length; ++i) { let replSet = this._rs[i]; if (!replSet) continue; const nodes = replSet.test.nodes; const keyFileUsed = replSet.test.keyFile; for (let j = 0; j < nodes.length; ++j) { const diff = new Date().getTime() - start.getTime(); var currNode = nodes[j]; // Skip arbiters if (currNode.getDB("admin")._helloOrLegacyHello().arbiterOnly) { continue; } const tlsOptions = ["preferTLS", "requireTLS"]; const sslOptions = ["preferSSL", "requireSSL"]; const TLSEnabled = currNode.fullOptions && (tlsOptions.includes(currNode.fullOptions.tlsMode) || sslOptions.includes(currNode.fullOptions.sslMode)); const x509AuthRequired = this.s.fullOptions && this.s.fullOptions.clusterAuthMode && this.s.fullOptions.clusterAuthMode === "x509"; if (keyFileUsed) { authutil.asCluster(currNode, keyFileUsed, () => { getShardVersion(currNode, timeoutMs - diff); }); } else if (x509AuthRequired && TLSEnabled) { const exitCode = _runMongoProgram( ...["mongo", currNode.host, "--tls", "--tlsAllowInvalidHostnames", "--tlsCertificateKeyFile", currNode.fullOptions.tlsCertificateKeyFile ? currNode.fullOptions.tlsCertificateKeyFile : currNode.fullOptions.sslPEMKeyFile, "--tlsCAFile", currNode.fullOptions.tlsCAFile ? currNode.fullOptions.tlsCAFile : currNode.fullOptions.sslCAFile, "--authenticationDatabase=$external", "--authenticationMechanism=MONGODB-X509", "--eval", `(${getShardVersion.toString()})(db.getMongo(), ` + (timeoutMs - diff).toString() + `)`, ], ); assert.eq(0, exitCode, "parallel shell for x509 auth failed"); } else { getShardVersion(currNode, timeoutMs - diff); } } } } /** * Kills the mongos with index n. * * @param {boolean} [extraOptions.waitpid=true] if true, we will wait for the process to * terminate after stopping it. */ stopMongos(n, opts, {waitpid: waitpid = true} = {}) { if (this._useBridge) { MongoRunner.stopMongos(this._unbridgedMongos[n], undefined, opts, waitpid); this["s" + n].stop(); } else { let mongos = this["s" + n]; MongoRunner.stopMongos(mongos, undefined, opts, waitpid); } } /** * Kills the config server mongod with index n. */ stopConfigServer(n, opts) { this.configRS.stop(n, undefined, opts); } /** * Stops and restarts a mongos process. * * If 'opts' is not specified, starts the mongos with its previous parameters. If 'opts' is * specified and 'opts.restart' is false or missing, starts mongos with the parameters specified * in 'opts'. If opts is specified and 'opts.restart' is true, merges the previous options * with the options specified in 'opts', with the options in 'opts' taking precedence. * * 'stopOpts' are the options passed to the mongos when it is stopping. * * Warning: Overwrites the old s (if n = 0), admin, config, and sn member variables. */ restartMongos(n, opts, stopOpts) { let mongos; if (this._useBridge) { mongos = this._unbridgedMongos[n]; } else { mongos = this["s" + n]; } // Make a copy of the start options to prevent changing the original start options let startOpts = Object.extend({}, opts || mongos, true); startOpts.port = startOpts.port || mongos.port; this.stopMongos(n, stopOpts); if (this._useBridge) { const hostName = this._otherParams.host === undefined ? getHostName() : this._otherParams.host; let bridgeOptions = startOpts !== mongos ?
startOpts.bridgeOptions : mongos.fullOptions.bridgeOptions; bridgeOptions = Object.merge(this._otherParams.bridgeOptions, bridgeOptions || {}); bridgeOptions = Object.merge(bridgeOptions, { hostName: this._otherParams.useHostname ? hostName : "localhost", port: this._mongos[n].port, // The mongos processes identify themselves to mongobridge as host:port, where the // host is the actual hostname of the machine and not localhost. dest: hostName + ":" + startOpts.port, }); this._mongos[n] = new MongoBridge(bridgeOptions); } if (startOpts.restart) { startOpts = Object.merge(mongos.fullOptions, startOpts); } let newConn = MongoRunner.runMongos(startOpts); if (!newConn) { throw new Error("Failed to restart mongos " + n); } if (this._useBridge) { this._mongos[n].connectToBridge(); this._unbridgedMongos[n] = newConn; } else { this._mongos[n] = newConn; } this["s" + n] = this._mongos[n]; if (n == 0) { this.s = this._mongos[n]; this.admin = this._mongos[n].getDB("admin"); this.config = this._mongos[n].getDB("config"); } } /** * Shuts down and restarts replica set for a given shard and * updates shard connection information. * * @param {string} shardName * @param {object} replSet The replica set object. Defined in replsettest.js */ shutdownAndRestartPrimaryOnShard(shardName, replSet) { const n = this._shardReplSetToIndex[replSet.name]; const originalPrimaryConn = replSet.getPrimary(); const SIGTERM = 15; replSet.restart(originalPrimaryConn, {}, SIGTERM); replSet.awaitNodesAgreeOnPrimary(); replSet.awaitSecondaryNodes(); this._connections[n] = new Mongo(replSet.getURL()); this._connections[n].shardName = shardName; this._connections[n].rs = replSet; this["shard" + n] = this._connections[n]; } /** * Kills and restarts replica set for a given shard and * updates shard connection information. * * @param {string} shardName * @param {object} replSet The replica set object. Defined in replsettest.js */ killAndRestartPrimaryOnShard(shardName, replSet) { const n = this._shardReplSetToIndex[replSet.name]; const originalPrimaryConn = replSet.getPrimary(); const SIGKILL = 9; const opts = {allowedExitCode: MongoRunner.EXIT_SIGKILL}; replSet.restart(originalPrimaryConn, opts, SIGKILL); replSet.awaitNodesAgreeOnPrimary(); this._connections[n] = new Mongo(replSet.getURL()); this._connections[n].shardName = shardName; this._connections[n].rs = replSet; this["shard" + n] = this._connections[n]; } /** * Restarts each node in a particular shard replica set using the shard's original startup * options by default. * * Option { startClean : true } forces clearing the data directory. * Option { auth : Object } object that contains the auth details for admin credentials. * Should contain the fields 'user' and 'pwd' * * * @param {int} n Shard server number (0, 1, 2, ...) to be restarted */ restartShardRS(n, options, signal, wait) { const prevShardName = this._connections[n].shardName; for (let i = 0; i < this["rs" + n].nodeList().length; i++) { this["rs" + n].restart(i); } this["rs" + n].awaitSecondaryNodes(); this._connections[n] = new Mongo(this["rs" + n].getURL(), undefined, {gRPC: false}); this._connections[n].shardName = prevShardName; this._connections[n].rs = this["rs" + n]; this["shard" + n] = this._connections[n]; } /** * Stops and restarts a config server mongod process. * * If opts is specified, the new mongod is started using those options. Otherwise, it is * started * with its previous parameters. * * Warning: Overwrites the old cn/confign member variables.
*/ restartConfigServer(n, options, signal, wait) { this.configRS.restart(n, options, signal, wait); this["config" + n] = this.configRS.nodes[n]; this["c" + n] = this.configRS.nodes[n]; } /** * Returns a document {isMixedVersion: <bool>, oldestBinVersion: <string>}. * The 'isMixedVersion' field is true if any settings to ShardingTest or jsTestOptions indicate * this is a multiversion cluster. * The 'oldestBinVersion' field is set to the oldest binary version used in this cluster, one of * 'latest', 'last-continuous' and 'last-lts'. * Note: Mixed version clusters with binary versions older than 'last-lts' are not supported. If * such a binary exists in the cluster, this function assumes this is not a mixed version cluster * and returns 'oldestBinVersion' as 'latest'. * * Checks for bin versions via: * jsTestOptions().mongosBinVersion, * otherParams.configOptions.binVersion, * otherParams.mongosOptions.binVersion */ getClusterVersionInfo() { let hasLastLTS = clusterHasBinVersion(this, "last-lts"); let hasLastContinuous = clusterHasBinVersion(this, "last-continuous"); if (lastLTSFCV !== lastContinuousFCV && hasLastLTS && hasLastContinuous) { throw new Error("Can only specify one of 'last-lts' and 'last-continuous' " + "in binVersion, not both."); } if (hasLastLTS) { return {isMixedVersion: true, oldestBinVersion: "last-lts"}; } else if (hasLastContinuous) { return {isMixedVersion: true, oldestBinVersion: "last-continuous"}; } else { return {isMixedVersion: false, oldestBinVersion: "latest"}; } } /** * Runs a find on the namespace to force a refresh of the node's catalog cache. */ refreshCatalogCacheForNs(node, ns) { node.getCollection(ns).findOne(); } /** * Waits for all operations to fully replicate on all shards. */ awaitReplicationOnShards() { this._rs.forEach((replSet) => replSet.test.awaitReplication()); } /** * Query the oplog from a given node. */ findOplog(conn, query, limit) { return conn.getDB("local") .getCollection(kOplogName) .find(query) .sort({$natural: -1}) .limit(limit); } /** * Returns all nodes in the cluster including shards, config servers and mongoses. */ getAllNodes() { let nodes = []; // Array.prototype.concat returns a new array, so assign the result back; passing the arrays // as separate arguments flattens them one level. The config server replica set is // represented by its connection string (this._configDB). nodes = nodes.concat(this._configDB, this._connections, this._mongos); return [...new Set(nodes)]; } /** * Returns all shards in the cluster. */ getAllShards() { return this._rs.map((obj) => obj.test); } /** * TODO (SERVER-112863) Remove this once the maintenance port is supported on lastLTS. * * Returns true if: * 1. A maintenance port is specified in any of the replica set configurations in the cluster * 2. LastLTS FCV is less than the FCV on which featureFlagReplicationUsageOfMaintenancePort is * enabled. * Note that we compare the FCVs directly rather than checking the feature flag on the replica * sets because the FCV isn't known until replSetInitiate.
*/ _shouldSkipMaintenancePortDuringInitialization() { const mpFeatureFlag = "ReplicationUsageOfMaintenancePort"; let skipInitiatingWithMaintenancePort = false; const containsMaintenancePort = this._rs.some(rs => { return bsonUnorderedFieldsCompare(rs.test.getReplSetConfig(true), rs.test.getReplSetConfig(false)) != 0; }); if (containsMaintenancePort) { let maintenancePortEnabledFCV; let node = this.configRS.nodes[0]; if (this.configRS.keyFile) { authutil.asCluster(node, this.configRS.keyFile, function() { maintenancePortEnabledFCV = FeatureFlagUtil.getFeatureFlagDoc(node, mpFeatureFlag).version; }); } else { maintenancePortEnabledFCV = FeatureFlagUtil.getFeatureFlagDoc(node, mpFeatureFlag).version; } skipInitiatingWithMaintenancePort = MongoRunner.compareBinVersions(lastLTSFCV, maintenancePortEnabledFCV) == -1; } return skipInitiatingWithMaintenancePort; } /** * @typedef {Object} ShardingTestOtherParams * @property {Object} [rs] Same as the `rs` parameter to the ShardingTest constructor. Can be * used to specify options that are common to all shards. * @property {number} [chunkSize] Same as the chunkSize parameter to the ShardingTest constructor * @property {string} [keyFile] The location of the keyFile * @property {Object} [configOptions] Same as the `config` parameter to the ShardingTest * constructor. Can be used to specify options that are common to all config servers. * @property {Object} [mongosOptions] Same as the `mongos` parameter to the ShardingTest * constructor. Can be used to specify options that are common to all mongos. * @property {boolean} [enableBalancer] If true, enable the balancer. * @property {boolean} [manualAddShard] Shards will not be added if true. * @property {number} [migrationLockAcquisitionMaxWaitMS] Maximum number of milliseconds to wait * when acquiring the migration lock. * @property {boolean} [useBridge=false] If true, then a mongobridge process is started for each * node in the sharded cluster. * @property {boolean} [causallyConsistent=false] Specifies whether the connections to the * replica set nodes should be created with the 'causal consistency' flag enabled, which means * they will gossip the cluster time and add readConcern afterClusterTime where applicable. * @property {Object} [bridgeOptions={}] Options to apply to all mongobridge processes. * @property {Object} [rsOptions] Same as the `rs` parameter to the ShardingTest constructor. Can * be used to specify options that are common to all replica set members. * @property {boolean} [useMaintenancePorts=false] If true, then a maintenance port will be * specified for each node in the cluster. * * // Replica set only: * @property {boolean} [useHostname] if true, use hostname of machine, otherwise use localhost * @property {number} [numReplicas] * @property {boolean} [configShard] Add the config server as a shard if true. * @property {boolean} [initiateWithDefaultElectionTimeout] Set the electionTimeoutMillis to its * default value when initiating all replica sets for both config and non-config shards. If not * set, 'ReplSetTest.initiate' defaults to a very high election timeout value (24 hours). * @property {boolean} [allNodesAuthorizedToRunRSGetStatus] Informs `ReplSetTest.initiate` * whether all nodes in the replica set are authorized to run `replSetGetStatus`. * @property {boolean} [useAutoBootstrapProcedure] Use the auto-bootstrapping procedure on every * shard and config server if set to true. * @property {boolean} [alwaysUseTestNameForShardName] Always use the test name as the name of * the shard.
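 * * A minimal usage sketch of the constructor documented below (hypothetical test code, not part of this module): * const st = new ShardingTest({shards: 2, mongos: 1}); * st.s.getDB("test").foo.insertOne({x: 1}); * st.stop();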
*/ /** * @typedef {Object} ReplSetConfig * @property {number} [nodes=3] Number of replica members. * @property {number} [protocolVersion] Protocol version of the replica set, used during replica * set initiation. For other options, @see ReplSetTest#initiate */ /** * Starts up a sharded cluster with the given specifications. The cluster will be fully * operational after the execution of this constructor function. * * In addition to its own methods, ShardingTest inherits all the functions from the 'sh' utility * with the db set as the first mongos instance in the test (i.e. s0). * * @param {Object} params Contains the key-value pairs for the cluster configuration. * @param {string} [params.name] Name for this test * @param {boolean} [params.shouldFailInit] If set, assert that this will fail initialization * @param {number} [params.verbose] The verbosity for the mongos * @param {number} [params.chunkSize] The chunk size to use as configuration for the cluster * @param {number|Object|Array.<Object>} [params.mongos] Number of mongos or mongos * configuration object(s)(*). see MongoRunner.runMongos * @param {ReplSetConfig|Array.<ReplSetConfig>} [params.rs] Replica set configuration object. * @param {number|Object|Array.<Object>} [params.shards] Number of shards or shard configuration * object(s)(*). see MongoRunner.runMongod * @param {number|Object|Array.<Object>} [params.config] Number of config servers or config * server configuration object(s)(*). see MongoRunner.runMongod * (*) There are two ways to specify multiple configuration objects: * (1) Using the object format. Example: * { d0: { verbose: 5 }, d1: { auth: '' }, rs2: { oplogsize: 10 }} * In this format, d0 = shard0, s = mongos, and c = config servers * * (2) Using the array format. Example: * [{ verbose: 5 }, { auth: '' }] * * Note: you can only have single server shards for array format. * Note: A special "bridgeOptions" property can be specified in both the object and array * formats to configure the options for the mongobridge corresponding to that node. * These options are merged with the params.bridgeOptions options, where the node-specific * options take precedence. * @param {ShardingTestOtherParams} [params.other] Other parameters * * Member variables: * s {Mongo} - connection to the first mongos * s0, s1, ... {Mongo} - connection to different mongos * rs0, rs1, ... {ReplSetTest} - test objects to replica sets * shard0, shard1, ... {Mongo} - connection to shards * config0, config1, ... {Mongo} - connection to config servers * c0, c1, ... {Mongo} - same as config0, config1, ... * configRS - the config ReplSetTest object */ constructor(params) { if (!(this instanceof ShardingTest)) { return new ShardingTest(params); } if (this.constructor === ShardingTest && this.constructor[kOverrideConstructor]) { return new this.constructor[kOverrideConstructor](params); } // Ensure we don't mutate the passed-in parameters. params = Object.extend({}, params, true); // Used for counting the test duration this._startTime = new Date(); assert(isObject(params), "ShardingTest configuration must be a JSON object"); var testName = params.name || jsTest.name(); let otherParams = Object.deepMerge(params, params.other || {}); let numShards = otherParams.hasOwnProperty("shards") ? otherParams.shards : 2; let mongosVerboseLevel = otherParams.hasOwnProperty("verbose") ? otherParams.verbose : 1; let numMongos = otherParams.hasOwnProperty("mongos") ?
otherParams.mongos : 1; const usedDefaultNumConfigs = !otherParams.hasOwnProperty("config") || otherParams.config === undefined; let numConfigs = otherParams.hasOwnProperty("config") ? otherParams.config : 3; let useAutoBootstrapProcedure = otherParams.hasOwnProperty("useAutoBootstrapProcedure") ? otherParams.useAutoBootstrapProcedure : false; useAutoBootstrapProcedure = useAutoBootstrapProcedure || jsTestOptions().useAutoBootstrapProcedure; let alwaysUseTestNameForShardName = otherParams.hasOwnProperty("alwaysUseTestNameForShardName") ? otherParams.alwaysUseTestNameForShardName : false; let isConfigShardMode = otherParams.hasOwnProperty("configShard") ? otherParams.configShard : false; isConfigShardMode = isConfigShardMode || jsTestOptions().configShard || useAutoBootstrapProcedure; Object.defineProperty(this, "isConfigShardMode", { value: isConfigShardMode, writable: false, enumerable: true, configurable: false, }); if ("shardAsReplicaSet" in otherParams) { throw new Error("Use of deprecated option 'shardAsReplicaSet'"); } // Default enableBalancer to false. otherParams.enableBalancer = "enableBalancer" in otherParams && otherParams.enableBalancer === true; // Allow specifying mixed-type options like this: // { mongos : [ { bind_ip : "localhost" } ], // shards : { rs : true, d : true } } if (Array.isArray(numShards)) { for (var i = 0; i < numShards.length; i++) { otherParams["d" + i] = numShards[i]; } numShards = numShards.length; } else if (isObject(numShards)) { let tempCount = 0; for (var i in numShards) { otherParams[i] = numShards[i]; tempCount++; } numShards = tempCount; } defineReadOnlyProperty(this, "_numShards", numShards); if (isConfigShardMode) { assert(numShards > 0, "Config shard mode requires at least one shard"); } if (Array.isArray(numMongos)) { for (var i = 0; i < numMongos.length; i++) { otherParams["s" + i] = numMongos[i]; } numMongos = numMongos.length; } else if (isObject(numMongos)) { let tempCount = 0; for (var i in numMongos) { otherParams[i] = numMongos[i]; tempCount++; } numMongos = tempCount; } defineReadOnlyProperty(this, "_numMongos", numMongos); if (Array.isArray(numConfigs)) { assert(!usedDefaultNumConfigs); for (let i = 0; i < numConfigs.length; i++) { otherParams["c" + i] = numConfigs[i]; } numConfigs = numConfigs.length; } else if (isObject(numConfigs)) { assert(!usedDefaultNumConfigs); let tempCount = 0; for (var i in numConfigs) { otherParams[i] = numConfigs[i]; tempCount++; } numConfigs = tempCount; } defineReadOnlyProperty(this, "_numConfigs", numConfigs); otherParams.useHostname = otherParams.useHostname == undefined ? true : otherParams.useHostname; otherParams.useBridge = otherParams.useBridge || false; otherParams.bridgeOptions = otherParams.bridgeOptions || {}; otherParams.causallyConsistent = otherParams.causallyConsistent || false; otherParams.useMaintenancePorts = otherParams.useMaintenancePorts ?? false; if (jsTestOptions().networkMessageCompressors) { otherParams.bridgeOptions["networkMessageCompressors"] = jsTestOptions().networkMessageCompressors; } this.keyFile = otherParams.keyFile; const hostName = otherParams.host === undefined ? getHostName() : otherParams.host; this._testName = testName; this._otherParams = otherParams; let pathOpts = {testName: testName}; this._connections = []; this._rs = []; this._rsObjects = []; this._shardReplSetToIndex = {}; this._useBridge = otherParams.useBridge; if (this._useBridge) { assert( !jsTestOptions().tlsMode, "useBridge cannot be true when using TLS. 
Add the requires_mongobridge tag to the test to ensure it will be skipped on variants that use TLS.", ); } this._useMaintenancePorts = otherParams.useMaintenancePorts; this._unbridgedMongos = []; let _allocatePortForMongos; let _allocatePortForBridgeForMongos; if (this._useBridge) { this._unbridgedMongos = []; const _makeAllocatePortFn = (preallocatedPorts, errorMessage) => { let idxNextNodePort = 0; return function() { if (idxNextNodePort >= preallocatedPorts.length) { throw new Error(errorMessage(preallocatedPorts.length)); } const nextPort = preallocatedPorts[idxNextNodePort]; ++idxNextNodePort; return nextPort; }; }; const errorMessage = (length) => "Cannot use more than " + length + " mongos processes when useBridge=true"; _allocatePortForBridgeForMongos = _makeAllocatePortFn( allocatePorts(MongoBridge.kBridgeOffset), errorMessage, ); _allocatePortForMongos = _makeAllocatePortFn(allocatePorts(MongoBridge.kBridgeOffset), errorMessage); } else { _allocatePortForBridgeForMongos = function() { throw new Error("Using mongobridge isn't enabled for this sharded cluster"); }; _allocatePortForMongos = allocatePort; } otherParams.migrationLockAcquisitionMaxWaitMS = otherParams.migrationLockAcquisitionMaxWaitMS || 30000; let randomSeedAlreadySet = false; if (jsTest.options().useRandomBinVersionsWithinReplicaSet) { // We avoid setting the random seed unconditionally to avoid unexpected behavior in tests // that already make use of Random.setRandomSeed(). This conditional can be removed if // it becomes the standard to always be generating the seed through ShardingTest. Random.setRandomFixtureSeed(); randomSeedAlreadySet = true; } jsTest.options().setParameters = jsTest.options().setParameters || {}; let setDefaultTransactionLockTimeout = false; if (jsTest.options().setParameters.maxTransactionLockRequestTimeoutMillis === undefined) { // Set a higher maxTransactionLockRequestTimeoutMillis. Tests written with ShardingTest // are generally single threaded and often don't expect lock timeouts, so a higher // timeout avoids spurious failures on slow machines. // // TODO SERVER-98408: Ideally this would be passed as a default setParameter to // ReplSetTest, but the rules for passing default options to ReplSetTest via // ShardingTest are finicky and tests rely on the current behaviors. Once this is // refactored, we should be able to avoid using TestData. jsTest.options().setParameters.maxTransactionLockRequestTimeoutMillis = 5 * 60 * 1000; setDefaultTransactionLockTimeout = true; } try { const clusterVersionInfo = this.getClusterVersionInfo(); let startTime = new Date(); // Measure the execution time of startup and initiate. if (!isConfigShardMode) { // // Start up the config server replica set. // let rstOptions = { useHostName: otherParams.useHostname, host: hostName, useBridge: otherParams.useBridge, bridgeOptions: otherParams.bridgeOptions, useMaintenancePorts: otherParams.useMaintenancePorts, keyFile: this.keyFile, waitForKeys: false, name: testName + "-configRS", seedRandomNumberGenerator: !randomSeedAlreadySet, isConfigServer: true, }; // Always use wiredTiger as the storage engine for the CSRS. let startOptions = { pathOpts: pathOpts, // Ensure that journaling is always enabled for config servers.
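// (No explicit journal option is needed here: wiredTiger journals unconditionally.)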
configsvr: "", storageEngine: "wiredTiger", }; if (otherParams.configOptions && otherParams.configOptions.binVersion) { otherParams.configOptions.binVersion = MongoRunner.versionIterator( otherParams.configOptions.binVersion, ); } startOptions = Object.merge(startOptions, otherParams.configOptions); const clusterVersionInfo = this.getClusterVersionInfo(); if (jsTestOptions().otelTraceDirectory && !clusterVersionInfo.isMixedVersion && MongoRunner.compareBinVersions(MongoRunner.getBinVersionFor(startOptions.binVersion ?? "latest"), MongoRunner.getBinVersionFor("8.3.0")) >= 0) { startOptions.setParameter = startOptions.setParameter ?? {}; startOptions.setParameter.opentelemetryTraceDirectory = jsTestOptions().otelTraceDirectory; } rstOptions = Object.merge(rstOptions, otherParams.configReplSetTestOptions); let nodeOptions = []; for (var i = 0; i < numConfigs; ++i) { nodeOptions.push(otherParams["c" + i] || {}); } rstOptions.nodes = nodeOptions; // Start the config server's replica set without waiting for it to complete. This // allows it to proceed in parallel with the startup of each shard. this.configRS = new ReplSetTest(rstOptions); this.configRS.startSetAsync(startOptions); } // // Start each shard replica set. // for (var i = 0; i < numShards; i++) { let setName = testName + "-rs" + i; let rsDefaults = { useHostname: otherParams.useHostname, oplogSize: 16, pathOpts: Object.merge(pathOpts, {shard: i}), }; let setIsConfigSvr = false; if (isConfigShardMode && i == 0) { otherParams.configOptions = Object.merge(otherParams.configOptions, { configsvr: "", storageEngine: "wiredTiger", }); rsDefaults = Object.merge(rsDefaults, otherParams.configOptions); setIsConfigSvr = true; } else { rsDefaults.shardsvr = ""; } if (otherParams.rs || otherParams["rs" + i] || otherParams.rsOptions) { if (otherParams.rs) { rsDefaults = Object.merge(rsDefaults, otherParams.rs); } if (otherParams["rs" + i]) { rsDefaults = Object.merge(rsDefaults, otherParams["rs" + i]); } if (otherParams.rsOptions) { rsDefaults = Object.merge(rsDefaults, otherParams.rsOptions); } rsDefaults.nodes = rsDefaults.nodes || otherParams.numReplicas; } else { rsDefaults = Object.merge(rsDefaults, otherParams["d" + i]); } // TODO SERVER-98408: Passing setParameter via rsDefaults will always override any // setParameters passed via replSetTestOpts. Instead the options should be merged. rsDefaults.setParameter = rsDefaults.setParameter || {}; if (typeof rsDefaults.setParameter === "string") { let eqIdx = rsDefaults.setParameter.indexOf("="); if (eqIdx != -1) { let param = rsDefaults.setParameter.substring(0, eqIdx); let value = rsDefaults.setParameter.substring(eqIdx + 1); rsDefaults.setParameter = {}; rsDefaults.setParameter[param] = value; } } rsDefaults.setParameter.migrationLockAcquisitionMaxWaitMS = otherParams.migrationLockAcquisitionMaxWaitMS; const clusterVersionInfo = this.getClusterVersionInfo(); if (jsTestOptions().otelTraceDirectory && !clusterVersionInfo.isMixedVersion && MongoRunner.compareBinVersions(MongoRunner.getBinVersionFor(rsDefaults.binVersion || "latest"), MongoRunner.getBinVersionFor("8.3.0")) >= 0) { rsDefaults.setParameter.opentelemetryTraceDirectory = jsTestOptions().otelTraceDirectory; } let rsSettings = rsDefaults.settings; delete rsDefaults.settings; // The number of nodes in the rs field will take priority. 
let numReplicas = 1; /* default */ if (rsDefaults.nodes) { numReplicas = rsDefaults.nodes; } else if (otherParams.rs || otherParams["rs" + i]) { numReplicas = 3; } // Unless explicitly given a number of config servers, a config shard uses the // shard's number of nodes to increase odds of compatibility with test assertions. if (isConfigShardMode && i == 0 && !usedDefaultNumConfigs) { numReplicas = numConfigs; } delete rsDefaults.nodes; let protocolVersion = rsDefaults.protocolVersion; delete rsDefaults.protocolVersion; let replSetTestOpts = { name: setName, nodes: numReplicas, host: hostName, useHostName: otherParams.useHostname, useBridge: otherParams.useBridge, bridgeOptions: otherParams.bridgeOptions, useMaintenancePorts: otherParams.useMaintenancePorts, keyFile: this.keyFile, protocolVersion: protocolVersion, waitForKeys: false, settings: rsSettings, seedRandomNumberGenerator: !randomSeedAlreadySet, isConfigServer: setIsConfigSvr, useAutoBootstrapProcedure: useAutoBootstrapProcedure, }; const rs = new ReplSetTest(replSetTestOpts); jsTest.log.info("ShardingTest starting replica set for shard: " + setName); // Start up the replica set but don't wait for it to complete. This allows the // startup of each shard to proceed in parallel. this._rs[i] = { setName: setName, test: rs, nodes: rs.startSetAsync(rsDefaults), url: rs.getURL(), }; if (i == 0 && isConfigShardMode) { this.configRS = this._rs[0].test; } } // // Wait for each shard replica set to finish starting up. // for (let i = 0; i < numShards; i++) { jsTest.log.info("Waiting for shard " + this._rs[i].setName + " to finish starting up."); if (isConfigShardMode && i == 0) { continue; } this._rs[i].test.startSetAwait(); } // // Wait for the config server to finish starting up. // jsTest.log.info("Waiting for the config server to finish starting up."); this.configRS.startSetAwait(); let config = this.configRS.getReplSetConfig(); config.configsvr = true; config.settings = config.settings || {}; jsTest.log.info( "ShardingTest startup for all nodes took " + (new Date() - startTime) + "ms with " + this.configRS.nodeList().length + " config server nodes and " + totalNumShardNodes(this) + " total shard nodes.", ); if (setDefaultTransactionLockTimeout) { // Clean up TestData.setParameters to avoid affecting other tests. delete jsTest.options().setParameters.maxTransactionLockRequestTimeoutMillis; } let skipInitiatingWithMaintenancePort = this._shouldSkipMaintenancePortDuringInitialization(); // // Initiate each shard replica set and wait for replication. Also initiate the config // replica set. Whenever possible, do this in parallel. // const shardsRS = this._rs.map((obj) => obj.test); let replSetToInitiateArr = []; if (isConfigShardMode) { replSetToInitiateArr = [...shardsRS]; } else { replSetToInitiateArr = [...shardsRS, this.configRS]; } const replicaSetsToInitiate = replSetToInitiateArr.map((rst) => { const rstConfig = rst.getReplSetConfig(skipInitiatingWithMaintenancePort); // The mongo shell cannot authenticate as the internal __system user in tests that // use x509 for cluster authentication. Choosing the default value for // wcMajorityJournalDefault in ReplSetTest cannot be done automatically without the // shell performing such authentication, so allow tests to pass the value in.
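// Note that for the config server replica set, writeConcernMajorityJournalDefault is forced to true below regardless of this option.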
if (otherParams.hasOwnProperty("writeConcernMajorityJournalDefault")) { rstConfig.writeConcernMajorityJournalDefault = otherParams.writeConcernMajorityJournalDefault; } if (rst === this.configRS) { rstConfig.configsvr = true; rstConfig.writeConcernMajorityJournalDefault = true; } let rstInitiateArgs = { allNodesAuthorizedToRunRSGetStatus: true, initiateWithDefaultElectionTimeout: false, }; if (otherParams.hasOwnProperty("allNodesAuthorizedToRunRSGetStatus") && otherParams.allNodesAuthorizedToRunRSGetStatus == false) { rstInitiateArgs.allNodesAuthorizedToRunRSGetStatus = false; } if (otherParams.hasOwnProperty("initiateWithDefaultElectionTimeout") && otherParams.initiateWithDefaultElectionTimeout == true) { rstInitiateArgs.initiateWithDefaultElectionTimeout = true; } const makeNodeHost = (node) => { const [_, port] = node.name.split(":"); return `127.0.0.1:${port}`; }; return { rst, // Arguments for creating instances of each replica set within parallel threads. rstArgs: { name: rst.name, nodeHosts: rst.nodes.map((node) => makeNodeHost(node)), nodeOptions: rst.nodeOptions, // Mixed-mode SSL tests may specify a keyFile per replica set rather than // one for the whole cluster. keyFile: rst.keyFile ? rst.keyFile : this.keyFile, host: otherParams.useHostname ? hostName : "localhost", waitForKeys: false, useAutoBootstrapProcedure: useAutoBootstrapProcedure, }, // Replica set configuration for initiating the replica set. rstConfig, // Args to be sent to rst.initiate rstInitiateArgs, }; }); const initiateReplicaSet = (rst, rstConfig, rstInitiateArgs) => { rst.initiate(rstConfig, null, rstInitiateArgs); // Wait for the nodes to agree on a primary and for replication to settle. rst.awaitNodesAgreeOnPrimary(); if (rst.keyFile) { authutil.asCluster(rst.nodes, rst.keyFile, function() { rst.awaitReplication(); }); } rst.awaitSecondaryNodes(); }; const isParallelSupported = (() => { if (TestData.supportsParallelReplSetInitiateAndStop === false) { return false; } for (let {rst} of replicaSetsToInitiate) { if (rst.startOptions && rst.startOptions.clusterAuthMode === "x509") { // The mongo shell performing X.509 authentication as a cluster member // requires starting a parallel shell and using the server's (not the // client's) certificate. The ReplSetTest instance constructed in a Thread // wouldn't have copied the path to the server's certificate. We therefore // fall back to initiating the CSRS and replica set shards sequentially when // X.509 authentication is being used. return false; } for (let n of Object.keys(rst.nodeOptions)) { const nodeOptions = rst.nodeOptions[n]; if (nodeOptions && nodeOptions.clusterAuthMode === "x509") { return false; } } } return true; })(); if (isParallelSupported) { const threads = []; try { for (let {rstArgs, rstConfig, rstInitiateArgs} of replicaSetsToInitiate) { const thread = new Thread( async (rstArgs, rstConfig, rstInitiateArgs, initiateReplicaSet) => { const {ReplSetTest} = await import("jstests/libs/replsettest.js"); try { const rst = new ReplSetTest({rstArgs}); initiateReplicaSet(rst, rstConfig, rstInitiateArgs); return {ok: 1}; } catch (e) { return { ok: 0, hosts: rstArgs.nodeHosts, name: rstArgs.name, error: e.toString(), stack: e.stack, }; } }, rstArgs, rstConfig, rstInitiateArgs, initiateReplicaSet, ); thread.start(); threads.push(thread); } } finally { // Wait for each thread to finish. Throw an error if any thread fails.
const returnData = threads.map((thread) => { thread.join(); return thread.returnData(); }); returnData.forEach((res) => { assert.commandWorked( res, "Initiating shard or config servers as a replica set failed"); }); } } else { for (let {rst, rstConfig, rstInitiateArgs} of replicaSetsToInitiate) { initiateReplicaSet(rst, rstConfig, rstInitiateArgs); } } for (let i = 0; i < numShards; i++) { let rs = this._rs[i].test; this._shardReplSetToIndex[rs.name] = i; this["rs" + i] = rs; this._rsObjects[i] = rs; this._connections.push(null); let rsConn = new Mongo(rs.getURL(), undefined, {gRPC: false}); rsConn.name = rs.getURL(); this._connections[i] = rsConn; this["shard" + i] = rsConn; rsConn.rs = rs; } // Wait for a primary to be elected before starting the mongos processes this.configRS.awaitNodesAgreeOnPrimary(); let csrsPrimary = this.configRS.getPrimary(); // TODO: SERVER-80010 Remove assert.soon. if (useAutoBootstrapProcedure) { assert.soonNoExcept(() => { function isShardingReady() { return csrsPrimary.adminCommand({getShardingReady: 1}).isReady; } return this.keyFile ? authutil.asCluster(csrsPrimary, this.keyFile, isShardingReady) : isShardingReady(); }); } jsTest.log.info( "ShardingTest startup and initiation for all nodes took " + (new Date() - startTime) + "ms with " + this.configRS.nodeList().length + " config server nodes and " + totalNumShardNodes(this) + " total shard nodes.", ); // If 'otherParams.mongosOptions.binVersion' is an array value, then we'll end up // constructing a version iterator. const mongosOptions = []; for (var i = 0; i < numMongos; ++i) { let options = { useHostname: otherParams.useHostname, pathOpts: Object.merge(pathOpts, {mongos: i}), verbose: mongosVerboseLevel, keyFile: this.keyFile, }; if (otherParams.mongosOptions && otherParams.mongosOptions.binVersion) { otherParams.mongosOptions.binVersion = MongoRunner.versionIterator( otherParams.mongosOptions.binVersion, ); } options = Object.merge(options, otherParams.mongosOptions); options = Object.merge(options, otherParams["s" + i]); // The default time for mongos quiesce mode in response to SIGTERM is 15 seconds. // Reduce this to 0 for faster shutdown. options.setParameter = options.setParameter || {}; options.setParameter.mongosShutdownTimeoutMillisForSignaledShutdown = options.setParameter.mongosShutdownTimeoutMillisForSignaledShutdown || 0; const clusterVersionInfo = this.getClusterVersionInfo(); if (jsTestOptions().otelTraceDirectory && !clusterVersionInfo.isMixedVersion && MongoRunner.compareBinVersions(MongoRunner.getBinVersionFor(options.binVersion ?? "latest"), MongoRunner.getBinVersionFor("8.3.0")) >= 0) { options.setParameter.opentelemetryTraceDirectory = jsTestOptions().otelTraceDirectory; } options.port = options.port || _allocatePortForMongos(); if (this._useMaintenancePorts || options.hasOwnProperty("maintenancePort")) { options.maintenancePort = options.hasOwnProperty("maintenancePort") ?
options.maintenancePort : _allocatePortForMongos(); } if (jsTestOptions().shellGRPC) { options.grpcPort = options.grpcPort || _allocatePortForMongos(); } mongosOptions.push(options); } const configRS = this.configRS; if (_hasNewFeatureCompatibilityVersion() && clusterVersionInfo.isMixedVersion) { const fcv = binVersionToFCV(clusterVersionInfo.oldestBinVersion); function setFeatureCompatibilityVersion() { assert.commandWorked( csrsPrimary.adminCommand({ setFeatureCompatibilityVersion: fcv, confirm: true, fromConfigServer: true, }), ); // Wait for the new featureCompatibilityVersion to propagate to all nodes in the // CSRS to ensure that older versions of mongos can successfully connect. configRS.awaitReplication(); } if (this.keyFile) { authutil.asCluster( this.configRS.nodes, this.keyFile, setFeatureCompatibilityVersion); } else { setFeatureCompatibilityVersion(); } } // If chunkSize has been requested for this test, write the configuration if (otherParams.chunkSize) { function setChunkSize() { assert.commandWorked( csrsPrimary.getDB("config").settings.update( {_id: "chunksize"}, {$set: {value: otherParams.chunkSize}}, { upsert: true, writeConcern: {w: "majority", wtimeout: kDefaultWTimeoutMs}, }, ), ); configRS.awaitLastOpCommitted(); } if (this.keyFile) { authutil.asCluster(csrsPrimary, this.keyFile, setChunkSize); } else { setChunkSize(); } } this._configDB = this.configRS.getURL(); for (var i = 0; i < numConfigs; ++i) { var conn = this.configRS.nodes[i]; this["config" + i] = conn; this["c" + i] = conn; } jsTest.log.info("Config servers", {configDB: this._configDB}); this.printNodes(); this._mongos = []; // Start and connect to the MongoS servers if needed for (var i = 0; i < numMongos; i++) { const options = mongosOptions[i]; options.configdb = this._configDB; if (otherParams.useBridge) { let bridgeOptions = Object.merge(otherParams.bridgeOptions, options.bridgeOptions || {}); bridgeOptions = Object.merge(bridgeOptions, { hostName: otherParams.useHostname ? hostName : "localhost", port: _allocatePortForBridgeForMongos(), // The mongos processes identify themselves to mongobridge as host:port, // where the host is the actual hostname of the machine and not // localhost. dest: hostName + ":" + options.port, }); var bridge = new MongoBridge(bridgeOptions); } var conn = MongoRunner.runMongos(options, clusterVersionInfo.isMixedVersion); if (!conn) { throw new Error("Failed to start mongos " + i); } if (otherParams.causallyConsistent) { conn.setCausalConsistency(true); } if (otherParams.useBridge) { bridge.connectToBridge(); this._mongos.push(bridge); this._unbridgedMongos.push(conn); } else { this._mongos.push(conn); } if (i === 0) { this.s = this._mongos[i]; this.admin = this._mongos[i].getDB("admin"); this.config = this._mongos[i].getDB("config"); } this["s" + i] = this._mongos[i]; } _extendWithShMethods(this); // If auth is enabled for the test, login the mongos connections as system in order to // configure the instances and then log them out again. if (this.keyFile) { authutil.asCluster(this._mongos, this.keyFile, () => _configureCluster(this)); } else if (mongosOptions[0] && mongosOptions[0].keyFile) { authutil.asCluster( this._mongos, mongosOptions[0].keyFile, () => _configureCluster(this)); } else { _configureCluster(this); // Ensure that all config server nodes are up to date with any changes made to // balancer settings before adding shards to the cluster. 
This prevents shards, // which read config.settings with readPreference 'nearest', from accidentally // fetching stale values from secondaries that aren't up-to-date. this.configRS.awaitLastOpCommitted(); } try { if (!otherParams.manualAddShard) { var testName = this._testName; let admin = this.admin; let keyFile = this.keyFile; this._connections.forEach(function(z, idx) { let n = z.name || z.host || z; let name; if (isConfigShardMode && idx == 0) { name = "config"; if (!useAutoBootstrapProcedure) { jsTest.log.info("ShardingTest " + testName + " transitioning to config shard"); function transitionFromDedicatedConfigServer() { return assert.commandWorked( admin.runCommand({transitionFromDedicatedConfigServer: 1}), ); } if (keyFile) { authutil.asCluster(admin.getMongo(), keyFile, transitionFromDedicatedConfigServer); } else if (mongosOptions[0] && mongosOptions[0].keyFile) { authutil.asCluster( admin.getMongo(), mongosOptions[0].keyFile, transitionFromDedicatedConfigServer, ); } else { transitionFromDedicatedConfigServer(); } } z.shardName = name; } else { jsTest.log.info("ShardingTest " + testName + " going to add shard : " + n); let addShardCmd = {addShard: n}; if (alwaysUseTestNameForShardName) { addShardCmd.name = `${testName}-${idx}`; } let result = assert.commandWorked( admin.runCommand(addShardCmd), "Failed to add shard " + n, ); z.shardName = result.shardAdded; } }); } } catch (e) { // Clean up the running processes on failure jsTest.log.info("Failed to add shards, stopping cluster."); this.stop(); throw e; } // TODO (SERVER-112863) Remove this once the maintenance port is supported on lastLTS. this._rs.forEach((rs) => { if (skipInitiatingWithMaintenancePort) { rs.test.reInitiate(); } }); // Ensure that the sessions collection exists so jstests can run things with // logical sessions and test them. We do this by forcing an immediate cache refresh // on the config server, which auto-shards the collection for the cluster. this.configRS.getPrimary().getDB("admin").runCommand( {refreshLogicalSessionCacheNow: 1}); // Ensure that all CSRS nodes are up to date. This is strictly needed for tests that use // multiple mongoses. In those cases, the first mongos initializes the contents of the // 'config' database, but without waiting for those writes to replicate to all the // config servers then the secondary mongoses risk reading from a stale config server // and seeing an empty config database. this.configRS.awaitLastOpCommitted(); if (useAutoBootstrapProcedure) { // This is needed because auto-bootstrapping will initially create a config.shards // entry for the config shard where the host field does not contain all the nodes in // the replica set. assert.soonNoExcept(() => { function getConfigShardDoc() { return csrsPrimary.getDB("config").shards.findOne({_id: "config"}); } const configShardDoc = this.keyFile ? authutil.asCluster(csrsPrimary, this.keyFile, getConfigShardDoc) : getConfigShardDoc(); // TODO SERVER-89498: This check is flaky and should be fixed before re-enabling // the autobootstrap procedure. See BF-31879 for more details. return configShardDoc.host == this.configRS.getURL(); }); } if (jsTestOptions().keyFile) { jsTest.authenticateNodes(this._mongos); } // Flushes the routing table cache on connection 'conn'. If 'keyFileLocal' is defined, // authenticates the keyfile user. const flushRT = function flushRoutingTableAndHandleAuth(conn, keyFileLocal) { // Invokes the actual execution of cache refresh.
            // TODO (SERVER-112863): Remove this once the maintenance port is supported on
            // lastLTS.
            this._rs.forEach((rs) => {
                if (skipInitiatingWithMaintenancePort) {
                    rs.test.reInitiate();
                }
            });

            // Ensure that the sessions collection exists so jstests can run things with
            // logical sessions and test them. We do this by forcing an immediate cache refresh
            // on the config server, which auto-shards the collection for the cluster.
            this.configRS.getPrimary().getDB("admin").runCommand(
                {refreshLogicalSessionCacheNow: 1});

            // Ensure that all CSRS nodes are up to date. This is strictly needed for tests that
            // use multiple mongoses. In those cases, the first mongos initializes the contents
            // of the 'config' database, but without waiting for those writes to replicate to all
            // the config servers, the secondary mongoses risk reading from a stale config server
            // and seeing an empty config database.
            this.configRS.awaitLastOpCommitted();

            if (useAutoBootstrapProcedure) {
                // This is needed because auto-bootstrapping will initially create a
                // config.shards entry for the config shard where the host field does not contain
                // all the nodes in the replica set.
                assert.soonNoExcept(() => {
                    function getConfigShardDoc() {
                        return csrsPrimary.getDB("config").shards.findOne({_id: "config"});
                    }
                    const configShardDoc = this.keyFile
                        ? authutil.asCluster(csrsPrimary, this.keyFile, getConfigShardDoc)
                        : getConfigShardDoc();
                    // TODO SERVER-89498: This check is flaky and should be fixed before
                    // re-enabling the autobootstrap procedure. See BF-31879 for more details.
                    return configShardDoc.host == this.configRS.getURL();
                });
            }

            if (jsTestOptions().keyFile) {
                jsTest.authenticateNodes(this._mongos);
            }

            // Flushes the routing table cache on connection 'conn'. If 'keyFileLocal' is
            // defined, authenticates the keyfile user.
            const flushRT = function flushRoutingTableAndHandleAuth(conn, keyFileLocal) {
                // Invokes the actual execution of the cache refresh.
                const execFlushRT = (conn) => {
                    assert.commandWorked(conn.getDB("admin").runCommand(
                        {_flushRoutingTableCacheUpdates: "config.system.sessions"}));
                };

                const x509AuthRequired = conn.fullOptions && conn.fullOptions.clusterAuthMode &&
                    conn.fullOptions.clusterAuthMode === "x509";

                if (keyFileLocal) {
                    authutil.asCluster(conn, keyFileLocal, () => execFlushRT(conn));
                } else if (x509AuthRequired) {
                    const exitCode = _runMongoProgram(
                        "mongo",
                        conn.host,
                        "--tls",
                        "--tlsAllowInvalidHostnames",
                        "--tlsCertificateKeyFile",
                        conn.fullOptions.tlsCertificateKeyFile
                            ? conn.fullOptions.tlsCertificateKeyFile
                            : conn.fullOptions.sslPEMKeyFile,
                        "--tlsCAFile",
                        conn.fullOptions.tlsCAFile ? conn.fullOptions.tlsCAFile
                                                   : conn.fullOptions.sslCAFile,
                        "--authenticationDatabase=$external",
                        "--authenticationMechanism=MONGODB-X509",
                        "--eval",
                        `(${execFlushRT.toString()})(db.getMongo())`,
                    );
                    assert.eq(0, exitCode, "parallel shell for x509 auth failed");
                } else {
                    execFlushRT(conn);
                }
            };

            if (!otherParams.manualAddShard) {
                for (let i = 0; i < numShards; i++) {
                    const keyFileLocal =
                        otherParams.shards && otherParams.shards[i] && otherParams.shards[i].keyFile
                            ? otherParams.shards[i].keyFile
                            : this.keyFile;

                    const rs = this._rs[i].test;
                    flushRT(rs.getPrimary(), keyFileLocal);
                }

                this.waitForShardingInitialized();
            }
        } catch (e) {
            // Initialization failed. If the test expected the failure, clean up before
            // rethrowing.
            if (params.shouldFailInit === true) {
                this.stopOnFail();
            }
            throw e;
        }

        // Fail the test if initialization was expected to fail but succeeded.
        assert.neq(true,
                   params.shouldFailInit,
                   "This was expected to fail initialization, but it did not");
    }
}

// Stub for a hook to check that the cluster-wide metadata is consistent.
ShardingTest.prototype.checkMetadataConsistency = function() {
    jsTest.log.info("Unhooked checkMetadataConsistency function");
};

// Stub for a hook to check that collection UUIDs are consistent across shards and the config
// server.
ShardingTest.prototype.checkUUIDsConsistentAcrossCluster = function() {};

// Stub for a hook to check that indexes are consistent across shards.
ShardingTest.prototype.checkIndexesConsistentAcrossCluster = function() {};

// Stub for a hook to check that orphaned documents have been deleted from all shards.
ShardingTest.prototype.checkOrphansAreDeleted = function() {
    jsTest.log.info("Unhooked checkOrphansAreDeleted function");
};

// Stub for a hook to check that the routing table is consistent across the cluster.
ShardingTest.prototype.checkRoutingTableConsistency = function() {
    jsTest.log.info("Unhooked checkRoutingTableConsistency function");
};

// Stub for a hook to check the shards' filtering metadata.
ShardingTest.prototype.checkShardFilteringMetadata = function() {
    jsTest.log.info("Unhooked checkShardFilteringMetadata function");
};

/**
 * Constructs a human-readable string representing a chunk's range.
 */
function _rangeToString(r) {
    return tojsononeline(r.min) + " -> " + tojsononeline(r.max);
}

/**
 * Extends the ShardingTest class with the methods exposed by the sh utility class.
 */
function _extendWithShMethods(st) {
    Object.keys(sh).forEach(function(fn) {
        if (typeof sh[fn] !== "function") {
            return;
        }

        assert.eq(undefined,
                  st[fn],
                  "ShardingTest contains a method " + fn +
                      " which duplicates a method with the same name on sh. " +
                      "Please select a different function name.");

        st[fn] = function() {
            const oldDb = globalThis.db;
            globalThis.db = st.getDB("test");

            try {
                return sh[fn].apply(sh, arguments);
            } finally {
                globalThis.db = oldDb;
            }
        };
    });
}
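// Illustration only (a hedged sketch, not part of the original file): after
// _extendWithShMethods runs, every sh.* helper is callable directly on the ShardingTest
// instance, with globalThis.db temporarily rebound to the test's "test" database. Assuming a
// running ShardingTest `st`, the two calls below would be equivalent:
//
//     globalThis.db = st.getDB("test");
//     sh.status();   // uses the global db binding
//
//     st.status();   // same effect; the wrapper saves and restores globalThis.db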
/**
 * Configures the cluster based on the specified parameters (balancer state, etc).
 */
function _configureCluster(st) {
    if (!st._otherParams.enableBalancer) {
        st.stopBalancer();
    }
}

function connectionURLTheSame(a, b) {
    if (a == b) return true;
    if (!a || !b) return false;

    if (a.name) return connectionURLTheSame(a.name, b);
    if (b.name) return connectionURLTheSame(a, b.name);

    if (a.host) return connectionURLTheSame(a.host, b);
    if (b.host) return connectionURLTheSame(a, b.host);

    if (a.indexOf("/") < 0 && b.indexOf("/") < 0) {
        a = a.split(":");
        b = b.split(":");

        if (a.length != b.length) return false;

        // A host:port pair only matches if the ports are equal.
        if (a.length == 2 && a[1] != b[1]) return false;

        if (a[0] == "localhost" || a[0] == "127.0.0.1") a[0] = getHostName();
        if (b[0] == "localhost" || b[0] == "127.0.0.1") b[0] = getHostName();

        return a[0] == b[0];
    } else {
        let a0 = a.split("/")[0];
        let b0 = b.split("/")[0];
        return a0 == b0;
    }
}

// Inline unit tests for connectionURLTheSame, executed at module load.
assert(connectionURLTheSame("foo", "foo"));
assert(!connectionURLTheSame("foo", "bar"));
assert(connectionURLTheSame("foo/a,b", "foo/b,a"));
assert(!connectionURLTheSame("foo/a,b", "bar/a,b"));

/**
 * Returns whether the sharding test's replica sets can be shut down in parallel.
 */
function isShutdownParallelSupported(st, opts = {}) {
    if (st._useBridge) {
        // Keep the current behavior of shutting down each replica set shard and the
        // CSRS individually when otherParams.useBridge === true. There appear to be only
        // 8 instances of {useBridge: true} with ShardingTest, and the implementation
        // complexity is too high.
        return false;
    }

    if (st._otherParams.configOptions &&
        st._otherParams.configOptions.clusterAuthMode === "x509") {
        // The mongo shell performing X.509 authentication as a cluster member requires
        // starting a parallel shell and using the server's (not the client's)
        // certificate. The ReplSetTest instance constructed in a Thread wouldn't have
        // copied the path to the server's certificate. We therefore fall back to
        // initiating the CSRS and replica set shards sequentially when X.509
        // authentication is being used.
        return false;
    }

    if (st._otherParams.configOptions && st._otherParams.configOptions.tlsMode === "preferTLS") {
        return false;
    }

    if (st._otherParams.configOptions && st._otherParams.configOptions.sslMode === "requireSSL") {
        return false;
    }

    if (opts.parallelSupported === false) {
        // The test has chosen to opt out of parallel shutdown.
        return false;
    }

    if (TestData.supportsParallelReplSetInitiateAndStop === false) {
        // The suite has chosen to opt out of parallel shutdown.
        return false;
    }

    return true;
}
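// Illustration only (a hedged sketch, not part of the original file): both opt-out knobs
// checked above are reachable from a test. Assuming a ShardingTest instance `st`:
//
//     st.stopAllShards({parallelSupported: false});             // serial shutdown, this call only
//     TestData.supportsParallelReplSetInitiateAndStop = false;  // serial shutdown, suite-wide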
/**
 * Returns the replica set args for the sets that are to be terminated in parallel threads.
 */
function replicaSetsToTerminate(st, shardRS) {
    const replicaSetsToTerminate = [];
    [...shardRS.map((obj) => obj.test)].forEach((rst) => {
        // Generate a list of the live nodes in the replica set.
        const liveNodes = [];
        const pidValues = [];
        rst.nodes.forEach(function(node) {
            try {
                node.getDB("admin")._helloOrLegacyHello();
                liveNodes.push(node);
            } catch (err) {
                // Ignore the node, since it is not live.
                jsTest.log.info("ShardingTest replicaSetsToTerminate ignoring: " + node.host);
                return;
            }

            if (!node.pid) {
                // Get the pid for the node.
                let serverStatus;
                rst.keyFile = rst.keyFile ? rst.keyFile : st.keyFile;
                if (rst.keyFile) {
                    serverStatus = authutil.asCluster(node, rst.keyFile, () => {
                        return node.getDB("admin").serverStatus();
                    });
                } else {
                    serverStatus = node.getDB("admin").serverStatus();
                }

                if (serverStatus["pid"]) {
                    node.pid = serverStatus["pid"];
                } else {
                    // Shutdown requires PID values for every node. Failing to obtain a PID
                    // value is rare; should it occur, throw rather than attempt to continue.
                    throw new Error("Could not obtain node PID value. Shutdown failed.");
                }
            }

            pidValues.push(node.pid.valueOf());
        });

        if (pidValues.length > 0) {
            // The number of live nodes must match the number of PID values being passed in
            // 'rstArgs' to ensure the replica set is reconstructed correctly.
            assert(liveNodes.length == pidValues.length);

            const hostName =
                st._otherParams.host === undefined ? getHostName() : st._otherParams.host;
            replicaSetsToTerminate.push({
                // Arguments for each replica set within parallel threads.
                rstArgs: {
                    name: rst.name,
                    nodeHosts: liveNodes.map((node) => `${node.name}`),
                    nodeOptions: rst.nodeOptions,
                    // Mixed-mode SSL tests may specify a keyFile per replica set rather
                    // than one for the whole cluster.
                    keyFile: rst.keyFile ? rst.keyFile : st.keyFile,
                    host: st._otherParams.useHostname ? hostName : "localhost",
                    waitForKeys: false,
                    pidValue: pidValues,
                },
            });
        }
    });
    return replicaSetsToTerminate;
}

/**
 * Returns whether there is a new feature compatibility version for the "latest" version. This
 * must be manually changed if and when there is a new feature compatibility version.
 */
function _hasNewFeatureCompatibilityVersion() {
    return true;
}

/**
 * Returns the total number of mongod nodes across all shards, excluding config server nodes.
 * Used only for diagnostic logging.
 */
function totalNumShardNodes(st) {
    const numNodesPerReplSet = st._rs.map((r) => r.test.nodes.length);
    return numNodesPerReplSet.reduce((a, b) => a + b, 0);
}

function clusterHasBinVersion(st, version) {
    const binVersion = MongoRunner.getBinVersionFor(version);
    const hasBinVersionInParams = (params) => {
        return params && params.binVersion &&
            MongoRunner.areBinVersionsTheSame(binVersion,
                                              MongoRunner.getBinVersionFor(params.binVersion));
    };

    // Must check mongosBinVersion because it does not update mongosOptions.binVersion.
    const isMixedVersionMongos = jsTestOptions().mongosBinVersion &&
        MongoRunner.areBinVersionsTheSame(binVersion, jsTestOptions().mongosBinVersion);
    if (isMixedVersionMongos) {
        return true;
    }

    // Check the config servers.
    if (hasBinVersionInParams(st._otherParams.configOptions)) {
        return true;
    }

    const numConfigs = st._numConfigs;
    for (let i = 0; i < numConfigs; ++i) {
        if (hasBinVersionInParams(st._otherParams["c" + i])) {
            return true;
        }
    }

    // Check the shards.
    if (hasBinVersionInParams(st._otherParams.rs)) {
        return true;
    }

    const numShards = st._numShards;
    for (let i = 0; i < numShards; ++i) {
        if (hasBinVersionInParams(st._otherParams["d" + i])) {
            return true;
        }
        if (hasBinVersionInParams(st._otherParams["rs" + i])) {
            return true;
        }
    }

    // Check the mongos servers.
    if (hasBinVersionInParams(st._otherParams.mongosOptions)) {
        return true;
    }

    const numMongos = st._numMongos;
    for (let i = 0; i < numMongos; ++i) {
        if (hasBinVersionInParams(st._otherParams["s" + i])) {
            return true;
        }
    }

    return false;
}

function defineReadOnlyProperty(st, name, value) {
    Object.defineProperty(
        st, name, {value: value, writable: false, enumerable: true, configurable: false});
}
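// Illustration only (a hedged sketch, not part of the original file): defineReadOnlyProperty
// is the kind of helper a constructor can use to pin fields that tests must not reassign.
// Assuming a ShardingTest instance `st`:
//
//     defineReadOnlyProperty(st, "shardCount", 2);
//     st.shardCount;       // 2, and visible via Object.keys(st) since it is enumerable
//     st.shardCount = 5;   // throws a TypeError in module (strict-mode) code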