"""Sharded cluster fixture for executing JSTests against."""
|
|
|
|
import os.path
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
import pymongo
|
|
import pymongo.errors
|
|
import yaml
|
|
|
|
from buildscripts.resmokelib import config as _config
|
|
from buildscripts.resmokelib.extensions import (
|
|
delete_extension_configs,
|
|
find_and_generate_extension_configs,
|
|
)
|
|
from buildscripts.resmokelib.testing.fixtures import _builder, external, interface
|
|
from buildscripts.resmokelib.utils.sharded_cluster_util import (
|
|
inject_catalog_metadata_on_the_csrs,
|
|
refresh_logical_session_cache_with_retry,
|
|
)
|
|
|
|
|
|
class ShardedClusterFixture(interface.Fixture, interface._DockerComposeInterface):
|
|
"""Fixture which provides JSTests with a sharded cluster to run against."""
|
|
|
|
AWAIT_SHARDING_INITIALIZATION_TIMEOUT_SECS = 60
|
|
|
|
def __init__(
|
|
self,
|
|
logger,
|
|
job_num,
|
|
fixturelib,
|
|
mongos_options=None,
|
|
mongod_executable=None,
|
|
mongod_options=None,
|
|
dbpath_prefix=None,
|
|
preserve_dbpath=False,
|
|
num_shards=1,
|
|
num_rs_nodes_per_shard=1,
|
|
num_mongos=1,
|
|
enable_balancer=True,
|
|
auth_options=None,
|
|
configsvr_options=None,
|
|
shard_options=None,
|
|
cluster_logging_prefix=None,
|
|
config_shard=None,
|
|
use_auto_bootstrap_procedure=None,
|
|
replica_set_endpoint=False,
|
|
random_migrations=False,
|
|
launch_mongot=False,
|
|
load_all_extensions=False,
|
|
set_cluster_parameter=None,
|
|
inject_catalog_metadata=None,
|
|
shard_replset_name_prefix="shard-rs",
|
|
configsvr_replset_name="config-rs",
|
|
):
|
|
"""
|
|
Initialize ShardedClusterFixture with different options for the cluster processes.
|
|
"""
|
|
interface.Fixture.__init__(self, logger, job_num, fixturelib, dbpath_prefix=dbpath_prefix)
|
|
|
|
if "dbpath" in mongod_options:
|
|
raise ValueError("Cannot specify mongod_options.dbpath")
|
|
|
|
self.mongos_options = self.fixturelib.make_historic(
|
|
self.fixturelib.default_if_none(mongos_options, {})
|
|
)
|
|
# The mongotHost and searchIndexManagementHostAndPort options cannot be set on mongos_options yet because
|
|
# the port value is only assigned in MongoDFixture initialization, which happens later.
|
|
self.launch_mongot = launch_mongot
|
|
|
|
# mongod options
|
|
self.mongod_options = self.fixturelib.make_historic(
|
|
self.fixturelib.default_if_none(mongod_options, {})
|
|
)
|
|
|
|
self.load_all_extensions = load_all_extensions or self.config.LOAD_ALL_EXTENSIONS
|
|
if self.load_all_extensions:
|
|
self.loaded_extensions = find_and_generate_extension_configs(
|
|
is_evergreen=self.config.EVERGREEN_TASK_ID,
|
|
logger=self.logger,
|
|
mongod_options=self.mongod_options,
|
|
mongos_options=self.mongos_options,
|
|
)
|
|
|
|
self.mongod_executable = mongod_executable
|
|
self.mongod_options["set_parameters"] = self.fixturelib.make_historic(
|
|
mongod_options.get("set_parameters", {})
|
|
).copy()
|
|
self.mongod_options["set_parameters"]["migrationLockAcquisitionMaxWaitMS"] = (
|
|
self.mongod_options["set_parameters"].get("migrationLockAcquisitionMaxWaitMS", 30000)
|
|
)
|
|
# Extend time for transactions by default to account for slow machines during testing.
|
|
self.mongod_options["set_parameters"]["maxTransactionLockRequestTimeoutMillis"] = (
|
|
self.mongod_options[
|
|
"set_parameters"
|
|
].get("maxTransactionLockRequestTimeoutMillis", 10 * 1000)
|
|
)
|
|
|
|
# Misc other options for the fixture.
|
|
self.config_shard = config_shard
|
|
self.preserve_dbpath = preserve_dbpath
|
|
self.num_shards = num_shards
|
|
self.num_rs_nodes_per_shard = num_rs_nodes_per_shard
|
|
self.num_mongos = num_mongos
|
|
self.auth_options = auth_options
|
|
self.use_auto_bootstrap_procedure = use_auto_bootstrap_procedure
|
|
self.replica_endpoint_mode = replica_set_endpoint
|
|
self.set_cluster_parameter = set_cluster_parameter
|
|
self.inject_catalog_metadata = inject_catalog_metadata
|
|
self.configsvr_replset_name = configsvr_replset_name
|
|
self.shard_replset_name_prefix = shard_replset_name_prefix
|
|
|
|
# Options for roles - shardsvr, configsvr.
|
|
self.configsvr_options = self.fixturelib.make_historic(
|
|
self.fixturelib.default_if_none(configsvr_options, {})
|
|
)
|
|
self.shard_options = self.fixturelib.make_historic(
|
|
self.fixturelib.default_if_none(shard_options, {})
|
|
)
|
|
|
|
# Logging prefix options.
|
|
# `cluster_logging_prefix` is the logging prefix used in cluster to cluster replication.
|
|
self.cluster_logging_prefix = (
|
|
"" if cluster_logging_prefix is None else f"{cluster_logging_prefix}:"
|
|
)
|
|
self.configsvr_shard_logging_prefix = f"{self.cluster_logging_prefix}configsvr"
|
|
self.rs_shard_logging_prefix = f"{self.cluster_logging_prefix}shard"
|
|
self.mongos_logging_prefix = f"{self.cluster_logging_prefix}mongos"
|
|
|
|
if self.num_rs_nodes_per_shard is None:
|
|
raise TypeError("num_rs_nodes_per_shard must be an integer but found None")
|
|
elif isinstance(self.num_rs_nodes_per_shard, int):
|
|
if self.num_rs_nodes_per_shard <= 0:
|
|
raise ValueError("num_rs_nodes_per_shard must be a positive integer")
|
|
|
|
# Balancer options
|
|
self.enable_balancer = enable_balancer
|
|
self.random_migrations = random_migrations
|
|
if self.random_migrations:
|
|
if not self.enable_balancer:
|
|
raise ValueError(
|
|
"random_migrations can only be enabled when balancer is enabled (enable_balancer=True)"
|
|
)
|
|
|
|
if (
|
|
"failpoint.balancerShouldReturnRandomMigrations"
|
|
in self.mongod_options["set_parameters"]
|
|
):
|
|
raise ValueError(
|
|
"Cannot enable random_migrations because balancerShouldReturnRandomMigrations failpoint is already present in mongod_options"
|
|
)
|
|
|
|
# Enable random migrations failpoint
|
|
self.mongod_options["set_parameters"][
|
|
"failpoint.balancerShouldReturnRandomMigrations"
|
|
] = {"mode": "alwaysOn"}
|
|
|
|
# Reduce migration throttling to increase frequency of random migrations
|
|
self.mongod_options["set_parameters"]["balancerMigrationsThrottlingMs"] = (
|
|
self.mongod_options["set_parameters"].get("balancerMigrationsThrottlingMs", 250)
|
|
) # millis
|
|
|
|
self._dbpath_prefix = os.path.join(
|
|
dbpath_prefix if dbpath_prefix else self._dbpath_prefix, self.config.FIXTURE_SUBDIR
|
|
)
|
|
|
|
        self.configsvr = None
        self.mongos = []
        self.shards = []
        # These mongot-related options will be set after each shard's setup() has completed.
        # They're used to connect a sharded cluster's mongos to the last launched mongot.
        self.mongotHost = None
        self.searchIndexManagementHostAndPort = None

        self.is_ready = False

    def pids(self):
        """:return: pids owned by this fixture if any."""
        out = []
        if self.configsvr is not None:
            out.extend(self.configsvr.pids())
        else:
            self.logger.debug(
                "Config server not running when gathering sharded cluster fixture pids."
            )
        if self.shards is not None:
            for shard in self.shards:
                out.extend(shard.pids())
        else:
            self.logger.debug("No shards when gathering sharded cluster fixture pids.")
        return out

    def setup(self):
        """Set up the sharded cluster."""
        with ThreadPoolExecutor() as executor:
            tasks = []

            if self.config_shard is None:
                tasks.append(executor.submit(self.configsvr.setup))

            # Start up each of the shards.
            for shard in self.shards:
                tasks.append(executor.submit(shard.setup))

            # Wait for the setup of all nodes to complete.
            for task in as_completed(tasks):
                task.result()

        for mongos in self.mongos:
            mongos.mongos_options["configdb"] = self.configsvr.get_internal_connection_string()

        if self.launch_mongot:
            # These mongot parameters are popped from shard.mongod_options when mongod is
            # launched in the setup() call above. As such, the final values can't be cleanly
            # copied over from mongod_options and need to be recreated here.
            if self.mongos[0].mongos_options["set_parameters"].get("useGrpcForSearch"):
                # If mongos & mongod are configured to use egress gRPC for search, then set the
                # `mongotHost` parameter to the mongot listening address expecting communication
                # via the MongoDB gRPC protocol (which we configured in setup_mongot_params).
                self.mongotHost = "localhost:" + str(self.shards[-1].mongot_grpc_port)
            else:
                self.mongotHost = "localhost:" + str(self.shards[-1].mongot_port)

            self.searchIndexManagementHostAndPort = self.mongotHost

            for mongos in self.mongos:
                # In a search-enabled sharded cluster, mongos has to be spun up with a
                # connection string to a mongot in order to issue PlanShardedSearch commands.
                mongos.mongos_options["mongotHost"] = self.mongotHost
                mongos.mongos_options["searchIndexManagementHostAndPort"] = (
                    self.searchIndexManagementHostAndPort
                )

        with ThreadPoolExecutor() as executor:
            tasks = []
            for mongos in self.mongos:
                tasks.append(executor.submit(mongos.setup))

            # Wait for the setup of all nodes to complete.
            for task in as_completed(tasks):
                task.result()

    def get_rs_fixture_name(self):
        """Declare the fixture name needed to build the shards of this cluster."""
        return "ReplicaSetFixture"

    def _all_mongo_d_s_t(self):
        """Return a list of all `mongo{d,s,t}` `Process` instances in this fixture."""
        # When config_shard is None, we have an additional replset for the configsvr.
        all_nodes = [self.configsvr] if self.config_shard is None else []
        all_nodes += self.mongos
        all_nodes += self.shards
        return sum([node._all_mongo_d_s_t() for node in all_nodes], [])

    def _all_mongots(self):
        """Return a list of all `mongot` `Process` instances in this fixture."""
        mongot_processes = []
        for replicaset in self.shards:
            mongot_processes.extend(replicaset._all_mongots())
        return mongot_processes

    def refresh_logical_session_cache(self, target):
        """Refresh logical session cache with no timeout."""
        primary = target.get_primary().mongo_client()
        try:
            primary.admin.command({"refreshLogicalSessionCacheNow": 1})
        except pymongo.errors.OperationFailure as err:
            if err.code != self._WRITE_CONCERN_FAILED:
                raise err
            self.logger.info(
                "Ignoring write concern timeout for refreshLogicalSessionCacheNow "
                "command and continuing to wait"
            )
            target.await_last_op_committed(target.AWAIT_REPL_TIMEOUT_FOREVER_MINS * 60)

    def get_shard_ids(self):
        """Get the list of shard ids in the cluster."""
        client = interface.build_client(self, self.auth_options)
        res = client.admin.command("listShards")
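        # Illustrative "listShards" reply shape (field values made up):
        #   {"shards": [{"_id": "shard-rs0", "host": "shard-rs0/localhost:20001"}], "ok": 1}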
        return [shard_info["_id"] for shard_info in res["shards"]]

    def await_ready(self):
        """Block until the fixture can be used for testing."""
        with ThreadPoolExecutor() as executor:
            tasks = []

            # Wait for the config server.
            if self.configsvr is not None and self.config_shard is None:
                tasks.append(executor.submit(self.configsvr.await_ready))

            # Wait for each of the shards.
            for shard in self.shards:
                tasks.append(executor.submit(shard.await_ready))

            for mongos in self.mongos:
                tasks.append(executor.submit(mongos.await_ready))

            # Wait for all the nodes to be ready.
            for task in as_completed(tasks):
                task.result()

        client = interface.build_client(self, self.auth_options)

        # Turn off the balancer if it is not meant to be enabled.
        if not self.enable_balancer:
            self.stop_balancer(join_migrations=False)

        # Inform mongos about each of the shards.
        for idx, shard in enumerate(self.shards):
            self._add_shard(client, shard, self.config_shard == idx)

        # Ensure that all CSRS nodes are up to date. This is strictly needed for tests that use
        # multiple mongoses. In those cases, the first mongos initializes the contents of the
        # config database; without waiting for those writes to replicate to all the config
        # servers, the secondary mongoses risk reading from a stale config server and seeing an
        # empty config database.
        self.configsvr.await_last_op_committed()

        # Ensure that the sessions collection gets auto-sharded by the config server.
        if self.configsvr is not None:
            primary_mongo_client = self.configsvr.get_primary().mongo_client()
            refresh_logical_session_cache_with_retry(primary_mongo_client, self.configsvr)

        for shard in self.shards:
            self.refresh_logical_session_cache(shard)

        if self.set_cluster_parameter:
            self.run_set_cluster_parameter()

        if self.inject_catalog_metadata:
            csrs_client = interface.build_client(self.configsvr, self.auth_options)
            inject_catalog_metadata_on_the_csrs(csrs_client, self.inject_catalog_metadata)

        self.is_ready = True

    def run_set_cluster_parameter(self):
        """Set a cluster parameter for the fixture."""
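        # For example, set_cluster_parameter={"parameter": "pauseMigrationsDuringMultiUpdates",
        # "value": {"enabled": True}} (an illustrative value) yields the request:
        #   {"setClusterParameter": {"pauseMigrationsDuringMultiUpdates": {"enabled": True}}}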
        client = interface.build_client(self, self.auth_options)
        command_request = {
            "setClusterParameter": {
                self.set_cluster_parameter["parameter"]: self.set_cluster_parameter["value"]
            },
        }
        client.admin.command(command_request)

        # Make sure all mongos are aware of the new parameter values before tests are run. This
        # is needed because mongos only refreshes its cache of cluster parameters periodically.
        # Running getClusterParameter on a router causes it to refresh its cache.
        for mongos in self.mongos:
            mongos.mongo_client().admin.command(
                {"getClusterParameter": self.set_cluster_parameter["parameter"]}
            )

    def stop_balancer(self, timeout_ms=300000, join_migrations=True):
        """Stop the balancer."""
        client = interface.build_client(self, self.auth_options)
        client.admin.command({"balancerStop": 1}, maxTimeMS=timeout_ms)
        if join_migrations:
            for shard in self.shards:
                shard_client = interface.build_client(shard.get_primary(), self.auth_options)
                shard_client.admin.command({"_shardsvrJoinMigrations": 1})
        self.logger.info("Stopped the balancer")

    def start_balancer(self, timeout_ms=300000):
        """Start the balancer."""
        client = interface.build_client(self, self.auth_options)
        client.admin.command({"balancerStart": 1}, maxTimeMS=timeout_ms)
        self.logger.info("Started the balancer")

    def feature_flag_present_and_enabled(self, feature_flag_name):
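        """Return True if the named feature flag exists on the config server and is enabled."""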
        full_ff_name = f"featureFlag{feature_flag_name}"
        csrs_client = interface.build_client(self.configsvr, self.auth_options)
        try:
            res = csrs_client.admin.command({"getParameter": 1, full_ff_name: 1})
            return bool(res[full_ff_name]["value"])
        except pymongo.errors.OperationFailure as err:
            if err.code == 72:  # InvalidOptions
                # The feature flag is not present.
                return False
            raise err

    def get_shard_object(self, conn_string):
        """Return the shard object that matches the shard referenced in conn_string."""
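        # A shard connection string looks like "shard-rs0/localhost:20001,localhost:20002"
        # (hosts and ports illustrative): drop the replica set name, then collect the port of
        # every listed host and match it against each shard's node info.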
        hosts = conn_string.split("/")[1].split(",")
        ports = []
        for host in hosts:
            ports += [host.split(":")[1]]

        for shard in self.shards:
            shard_info = shard.get_node_info()
            for node in shard_info:
                if str(node.port) in ports:
                    return shard

    def _do_teardown(self, finished=False, mode=None):
        """Shut down the sharded cluster."""
        self.logger.info("Stopping all members of the sharded cluster...")

        if finished and self.load_all_extensions and self.loaded_extensions:
            delete_extension_configs(self.loaded_extensions, self.logger)

        running_at_start = self.is_running()
        if not running_at_start:
            self.logger.warning(
                "All members of the sharded cluster were expected to be running, but weren't."
            )

        # If we're killing or aborting to archive data files, stopping the balancer will execute
        # server commands that might lead to on-disk changes from the point of failure.
        if self.enable_balancer and mode not in (
            interface.TeardownMode.KILL,
            interface.TeardownMode.ABORT,
        ):
            self.stop_balancer()

        teardown_handler = interface.FixtureTeardownHandler(self.logger)

        with ThreadPoolExecutor() as executor:
            tasks = []
            for mongos in self.mongos:
                tasks.append(executor.submit(teardown_handler.teardown, mongos, "mongos", mode))
            # Wait for all mongos teardown tasks to complete.
            for task in as_completed(tasks):
                task.result()

            tasks = []
            for shard in self.shards:
                if shard is self.configsvr:
                    continue
                tasks.append(executor.submit(teardown_handler.teardown, shard, "shard", mode))

            if self.configsvr is not None:
                tasks.append(
                    executor.submit(
                        teardown_handler.teardown, self.configsvr, "config server", mode
                    )
                )
            # Wait for all shard and config server teardown tasks to complete.
            for task in as_completed(tasks):
                task.result()

        if teardown_handler.was_successful():
            self.logger.info("Successfully stopped all members of the sharded cluster.")
        else:
            self.logger.error("Stopping the sharded cluster fixture failed.")
            raise self.fixturelib.ServerFailure(teardown_handler.get_error_message())

        self.is_ready = False

    def is_running(self):
        """Return true if all nodes in the cluster are still operating."""
        return (
            self.configsvr is not None
            and self.configsvr.is_running()
            and all(
                shard.is_running() for shard in self.shards if not shard.removeshard_teardown_marker
            )
            and all(mongos.is_running() for mongos in self.mongos)
        )

    def get_internal_connection_string(self):
        """Return the internal connection string."""
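        # The result joins every mongos, e.g. "localhost:20007,localhost:20008" for a
        # two-mongos cluster (ports illustrative).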
        if self.mongos is None:
            raise ValueError("Must call setup() before calling get_internal_connection_string()")

        return ",".join([mongos.get_internal_connection_string() for mongos in self.mongos])

    def get_shell_connection_string(self, use_grpc=False):
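        """Return a comma-separated list of the shell connection strings of each mongos."""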
        if self.mongos is None:
            raise ValueError("Must call setup() before calling get_shell_connection_string()")

        return ",".join([mongos.get_shell_connection_string(use_grpc) for mongos in self.mongos])

    def _get_replica_set_endpoint(self):
        # The replica set endpoint only becomes active after the replica set has become a
        # config shard (i.e. after the addShard or transitionFromConfigServer step), so before
        # that we must connect to a mongos or the router port of a mongod to run sharding
        # commands.
        if len(self.shards) == 0:
            raise ValueError(
                "Must call install_rs_shard() before calling get_internal_connection_string()"
            )
        if len(self.shards) > 1:
            raise ValueError("Cannot use replica set endpoint on a multi-shard cluster")
        return self.shards[0]

    def get_shell_connection_url(self):
        """Return the shell connection URL."""
        if self.is_ready and self.replica_endpoint_mode:
            return self._get_replica_set_endpoint().get_shell_connection_url()

        return "mongodb://" + self.get_shell_connection_string()

    def get_driver_connection_url(self):
        """Return the driver connection URL."""
        if self.is_ready and self.replica_endpoint_mode:
            return self._get_replica_set_endpoint().get_driver_connection_url()

        return "mongodb://" + self.get_internal_connection_string()

    def get_node_info(self):
        """Return a list of NodeInfo objects."""
        output = []
        for shard in self.shards:
            if shard is self.configsvr:
                continue
            output += shard.get_node_info()
        for mongos in self.mongos:
            output += mongos.get_node_info()
        return output + self.configsvr.get_node_info()

    def get_configsvr_logger(self):
        """Return a new logging.Logger instance used for a config server shard."""
        return self.fixturelib.new_fixture_node_logger(
            self.__class__.__name__, self.job_num, self.configsvr_shard_logging_prefix
        )

    def get_configsvr_kwargs(self):
        """Return args to create replicaset.ReplicaSetFixture configured as the config server."""
        configsvr_options = self.configsvr_options.copy()

        auth_options = configsvr_options.pop("auth_options", self.auth_options)
        preserve_dbpath = configsvr_options.pop("preserve_dbpath", self.preserve_dbpath)
        num_nodes = configsvr_options.pop("num_nodes", 1)

        replset_config_options = configsvr_options.pop("replset_config_options", {})
        replset_config_options["configsvr"] = True

        mongod_options = self.mongod_options.copy()
        mongod_options = self.fixturelib.merge_mongo_option_dicts(
            mongod_options,
            self.fixturelib.make_historic(configsvr_options.pop("mongod_options", {})),
        )
        mongod_options["configsvr"] = ""
        mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "config")
        mongod_options["replSet"] = self.configsvr_replset_name
        mongod_options["storageEngine"] = "wiredTiger"

        return {
            "mongod_options": mongod_options,
            "mongod_executable": self.mongod_executable,
            "preserve_dbpath": preserve_dbpath,
            "num_nodes": num_nodes,
            "auth_options": auth_options,
            "replset_config_options": replset_config_options,
            "shard_logging_prefix": self.configsvr_shard_logging_prefix,
            **configsvr_options,
        }

    def install_configsvr(self, configsvr):
        """Install a configsvr. Called by a builder."""
        self.configsvr = configsvr

    def _get_rs_shard_logging_prefix(self, index):
        """Return replica set shard logging prefix."""
        return f"{self.rs_shard_logging_prefix}{index}"

    def get_rs_shard_logger(self, index):
        """Return a new logging.Logger instance used for a replica set shard."""
        shard_logging_prefix = self._get_rs_shard_logging_prefix(index)
        return self.fixturelib.new_fixture_node_logger(
            self.__class__.__name__, self.job_num, shard_logging_prefix
        )

    def get_rs_shard_kwargs(self, index):
        """Return args to create replicaset.ReplicaSetFixture configured as a shard in a sharded cluster."""
        shard_options = self.shard_options.copy()

        auth_options = shard_options.pop("auth_options", self.auth_options)
        preserve_dbpath = shard_options.pop("preserve_dbpath", self.preserve_dbpath)

        replset_config_options = shard_options.pop("replset_config_options", {})
        replset_config_options = replset_config_options.copy()
        replset_config_options["configsvr"] = False

        mongod_options = self.mongod_options.copy()
        mongod_options = self.fixturelib.merge_mongo_option_dicts(
            mongod_options, self.fixturelib.make_historic(shard_options.pop("mongod_options", {}))
        )
        mongod_options["shardsvr"] = ""
        mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "shard{}".format(index))
        mongod_options["replSet"] = self.shard_replset_name_prefix + str(index)

        if self.config_shard == index:
            del mongod_options["shardsvr"]
            mongod_options["configsvr"] = ""
            replset_config_options["configsvr"] = True
            mongod_options["storageEngine"] = "wiredTiger"

            configsvr_options = self.configsvr_options.copy()

            if "mongod_options" in configsvr_options:
                mongod_options = self.fixturelib.merge_mongo_option_dicts(
                    mongod_options, configsvr_options["mongod_options"]
                )
            if "replset_config_options" in configsvr_options:
                replset_config_options = self.fixturelib.merge_mongo_option_dicts(
                    replset_config_options, configsvr_options["replset_config_options"]
                )

            for option, value in configsvr_options.items():
                if option in ("num_nodes", "mongod_options", "replset_config_options"):
                    continue
                if option in shard_options:
                    if shard_options[option] != value:
                        raise Exception(
                            "Conflicting values when combining shard and configsvr options"
                        )
                else:
                    shard_options[option] = value

        use_auto_bootstrap_procedure = (
            self.use_auto_bootstrap_procedure and self.config_shard == index
        )
        shard_logging_prefix = self._get_rs_shard_logging_prefix(index)

        return {
            "mongod_options": mongod_options,
            "mongod_executable": self.mongod_executable,
            "auth_options": auth_options,
            "preserve_dbpath": preserve_dbpath,
            "replset_config_options": replset_config_options,
            "shard_logging_prefix": shard_logging_prefix,
            "use_auto_bootstrap_procedure": use_auto_bootstrap_procedure,
            **shard_options,
        }

    def install_rs_shard(self, rs_shard):
        """Install a replica set shard. Called by a builder."""
        self.shards.append(rs_shard)

    def get_mongos_logger(self, index, total):
        """Return a new logging.Logger instance used for a mongos."""
        logger_name = (
            self.mongos_logging_prefix if total == 1 else f"{self.mongos_logging_prefix}{index}"
        )
        return self.fixturelib.new_fixture_node_logger(
            self.__class__.__name__, self.job_num, logger_name
        )

    def get_mongos_kwargs(self):
        """Return options that may be passed to a mongos."""
        mongos_options = self.mongos_options.copy()
        if _config.DOCKER_COMPOSE_BUILD_IMAGES:
            # Suites generating Docker Compose resources need to retrieve the connection string
            # to the config server while constructing this fixture.
            mongos_options["configdb"] = self.configsvr.get_internal_connection_string()
        else:
            # Regular test suite execution: the connection string will be assigned at
            # ShardedClusterFixture.setup() time, after ensuring that the stack of nodes and
            # services backing the config server replica set has been correctly started.
            mongos_options["configdb"] = None
        if self.config_shard is not None:
            if "set_parameters" not in mongos_options:
                mongos_options["set_parameters"] = {}
        mongos_options["set_parameters"] = mongos_options.get(
            "set_parameters", self.fixturelib.make_historic({})
        ).copy()
        return {"dbpath_prefix": self._dbpath_prefix, "mongos_options": mongos_options}

    def install_mongos(self, mongos):
        """Install a mongos. Called by a builder."""
        self.mongos.append(mongos)

    def _add_shard(self, client, shard, is_config_shard):
        """
        Add the specified program as a shard by executing the addShard command.

        See https://docs.mongodb.org/manual/reference/command/addShard for more details.
        """
        connection_string = shard.get_internal_connection_string()
        if is_config_shard:
            if not self.use_auto_bootstrap_procedure:
                self.logger.info("Adding %s as config shard...", connection_string)
                client.admin.command({"transitionFromDedicatedConfigServer": 1})
        else:
            self.logger.info("Adding %s as a shard...", connection_string)
            client.admin.command({"addShard": connection_string})

    def internode_validation(self):
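        """Delegate internode validation to each shard's replica set fixture."""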
        for replicaset in self.shards:
            replicaset.internode_validation()


class ExternalShardedClusterFixture(external.ExternalFixture, ShardedClusterFixture):
    """Fixture to interact with an external sharded cluster fixture."""

    REGISTERED_NAME = "ExternalShardedClusterFixture"

    def __init__(self, logger, job_num, fixturelib, original_suite_name):
        """Initialize ExternalShardedClusterFixture."""
        self.dummy_fixture = _builder.make_dummy_fixture(original_suite_name)
        self.shell_conn_string = "mongodb://" + ",".join(
            [f"mongos{i}:27017" for i in range(self.dummy_fixture.num_mongos)]
        )

        external.ExternalFixture.__init__(self, logger, job_num, fixturelib, self.shell_conn_string)
        ShardedClusterFixture.__init__(self, logger, job_num, fixturelib, mongod_options={})

    def setup(self):
        """Execute some setup before officially starting testing against this external cluster."""
        client = pymongo.MongoClient(self.get_driver_connection_url())
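        # Poll listShards (up to 50 attempts, 5 seconds apart) until the cluster reports the
        # expected number of shards.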
        for i in range(50):
            if i == 49:
                raise RuntimeError("Sharded Cluster setup has timed out.")
            payload = client.admin.command({"listShards": 1})
            if len(payload["shards"]) == self.dummy_fixture.num_shards:
                print("Sharded Cluster available.")
                break
            if len(payload["shards"]) < self.dummy_fixture.num_shards:
                print("Waiting for shards to be added to cluster.")
                time.sleep(5)
                continue
            if len(payload["shards"]) > self.dummy_fixture.num_shards:
                raise RuntimeError("More shards in cluster than expected.")

    def pids(self):
        """Use ExternalFixture method."""
        return external.ExternalFixture.pids(self)

    def await_ready(self):
        """Use ExternalFixture method."""
        return external.ExternalFixture.await_ready(self)

    def _do_teardown(self, finished=False, mode=None):
        """Use ExternalFixture method."""
        return external.ExternalFixture._do_teardown(self)

    def _is_process_running(self):
        """Use ExternalFixture method."""
        return external.ExternalFixture._is_process_running(self)

    def is_running(self):
        """Use ExternalFixture method."""
        return external.ExternalFixture.is_running(self)

    def get_internal_connection_string(self):
        """Use ExternalFixture method."""
        return external.ExternalFixture.get_internal_connection_string(self)

    def get_driver_connection_url(self):
        """Use ExternalFixture method."""
        return external.ExternalFixture.get_driver_connection_url(self)

    def get_shell_connection_url(self):
        """Use ExternalFixture method."""
        return external.ExternalFixture.get_shell_connection_url(self)

    def get_node_info(self):
        """Use ExternalFixture method."""
        return external.ExternalFixture.get_node_info(self)


class _MongoSFixture(interface.Fixture, interface._DockerComposeInterface):
    """Fixture which provides JSTests with a mongos to connect to."""

    def __init__(
        self,
        logger,
        job_num,
        fixturelib,
        dbpath_prefix,
        mongos_executable=None,
        mongos_options=None,
        add_feature_flags=False,
    ):
        """Initialize _MongoSFixture."""
        interface.Fixture.__init__(self, logger, job_num, fixturelib)

        self.config = self.fixturelib.get_config()

        # Default to command line options if the YAML configuration is not passed in.
        self.mongos_executable = self.fixturelib.default_if_none(
            mongos_executable, self.config.MONGOS_EXECUTABLE
        )

        self.mongos_options = self.fixturelib.make_historic(
            self.fixturelib.default_if_none(mongos_options, {})
        ).copy()

        if add_feature_flags:
            for ff in self.config.ENABLED_FEATURE_FLAGS:
                self.mongos_options["set_parameters"][ff] = "true"

        self.mongos = None
        self.port = None
        self.grpcPort = None
        self.port = fixturelib.get_next_port(job_num)
        self.mongos_options["port"] = self.port
        if "featureFlagGRPC" in self.config.ENABLED_FEATURE_FLAGS:
            self.grpcPort = fixturelib.get_next_port(job_num)
            self.mongos_options["grpcPort"] = self.grpcPort

        self._dbpath_prefix = dbpath_prefix

    def setup(self):
        """Set up the mongos."""

        if self.config.ALWAYS_USE_LOG_FILES:
            self.mongos_options["logpath"] = self._dbpath_prefix + "/{name}.log".format(
                name=self.logger.name
            )
            self.mongos_options["logappend"] = ""

        launcher = MongosLauncher(self.fixturelib)
        mongos, _ = launcher.launch_mongos_program(
            self.logger,
            self.job_num,
            executable=self.mongos_executable,
            mongos_options=self.mongos_options,
        )
        self.mongos_options["port"] = self.port
        try:
            self.logger.info("Starting mongos on port %d...\n%s", self.port, mongos.as_command())
            mongos.start()
            self.logger.info("mongos started on port %d with pid %d.", self.port, mongos.pid)
        except Exception as err:
            msg = "Failed to start mongos on port {:d}: {}".format(self.port, err)
            self.logger.exception(msg)
            raise self.fixturelib.ServerFailure(msg)

        self.mongos = mongos

    def _all_mongo_d_s_t(self):
        """Return the standalone `mongos` `Process` instance."""
        return [self]

    def pids(self):
        """:return: pids owned by this fixture if any."""
        if self.mongos is not None:
            return [self.mongos.pid]
        else:
            self.logger.debug("Mongos not running when gathering mongos fixture pids.")
        return []

    def is_from_configsvr(self):
        """Return false, because this router is not part of a mongod."""
        return False

    def await_ready(self):
        """Block until the fixture can be used for testing."""
        deadline = time.time() + interface.Fixture.AWAIT_READY_TIMEOUT_SECS

        # Wait until the mongos is accepting connections. The retry logic is necessary to
        # support versions of PyMongo <3.0 that immediately raise a ConnectionFailure if a
        # connection cannot be established.
        while True:
            # Check whether the mongos exited for some reason.
            exit_code = self.mongos.poll()
            if exit_code is not None:
                raise self.fixturelib.ServerFailure(
                    "Could not connect to mongos on port {}, process ended"
                    " unexpectedly with code {}.".format(self.port, exit_code)
                )

            try:
                # Use a shorter connection timeout to more closely satisfy the requested
                # deadline.
                client = self.mongo_client(timeout_millis=500)
                client.admin.command("ping")
                break
            except pymongo.errors.ConnectionFailure:
                remaining = deadline - time.time()
                if remaining <= 0.0:
                    raise self.fixturelib.ServerFailure(
                        "Failed to connect to mongos on port {} after {} seconds".format(
                            self.port, interface.Fixture.AWAIT_READY_TIMEOUT_SECS
                        )
                    )

                self.logger.info("Waiting to connect to mongos on port %d.", self.port)
                time.sleep(0.1)  # Wait a little bit before trying again.

        self.logger.info("Successfully contacted the mongos on port %d.", self.port)

    def _do_teardown(self, finished=False, mode=None):
        if self.config.NOOP_MONGO_D_S_PROCESSES:
            self.logger.info(
                "This is running against an External System Under Test setup with "
                "`docker-compose.yml` -- skipping teardown."
            )
            return

        if self.mongos is None:
            self.logger.warning("The mongos fixture has not been set up yet.")
            return  # Teardown is still a success even if nothing is running.

        if mode == interface.TeardownMode.ABORT:
            self.logger.info(
                "Attempting to send SIGABRT from resmoke to mongos on port %d with pid %d...",
                self.port,
                self.mongos.pid,
            )
        else:
            self.logger.info(
                "Stopping mongos on port %d with pid %d...", self.port, self.mongos.pid
            )
        if not self.is_running():
            exit_code = self.mongos.poll()
            msg = (
                "mongos on port {:d} was expected to be running, but wasn't. "
                "Process exited with code {:d}"
            ).format(self.port, exit_code)
            self.logger.warning(msg)
            raise self.fixturelib.ServerFailure(msg)

        self.mongos.stop(mode=mode)
        exit_code = self.mongos.wait()

        # Python's subprocess module reports a process that was terminated by a signal as the
        # negative of the signal number.
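        # e.g. a mongos stopped with TeardownMode.KILL (SIGKILL, signal 9) reports
        # exit_code == -9, which the check below treats as an expected shutdown.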
        if exit_code == 0 or (mode is not None and exit_code == -(mode.value)):
            self.logger.info("Successfully stopped the mongos on port {:d}".format(self.port))
        else:
            self.logger.warning(
                "Stopped the mongos on port {:d}. Process exited with code {:d}.".format(
                    self.port, exit_code
                )
            )
            raise self.fixturelib.ServerFailure(
                "mongos on port {:d} with pid {:d} exited with code {:d}".format(
                    self.port, self.mongos.pid, exit_code
                )
            )

    def is_running(self):
        """Return true if the mongos is still operating."""
        return self.mongos is not None and self.mongos.poll() is None

    def _get_hostname(self):
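        """Return the hostname clients should use to reach this mongos."""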
        return (
            self.logger.external_sut_hostname
            if self.config.NOOP_MONGO_D_S_PROCESSES
            else "localhost"
        )

    def get_internal_connection_string(self):
        """Return the internal connection string."""
        return f"{self._get_hostname()}:{self.port}"

    def get_shell_connection_string(self, use_grpc=False):
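        """Return the shell connection string, using the gRPC port when requested."""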
        port = self.port if not (self.config.SHELL_GRPC or use_grpc) else self.grpcPort
        return f"{self._get_hostname()}:{port}"

    def get_shell_connection_url(self):
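        """Return the shell connection URL."""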
return "mongodb://" + self.get_shell_connection_string()
|
|
|
|
    def get_driver_connection_url(self):
        """Return the driver connection URL."""
        return "mongodb://" + self.get_internal_connection_string()

    def get_node_info(self):
        """Return a list of NodeInfo objects."""
        if self.mongos is None:
            self.logger.warning("The mongos fixture has not been set up yet.")
            return []

        info = interface.NodeInfo(
            full_name=self.logger.full_name,
            name=self.logger.name,
            port=self.port,
            pid=self.mongos.pid,
        )
        return [info]


# Default shutdown quiesce mode duration for mongos.
DEFAULT_MONGOS_SHUTDOWN_TIMEOUT_MILLIS = 0

# The default verbosity setting for any tests that are not started with an Evergreen task id.
# This will apply to any tests run locally.
DEFAULT_MONGOS_LOG_COMPONENT_VERBOSITY = {"transaction": 3}

# The default verbosity setting for any tests running in Evergreen, i.e. started with an
# Evergreen task id.
DEFAULT_EVERGREEN_MONGOS_LOG_COMPONENT_VERBOSITY = {"transaction": 3}


class MongosLauncher(object):
    """Class with utilities for launching a mongos."""

    def __init__(self, fixturelib):
        """Initialize MongosLauncher."""
        self.fixturelib = fixturelib
        self.config = fixturelib.get_config()

    def default_mongos_log_component_verbosity(self):
        """Return the default 'logComponentVerbosity' value to use for mongos processes."""
        if self.config.EVERGREEN_TASK_ID:
            return DEFAULT_EVERGREEN_MONGOS_LOG_COMPONENT_VERBOSITY
        return DEFAULT_MONGOS_LOG_COMPONENT_VERBOSITY

    def launch_mongos_program(
        self, logger, job_num, executable=None, process_kwargs=None, mongos_options=None
    ):
        """Return a Process instance that starts a mongos with arguments constructed from 'mongos_options'."""
        executable = self.fixturelib.default_if_none(
            executable, self.config.DEFAULT_MONGOS_EXECUTABLE
        )

        # Apply the --setParameter command line argument. Command line options to resmoke.py
        # override the YAML configuration.
        suite_set_parameters = mongos_options.setdefault("set_parameters", {})

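        # MONGOS_SET_PARAMETERS arrives from the resmoke command line as a YAML string, e.g.
        # "{logLevel: 1}" (an illustrative value), so it is parsed before being merged in.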
        if self.config.MONGOS_SET_PARAMETERS is not None:
            suite_set_parameters.update(yaml.safe_load(self.config.MONGOS_SET_PARAMETERS))

        if "mongotHost" in mongos_options:
            suite_set_parameters["mongotHost"] = mongos_options.pop("mongotHost")
            suite_set_parameters["searchIndexManagementHostAndPort"] = mongos_options.pop(
                "searchIndexManagementHostAndPort"
            )

        # Use a higher timeout finding a host to avoid spurious failures on slow machines.
        if "defaultConfigCommandTimeoutMS" not in suite_set_parameters:
            suite_set_parameters["defaultConfigCommandTimeoutMS"] = 5 * 60 * 1000

        # Set default log verbosity levels if none were specified.
        if "logComponentVerbosity" not in suite_set_parameters:
            suite_set_parameters["logComponentVerbosity"] = (
                self.default_mongos_log_component_verbosity()
            )

        # Set the default shutdown timeout millis if none was specified.
        if "mongosShutdownTimeoutMillisForSignaledShutdown" not in suite_set_parameters:
            suite_set_parameters["mongosShutdownTimeoutMillisForSignaledShutdown"] = (
                DEFAULT_MONGOS_SHUTDOWN_TIMEOUT_MILLIS
            )

        # If a JS_GC_ZEAL value has been provided in the configuration under MOZJS_JS_GC_ZEAL,
        # we inject this value directly as an environment variable to be passed to the spawned
        # mongos process.
        if self.config.MOZJS_JS_GC_ZEAL:
            process_kwargs = self.fixturelib.default_if_none(process_kwargs, {}).copy()
            env_vars = process_kwargs.setdefault("env_vars", {}).copy()
            env_vars.setdefault("JS_GC_ZEAL", self.config.MOZJS_JS_GC_ZEAL)
            process_kwargs["env_vars"] = env_vars

        _add_testing_set_parameters(suite_set_parameters)

        return self.fixturelib.mongos_program(
            logger, job_num, executable, process_kwargs, mongos_options
        )


def _add_testing_set_parameters(suite_set_parameters):
    """
    Add certain behaviors that should only be enabled for resmoke usage.

    These traditionally enable new commands, insecure access, and increased diagnostics.
    """
    suite_set_parameters.setdefault("testingDiagnosticsEnabled", True)
    suite_set_parameters.setdefault("enableTestCommands", True)