mongo/jstests/concurrency/fsm_libs/cluster.js

669 lines
24 KiB
JavaScript

/**
* Represents a MongoDB cluster.
*/
import {FSMShardingTest} from "jstests/concurrency/fsm_libs/shard_fixture.js";
import {validateCollections} from "jstests/hooks/validate_collections.js";
import {ReplSetTest} from "jstests/libs/replsettest.js";
export const Cluster = function (clusterOptions, sessionOptions) {
if (!(this instanceof Cluster)) {
return new Cluster(clusterOptions);
}
function getObjectKeys(obj) {
let values = [];
if (typeof obj !== "object") {
return values;
}
Object.keys(obj).forEach((key) => {
if (key.indexOf(".") > -1) {
throw new Error("illegal key specified " + key);
}
let subKeys = getObjectKeys(obj[key]);
if (subKeys.length === 0) {
values.push(key);
} else {
subKeys.forEach((subKey) => {
values.push(key + "." + subKey);
});
}
});
return values;
}
function validateClusterOptions(options) {
let allowedKeys = [
"replication.enabled",
"replication.numNodes",
"sameCollection",
"sameDB",
"setupFunctions",
"sharded.enabled",
"sharded.enableBalancer",
"sharded.numMongos",
"sharded.numShards",
"sharded.stepdownOptions",
"sharded.stepdownOptions.configStepdown",
"sharded.stepdownOptions.shardStepdown",
"teardownFunctions",
];
getObjectKeys(options).forEach(function (option) {
assert.contains(
option,
allowedKeys,
"invalid option: " + tojson(option) + "; valid options are: " + tojson(allowedKeys),
);
});
options.replication = options.replication || {};
assert.eq("object", typeof options.replication);
options.replication.enabled = options.replication.enabled || false;
assert.eq("boolean", typeof options.replication.enabled);
if (typeof options.replication.numNodes !== "undefined") {
assert(
options.replication.enabled,
"Must have replication.enabled be true if 'replication.numNodes' is specified",
);
}
options.replication.numNodes = options.replication.numNodes || 3;
assert.eq("number", typeof options.replication.numNodes);
options.sameCollection = options.sameCollection || false;
assert.eq("boolean", typeof options.sameCollection);
options.sameDB = options.sameDB || false;
assert.eq("boolean", typeof options.sameDB);
options.sharded = options.sharded || {};
assert.eq("object", typeof options.sharded);
options.sharded.enabled = options.sharded.enabled || false;
assert.eq("boolean", typeof options.sharded.enabled);
if (typeof options.sharded.enableBalancer !== "undefined") {
assert(
options.sharded.enabled,
"Must have sharded.enabled be true if 'sharded.enableBalancer' is specified",
);
}
options.sharded.enableBalancer = options.sharded.enableBalancer || false;
assert.eq("boolean", typeof options.sharded.enableBalancer);
if (typeof options.sharded.numMongos !== "undefined") {
assert(options.sharded.enabled, "Must have sharded.enabled be true if 'sharded.numMongos' is specified");
}
options.sharded.numMongos = options.sharded.numMongos || 2;
assert.eq("number", typeof options.sharded.numMongos);
if (typeof options.sharded.numShards !== "undefined") {
assert(options.sharded.enabled, "Must have sharded.enabled be true if 'sharded.numShards' is specified");
}
options.sharded.numShards = options.sharded.numShards || 2;
assert.eq("number", typeof options.sharded.numShards);
options.setupFunctions = options.setupFunctions || {};
assert.eq("object", typeof options.setupFunctions);
options.setupFunctions.mongod = options.setupFunctions.mongod || [];
assert(Array.isArray(options.setupFunctions.mongod), "Expected setupFunctions.mongod to be an array");
assert(
options.setupFunctions.mongod.every((f) => typeof f === "function"),
"Expected setupFunctions.mongod to be an array of functions",
);
if (typeof options.setupFunctions.mongos !== "undefined") {
assert(
options.sharded.enabled,
"Must have sharded.enabled be true if 'setupFunctions.mongos' is specified",
);
}
options.setupFunctions.mongos = options.setupFunctions.mongos || [];
assert(Array.isArray(options.setupFunctions.mongos), "Expected setupFunctions.mongos to be an array");
assert(
options.setupFunctions.mongos.every((f) => typeof f === "function"),
"Expected setupFunctions.mongos to be an array of functions",
);
options.setupFunctions.config = options.setupFunctions.config || [];
assert(Array.isArray(options.setupFunctions.config), "Expected setupFunctions.config to be an array");
assert(
options.setupFunctions.config.every((f) => typeof f === "function"),
"Expected setupFunctions.config to be an array of functions",
);
options.teardownFunctions = options.teardownFunctions || {};
assert.eq("object", typeof options.teardownFunctions);
options.teardownFunctions.mongod = options.teardownFunctions.mongod || [];
assert(Array.isArray(options.teardownFunctions.mongod), "Expected teardownFunctions.mongod to be an array");
assert(
options.teardownFunctions.mongod.every((f) => typeof f === "function"),
"Expected teardownFunctions.mongod to be an array of functions",
);
if (typeof options.teardownFunctions.mongos !== "undefined") {
assert(
options.sharded.enabled,
"Must have sharded.enabled be true if 'teardownFunctions.mongos' is specified",
);
}
options.teardownFunctions.mongos = options.teardownFunctions.mongos || [];
assert(Array.isArray(options.teardownFunctions.mongos), "Expected teardownFunctions.mongos to be an array");
assert(
options.teardownFunctions.mongos.every((f) => typeof f === "function"),
"Expected teardownFunctions.mongos to be an array of functions",
);
options.teardownFunctions.config = options.teardownFunctions.config || [];
assert(Array.isArray(options.teardownFunctions.config), "Expected teardownFunctions.config to be an array");
assert(
options.teardownFunctions.config.every((f) => typeof f === "function"),
"Expected teardownFunctions.config to be an array of functions",
);
}
let conn;
let session;
let secondaryConns;
let st;
let initialized = false;
let clusterStartTime;
let _conns = {mongos: [], mongod: []};
let nextConn = 0;
let replSets = [];
let rst;
validateClusterOptions(clusterOptions);
Object.freeze(clusterOptions);
this.setup = async function setup() {
let verbosityLevel = 0;
if (initialized) {
throw new Error("cluster has already been initialized");
}
if (clusterOptions.sharded.enabled) {
st = new FSMShardingTest(`mongodb://${db.getMongo().host}`);
conn = st.s(0); // First mongos
this.teardown = function teardown() {
clusterOptions.teardownFunctions.mongod.forEach(this.executeOnMongodNodes);
clusterOptions.teardownFunctions.mongos.forEach(this.executeOnMongosNodes);
clusterOptions.teardownFunctions.config.forEach(this.executeOnConfigNodes);
};
this.reestablishConnectionsAfterFailover = function () {
// Call getPrimary() to re-establish the connections in FSMShardingTest
// as it is not a transparent proxy for ShardingTest.
st._configsvr.getPrimary();
for (let rst of st._shard_rsts) {
rst.getPrimary();
}
};
// Save all mongos, mongod, and ReplSet connections (if any).
let i;
i = 0;
while (st.s(i)) {
_conns.mongos.push(st.s(i++));
}
i = 0;
while (st.d(i)) {
_conns.mongod.push(st.d(i++));
}
i = 0;
while (st.rs(i)) {
const rs = st.rs(i++);
this._addReplicaSetConns(rs);
replSets.push(rs);
}
} else if (clusterOptions.replication.enabled) {
rst = new ReplSetTest(db.getMongo().host);
conn = rst.getPrimary();
secondaryConns = rst.getSecondaries();
replSets = [rst];
this.teardown = function teardown() {
clusterOptions.teardownFunctions.mongod.forEach(this.executeOnMongodNodes);
};
this._addReplicaSetConns(rst);
} else {
// standalone server
conn = db.getMongo();
db.adminCommand({setParameter: 1, logLevel: verbosityLevel});
_conns.mongod = [conn];
}
if (typeof sessionOptions !== "undefined") {
session = conn.startSession(sessionOptions);
const readPreference = session.getOptions().getReadPreference();
if (readPreference && readPreference.mode === "secondary") {
// Unset the explicit read preference so set_read_preference_secondary.js can do
// the right thing based on the DB.
session.getOptions().setReadPreference(undefined);
// We import set_read_preference_secondary.js in order to avoid running
// commands against the "admin" and "config" databases via mongos with
// readPreference={mode: "secondary"} when there's only a single node in
// the CSRS.
await import("jstests/libs/override_methods/set_read_preference_secondary.js");
}
}
initialized = true;
clusterStartTime = new Date();
clusterOptions.setupFunctions.mongod.forEach(this.executeOnMongodNodes);
clusterOptions.setupFunctions.config.forEach(this.executeOnConfigNodes);
if (clusterOptions.sharded) {
clusterOptions.setupFunctions.mongos.forEach(this.executeOnMongosNodes);
}
};
this._addReplicaSetConns = function _addReplicaSetConns(rsTest) {
_conns.mongod.push(rsTest.getPrimary());
rsTest.getSecondaries().forEach(function (secondaryConn) {
_conns.mongod.push(secondaryConn);
});
};
this.executeOnMongodNodes = function executeOnMongodNodes(fn) {
assert(initialized, "cluster must be initialized first");
if (!fn || typeof fn !== "function" || fn.length !== 1) {
throw new Error("mongod function must be a function that takes a db as an argument");
}
if (TestData.shardsAddedRemoved) {
_conns.mongod.forEach(function (mongodConn) {
try {
fn(mongodConn.getDB("admin"));
} catch (e) {
print(
"Error executing function on mongod nodes, with addShard/removeShard running in background: " +
tojson(e),
);
}
});
} else {
_conns.mongod.forEach(function (mongodConn) {
fn(mongodConn.getDB("admin"));
});
}
};
this.executeOnMongosNodes = function executeOnMongosNodes(fn) {
assert(initialized, "cluster must be initialized first");
if (!fn || typeof fn !== "function" || fn.length !== 1) {
throw new Error("mongos function must be a function that takes a db as an argument");
}
_conns.mongos.forEach(function (mongosConn) {
fn(mongosConn.getDB("admin"), true /* isMongos */);
});
};
this.executeOnConfigNodes = function executeOnConfigNodes(fn) {
assert(initialized, "cluster must be initialized first");
if (!fn || typeof fn !== "function" || fn.length !== 1) {
throw new Error("config function must be a function that takes a db as an argument");
}
st._configServers.forEach(function (conn) {
fn(conn.getDB("admin"));
});
};
this.getConfigPrimaryNode = function getConfigPrimaryNode() {
assert(initialized, "cluster must be initialized first");
return st._configsvr.getPrimary();
};
this.synchronizeMongosClusterTimes = function synchronizeMongosClusterTimes() {
const contactConfigServerFn = (mongosConn) => {
// The admin database is hosted on the config server.
assert.commandWorked(mongosConn.adminCommand({find: "foo"}));
};
// After the first iteration, the config server will have been gossiped the highest cluster
// time any mongos has seen. After the second iteration, each mongos should have been
// gossiped this time as well.
this.executeOnMongosNodes(contactConfigServerFn);
this.executeOnMongosNodes(contactConfigServerFn);
};
this.teardown = function teardown() {
assert(initialized, "cluster must be initialized first");
clusterOptions.teardownFunctions.mongod.forEach(this.executeOnMongodNodes);
};
this.getDB = function getDB(dbName) {
assert(initialized, "cluster must be initialized first");
if (session) {
return session.getDatabase(dbName);
}
return conn.getDB(dbName);
};
this.getHost = function getHost() {
assert(initialized, "cluster must be initialized first");
// Alternate mongos connections for sharded clusters
if (this.isSharded()) {
return _conns.mongos[nextConn++ % _conns.mongos.length].host;
}
return conn.host;
};
this.getSecondaryHost = function getSecondaryHost(dbName) {
assert(initialized, "cluster must be initialized first");
if (this.isReplication() && !this.isSharded()) {
return secondaryConns[nextConn++ % secondaryConns.length].host;
}
return undefined;
};
this.getReplSetName = function getReplSetName() {
if (this.isReplication() && !this.isSharded()) {
return rst.name;
}
return undefined;
};
this.getReplSetNumNodes = function getReplSetNumNodes() {
assert(this.isReplication() && !this.isSharded(), "cluster must be a replica set");
return clusterOptions.replication.numNodes;
};
this.isSharded = function isSharded() {
return Cluster.isSharded(clusterOptions);
};
this.isReplication = function isReplication() {
return Cluster.isReplication(clusterOptions);
};
this.isStandalone = function isStandalone() {
return Cluster.isStandalone(clusterOptions);
};
this.shardCollection = function shardCollection() {
assert(initialized, "cluster must be initialized first");
assert(this.isSharded(), "cluster is not sharded");
st.shardColl.apply(st, arguments);
};
// Provide a serializable form of the cluster for use in workload states. This
// method is required because we don't currently support the serialization of Mongo
// connection objects.
//
// Serialized format:
// {
// mongos: [
// "localhost:30998",
// "localhost:30999"
// ],
// config: [
// "localhost:29000",
// "localhost:29001",
// "localhost:29002"
// ],
// shards: {
// "test-rs0": [
// "localhost:20006",
// "localhost:20007",
// "localhost:20008"
// ],
// "test-rs1": [
// "localhost:20009",
// "localhost:20010",
// "localhost:20011"
// ]
// }
// }
this.getSerializedCluster = function getSerializedCluster() {
assert(initialized, "cluster must be initialized first");
// TODO: Add support for non-sharded clusters.
if (!this.isSharded()) {
return "";
}
let cluster = {mongos: [], config: [], shards: {}};
let i = 0;
let mongos = st.s(0);
while (mongos) {
cluster.mongos.push(mongos.name);
++i;
mongos = st.s(i);
}
i = 0;
let config = st.c(0);
while (config) {
cluster.config.push(config.name);
++i;
config = st.c(i);
}
i = 0;
let shard = st.shard(0);
while (shard) {
if (TestData.shardsAddedRemoved && shard.shardName === "config") {
// Skip the config shard if it's transitioning in and out of being a shard to avoid
// tests that use manual shard operations failing with ShardNotFound.
} else if (shard.name.includes("/")) {
// If the shard is a replica set, the format of st.shard(0).name in ShardingTest is
// "test-rs0/localhost:20006,localhost:20007,localhost:20008".
let [_, shards] = shard.name.split("/");
cluster.shards[shard.shardName] = shards.split(",");
} else {
// If the shard is a standalone mongod, the format of st.shard(0).name in
// ShardingTest is "localhost:20006".
cluster.shards[shard.shardName] = [shard.name];
}
++i;
shard = st.shard(i);
}
return cluster;
};
this.getReplicaSets = function getReplicaSets() {
assert(initialized, "cluster must be initialized first");
assert(this.isReplication() || this.isSharded());
return replSets;
};
this.isBalancerEnabled = function isBalancerEnabled() {
return this.isSharded() && clusterOptions.sharded.enableBalancer;
};
this.validateAllCollections = function validateAllCollections(phase) {
assert(initialized, "cluster must be initialized first");
let _validateCollections = function _validateCollections(db, isMongos = false) {
// Validate all the collections on each node.
let res = db.adminCommand({listDatabases: 1});
assert.commandWorked(res);
res.databases.forEach((dbInfo) => {
const validateOptions = {full: true, enforceFastCount: true};
// TODO (SERVER-24266): Once fast counts are tolerant to unclean shutdowns, remove
// the check for TestData.allowUncleanShutdowns.
if (TestData.skipEnforceFastCountOnValidate || TestData.allowUncleanShutdowns) {
validateOptions.enforceFastCount = false;
}
assert.commandWorked(
validateCollections(db.getSiblingDB(dbInfo.name), validateOptions),
phase + " collection validation failed",
);
});
};
let startTime = Date.now();
jsTest.log("Starting to validate collections " + phase);
this.executeOnMongodNodes(_validateCollections);
this.executeOnMongosNodes(_validateCollections);
let totalTime = Date.now() - startTime;
jsTest.log("Finished validating collections in " + totalTime + " ms, " + phase);
};
this.checkReplicationConsistency = function checkReplicationConsistency(dbDenylist, phase) {
assert(initialized, "cluster must be initialized first");
if (!this.isReplication()) {
return;
}
let shouldCheckDBHashes = !this.isBalancerEnabled();
replSets.forEach((rst) => {
let startTime = Date.now();
let primary = rst.getPrimary();
if (shouldCheckDBHashes) {
jsTest.log(
"Starting consistency checks for replica set with " +
primary.host +
" assumed to still be primary, " +
phase,
);
// Compare the dbhashes of the primary and secondaries.
rst.checkReplicatedDataHashes(phase, dbDenylist);
let totalTime = Date.now() - startTime;
jsTest.log(
"Finished consistency checks of replica set with " +
primary.host +
" as primary in " +
totalTime +
" ms, " +
phase,
);
} else {
jsTest.log(
"Skipping consistency checks when the balancer is enabled, " +
"for replica set with " +
primary.host +
" assumed to still be primary, " +
phase,
);
// Get the latest optime from the primary.
let replSetStatus = primary.adminCommand({replSetGetStatus: 1});
assert.commandWorked(replSetStatus, phase + ", error getting replication status");
let primaryInfo = replSetStatus.members.find((memberInfo) => memberInfo.self);
assert(
primaryInfo !== undefined,
phase + ", failed to find self in replication status: " + tojson(replSetStatus),
);
// Wait for all previous workload operations to complete.
rst.awaitReplication();
}
});
};
this.isRunningWiredTigerLSM = function isRunningWiredTigerLSM() {
let adminDB = this.getDB("admin");
if (this.isSharded()) {
// Get the storage engine the sharded cluster is configured to use from one of the
// shards since mongos won't report it.
adminDB = st.shard(0).getDB("admin");
}
let res = adminDB.runCommand({getCmdLineOpts: 1});
assert.commandWorked(res, "failed to get command line options");
let wiredTigerOptions = {};
if (res.parsed && res.parsed.storage) {
wiredTigerOptions = res.parsed.storage.wiredTiger || {};
}
let wiredTigerCollectionConfig = wiredTigerOptions.collectionConfig || {};
let wiredTigerConfigString = wiredTigerCollectionConfig.configString || "";
return wiredTigerConfigString === "type=lsm";
};
this.shouldPerformContinuousStepdowns = function shouldPerformContinuousStepdowns() {
return this.isSharded() && typeof clusterOptions.sharded.stepdownOptions !== "undefined";
};
/*
* Returns true if this cluster has a config shard.
* Config shard always have shard ID equal to "config".
*/
this.hasConfigShard = function hasConfigShard() {
if (!this.isSharded()) {
return false;
}
let i = 0;
while (st.shard(i)) {
if (st.shard(i++).shardName === "config") return true;
}
return false;
};
this.isSteppingDownConfigServers = function isSteppingDownConfigServers() {
return this.shouldPerformContinuousStepdowns() && clusterOptions.sharded.stepdownOptions.configStepdown;
};
this.isSteppingDownShards = function isSteppingDownShards() {
return this.shouldPerformContinuousStepdowns() && clusterOptions.sharded.stepdownOptions.shardStepdown;
};
this.awaitReplication = () => {
assert(this.isReplication(), "cluster does not contain replica sets");
for (let rst of replSets) {
rst.awaitReplication();
}
};
};
/**
* Returns true if 'clusterOptions' represents a standalone mongod,
* and false otherwise.
*/
Cluster.isStandalone = function isStandalone(clusterOptions) {
return !clusterOptions.sharded.enabled && !clusterOptions.replication.enabled;
};
/**
* Returns true if 'clusterOptions' represents a replica set, and returns false otherwise.
*/
Cluster.isReplication = function isReplication(clusterOptions) {
return clusterOptions.replication.enabled;
};
/**
* Returns true if 'clusterOptions' represents a sharded configuration, and returns false otherwise.
*/
Cluster.isSharded = function isSharded(clusterOptions) {
return clusterOptions.sharded.enabled;
};