mirror of https://github.com/mongodb/mongo
SERVER-112193 Introduce new suites for dynamic execution control testing (#43196)
GitOrigin-RevId: 8e4dba699fe74d923723e519cee76d04e4a398e6
This commit is contained in:
parent
0cb63f346c
commit
cfe98b0e96
|
|
@ -124,6 +124,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
|
|||
/buildscripts/resmokeconfig/matrix_suites/generated_suites/*primary_driven_index_builds.yml @10gen/server-index-builds @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/generated_suites/*query_shape_hash_stability* @10gen/query-execution-query-settings @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/generated_suites/replica_sets_recordids_replicated_jscore_passthrough.yml @10gen/server-collection-write-path @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/generated_suites/*with_dynamic_concurrency_adjustment_algorithm.yml @10gen/server-workload-resilience @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/generated_suites/transitioning_replica_sets_jscore_passthrough.yml @10gen/server-catalog-and-routing @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/generated_suites/unified_write_executor* @10gen/query-execution-router @svc-auto-approve-bot
|
||||
|
||||
|
|
@ -153,6 +154,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
|
|||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/*primary_driven_index_builds.yml @10gen/server-index-builds @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/replica_sets_kill_secondaries_jscore_passthrough.yml @10gen/server-replication-reviewers @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/replica_sets_recordids_replicated_jscore_passthrough.yml @10gen/server-collection-write-path @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/*with_dynamic_concurrency_adjustment_algorithm.yml @10gen/server-workload-resilience @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/transitioning_replica_sets_jscore_passthrough.yml @10gen/server-catalog-and-routing @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/mappings/**/unified_write_executor* @10gen/query-execution-router @svc-auto-approve-bot
|
||||
|
||||
|
|
@ -173,6 +175,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
|
|||
/buildscripts/resmokeconfig/matrix_suites/overrides/**/primary_driven_index_builds.yml @10gen/server-index-builds @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/overrides/**/replica_sets_kill_secondary.yml @10gen/server-replication-reviewers @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/overrides/**/recordids_replicated.yml @10gen/server-collection-write-path @svc-auto-approve-bot
|
||||
/buildscripts/resmokeconfig/matrix_suites/overrides/**/concurrency_adjustment_algorithm.yml @10gen/server-workload-resilience @svc-auto-approve-bot
|
||||
|
||||
# The following patterns are parsed from ./buildscripts/resmokeconfig/suites/OWNERS.yml
|
||||
/buildscripts/resmokeconfig/suites/**/* @10gen/mongo-default-approvers @svc-auto-approve-bot
|
||||
|
|
@ -268,6 +271,7 @@ WORKSPACE.bazel @10gen/devprod-build @svc-auto-approve-bot
|
|||
/buildscripts/resmokelib/testing/hooks/**/generate_and_check_perf_results.py @10gen/devprod-performance-analysis @svc-auto-approve-bot
|
||||
/buildscripts/resmokelib/testing/hooks/**/replicaset_transition_to_and_from_csrs.py @10gen/server-catalog-and-routing-routing-and-topology @svc-auto-approve-bot
|
||||
/buildscripts/resmokelib/testing/hooks/**/drop_sessions_collection.py @10gen/server-catalog-and-routing-ddl @svc-auto-approve-bot
|
||||
/buildscripts/resmokelib/testing/hooks/**/rotate_concurrency_adjustment_algorithm.py @10gen/server-workload-resilience @svc-auto-approve-bot
|
||||
/buildscripts/resmokelib/testing/hooks/**/fuzz_runtime_parameters.py @10gen/server-programmability @svc-auto-approve-bot
|
||||
/buildscripts/resmokelib/testing/hooks/**/validate.py @10gen/devprod-correctness @10gen/server-validate @svc-auto-approve-bot
|
||||
|
||||
|
|
|
|||
|
|
@ -39,6 +39,9 @@ filters:
|
|||
- "generated_suites/replica_sets_recordids_replicated_jscore_passthrough.yml":
|
||||
approvers:
|
||||
- 10gen/server-collection-write-path
|
||||
- "generated_suites/*with_dynamic_concurrency_adjustment_algorithm.yml":
|
||||
approvers:
|
||||
- 10gen/server-workload-resilience
|
||||
- "generated_suites/transitioning_replica_sets_jscore_passthrough.yml":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing
|
||||
|
|
|
|||
|
|
@ -0,0 +1,59 @@
|
|||
##########################################################
|
||||
# THIS IS A GENERATED FILE -- DO NOT MODIFY.
|
||||
# IF YOU WISH TO MODIFY THIS SUITE, MODIFY THE CORRESPONDING MATRIX SUITE MAPPING FILE
|
||||
# AND REGENERATE THE MATRIX SUITES.
|
||||
#
|
||||
# matrix suite mapping file: buildscripts/resmokeconfig/matrix_suites/mappings/replica_sets_jscore_passthrough_with_dynamic_concurrency_adjustment_algorithm.yml
|
||||
# regenerate matrix suites: buildscripts/resmoke.py generate-matrix-suites && bazel run //:format
|
||||
##########################################################
|
||||
executor:
|
||||
archive:
|
||||
hooks:
|
||||
- RunDBCheckInBackground
|
||||
- CheckReplDBHashInBackground
|
||||
- ValidateCollectionsInBackground
|
||||
- CheckReplDBHash
|
||||
- CheckReplOplogs
|
||||
- ValidateCollections
|
||||
- RotateConcurrencyAdjustmentAlgorithm
|
||||
test: true
|
||||
config:
|
||||
shell_options:
|
||||
eval: globalThis.testingReplication = true;
|
||||
fixture:
|
||||
class: ReplicaSetFixture
|
||||
mongod_options:
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
num_nodes: 2
|
||||
hooks:
|
||||
- class: RunDBCheckInBackground
|
||||
- class: CheckReplDBHashInBackground
|
||||
- class: ValidateCollectionsInBackground
|
||||
- class: CheckReplOplogs
|
||||
- class: CheckReplDBHash
|
||||
- class: ValidateCollections
|
||||
- class: CleanEveryN
|
||||
n: 20
|
||||
- class: RotateConcurrencyAdjustmentAlgorithm
|
||||
matrix_suite: true
|
||||
selector:
|
||||
exclude_files:
|
||||
- jstests/core/txns/abort_expired_transaction.js
|
||||
- jstests/core/txns/abort_transaction_thread_does_not_block_on_locks.js
|
||||
- jstests/core/txns/kill_op_on_txn_expiry.js
|
||||
- jstests/core/**/set_param1.js
|
||||
- jstests/core/query/awaitdata_getmore_cmd.js
|
||||
- jstests/core/administrative/current_op/currentop.js
|
||||
- jstests/core/administrative/fsync/fsync.js
|
||||
- jstests/core/txns/prepare_conflict.js
|
||||
- jstests/core/txns/prepare_conflict_aggregation_behavior.js
|
||||
- jstests/core/timeseries/write/timeseries_update_multi.js
|
||||
exclude_with_any_tags:
|
||||
- assumes_standalone_mongod
|
||||
- requires_profiling
|
||||
roots:
|
||||
- jstests/core/**/*.js
|
||||
- jstests/fle2/**/*.js
|
||||
- src/mongo/db/modules/*/jstests/fle2/**/*.js
|
||||
test_kind: js_test
|
||||
|
|
@ -0,0 +1,64 @@
|
|||
##########################################################
|
||||
# THIS IS A GENERATED FILE -- DO NOT MODIFY.
|
||||
# IF YOU WISH TO MODIFY THIS SUITE, MODIFY THE CORRESPONDING MATRIX SUITE MAPPING FILE
|
||||
# AND REGENERATE THE MATRIX SUITES.
|
||||
#
|
||||
# matrix suite mapping file: buildscripts/resmokeconfig/matrix_suites/mappings/sharding_jscore_passthrough_with_dynamic_concurrency_adjustment_algorithm.yml
|
||||
# regenerate matrix suites: buildscripts/resmoke.py generate-matrix-suites && bazel run //:format
|
||||
##########################################################
|
||||
executor:
|
||||
archive:
|
||||
hooks:
|
||||
- CheckReplDBHash
|
||||
- CheckMetadataConsistencyInBackground
|
||||
- ValidateCollections
|
||||
- RotateConcurrencyAdjustmentAlgorithm
|
||||
config:
|
||||
shell_options: {}
|
||||
fixture:
|
||||
class: ShardedClusterFixture
|
||||
mongod_options:
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
mongos_options:
|
||||
set_parameters:
|
||||
enableTestCommands: 1
|
||||
num_rs_nodes_per_shard: 1
|
||||
num_shards: 2
|
||||
hooks:
|
||||
- class: CheckReplDBHash
|
||||
- class: CheckMetadataConsistencyInBackground
|
||||
- class: ValidateCollections
|
||||
- class: CheckOrphansDeleted
|
||||
- class: CleanEveryN
|
||||
n: 20
|
||||
- class: RotateConcurrencyAdjustmentAlgorithm
|
||||
matrix_suite: true
|
||||
selector:
|
||||
exclude_files:
|
||||
- jstests/core/txns/**/*.js
|
||||
- jstests/core/**/apitest_db.js
|
||||
- jstests/core/**/check_shard_index.js
|
||||
- jstests/core/**/compact_keeps_indexes.js
|
||||
- jstests/core/**/currentop.js
|
||||
- jstests/core/**/dbhash.js
|
||||
- jstests/core/**/fsync.js
|
||||
- jstests/core/**/geo_s2cursorlimitskip.js
|
||||
- jstests/core/**/geo_update_btree2.js
|
||||
- jstests/core/**/queryoptimizera.js
|
||||
- jstests/core/**/startup_log.js
|
||||
- jstests/core/**/query/top/top.js
|
||||
- jstests/core/**/geo_2d_explain.js
|
||||
- jstests/core/**/geo_s2explain.js
|
||||
- jstests/core/**/geo_s2sparse.js
|
||||
- jstests/core/**/operation_latency_histogram.js
|
||||
exclude_with_any_tags:
|
||||
- assumes_standalone_mongod
|
||||
- assumes_against_mongod_not_mongos
|
||||
- requires_profiling
|
||||
roots:
|
||||
- jstests/core/**/*.js
|
||||
- jstests/core_sharding/**/*.js
|
||||
- jstests/fle2/**/*.js
|
||||
- src/mongo/db/modules/*/jstests/fle2/**/*.js
|
||||
test_kind: js_test
|
||||
|
|
@ -77,6 +77,9 @@ filters:
|
|||
- "replica_sets_recordids_replicated_jscore_passthrough.yml":
|
||||
approvers:
|
||||
- 10gen/server-collection-write-path
|
||||
- "*with_dynamic_concurrency_adjustment_algorithm.yml":
|
||||
approvers:
|
||||
- 10gen/server-workload-resilience
|
||||
- "transitioning_replica_sets_jscore_passthrough.yml":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing
|
||||
|
|
|
|||
|
|
@ -0,0 +1,4 @@
|
|||
base_suite: replica_sets_jscore_passthrough
|
||||
overrides:
|
||||
- "concurrency_adjustment_algorithm.replica_sets_reconfig_hooks"
|
||||
- "concurrency_adjustment_algorithm.replica_sets_reconfig_archive"
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
base_suite: sharding_jscore_passthrough
|
||||
overrides:
|
||||
- "concurrency_adjustment_algorithm.sharding_reconfig_hooks"
|
||||
- "concurrency_adjustment_algorithm.sharding_reconfig_archive"
|
||||
|
|
@ -48,3 +48,6 @@ filters:
|
|||
- "recordids_replicated.yml":
|
||||
approvers:
|
||||
- 10gen/server-collection-write-path
|
||||
- "concurrency_adjustment_algorithm.yml":
|
||||
approvers:
|
||||
- 10gen/server-workload-resilience
|
||||
|
|
|
|||
|
|
@ -0,0 +1,54 @@
|
|||
### Overrides for some suites with RotateConcurrencyAdjustmentAlgorithm hook ###
|
||||
|
||||
- name: replica_sets_reconfig_hooks
|
||||
value:
|
||||
executor:
|
||||
hooks:
|
||||
- class: RunDBCheckInBackground
|
||||
# The CheckReplDBHash hook waits until all operations have replicated to and have been applied
|
||||
# on the secondaries, so we run the ValidateCollections hook after it to ensure we're
|
||||
# validating the entire contents of the collection.
|
||||
- class: CheckReplDBHashInBackground
|
||||
- class: ValidateCollectionsInBackground
|
||||
- class: CheckReplOplogs
|
||||
- class: CheckReplDBHash
|
||||
- class: ValidateCollections
|
||||
- class: CleanEveryN
|
||||
n: 20
|
||||
- class: RotateConcurrencyAdjustmentAlgorithm
|
||||
|
||||
- name: replica_sets_reconfig_archive
|
||||
value:
|
||||
executor:
|
||||
archive:
|
||||
test: true
|
||||
hooks:
|
||||
- RunDBCheckInBackground
|
||||
- CheckReplDBHashInBackground
|
||||
- ValidateCollectionsInBackground
|
||||
- CheckReplDBHash
|
||||
- CheckReplOplogs
|
||||
- ValidateCollections
|
||||
- RotateConcurrencyAdjustmentAlgorithm
|
||||
|
||||
- name: sharding_reconfig_hooks
|
||||
value:
|
||||
executor:
|
||||
hooks:
|
||||
- class: CheckReplDBHash
|
||||
- class: CheckMetadataConsistencyInBackground
|
||||
- class: ValidateCollections
|
||||
- class: CheckOrphansDeleted
|
||||
- class: CleanEveryN
|
||||
n: 20
|
||||
- class: RotateConcurrencyAdjustmentAlgorithm
|
||||
|
||||
- name: sharding_reconfig_archive
|
||||
value:
|
||||
executor:
|
||||
archive:
|
||||
hooks:
|
||||
- CheckReplDBHash
|
||||
- CheckMetadataConsistencyInBackground
|
||||
- ValidateCollections
|
||||
- RotateConcurrencyAdjustmentAlgorithm
|
||||
|
|
@ -60,6 +60,9 @@ filters:
|
|||
- "drop_sessions_collection.py":
|
||||
approvers:
|
||||
- 10gen/server-catalog-and-routing-ddl
|
||||
- "rotate_concurrency_adjustment_algorithm.py":
|
||||
approvers:
|
||||
- 10gen/server-workload-resilience
|
||||
- "fuzz_runtime_parameters.py":
|
||||
approvers:
|
||||
- 10gen/server-programmability
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ Specify any of the following as the `hooks` in your [Suite](../../../../buildscr
|
|||
- Also verifies that the secondaries can reach the SECONDARY state without having connectivity to the primary after an unclean shutdown.
|
||||
- [`PeriodicStackTrace`](./periodic_stack_trace.py) - Test hook that sends the stacktracing signal to mongo processes at randomized intervals.
|
||||
- [`QueryableServerHook`](./queryable_server_hook.py) - Starts the queryable server before each test for queryable restores. Restarts the queryable server between tests.
|
||||
- [`RotateConcurrencyAdjustmentAlgorithm`](./rotate_concurrency_adjustment_algorithm.py) - Periodically sets 'storageEngineConcurrencyAdjustmentAlgorithm' to a random valid value from a predefined list.
|
||||
- [`RunChangeStreamsInBackground`](./change_streams.py) - Run in the background full cluster change streams while a test is running. Open and close the change stream every `1..10` tests (random using `config.RANDOM_SEED`).
|
||||
- [`RunDBCheckInBackground`](./dbcheck_background.py) - A hook for running `dbCheck` on a replica set while a test is running.
|
||||
- This includes dbhashes for all non-local databases and non-replicated system collections that match on the primary and secondaries.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,292 @@
|
|||
"""
|
||||
Hook that periodically rotates the 'storageEngineConcurrencyAdjustmentAlgorithm' server parameter to
|
||||
a new random valid value on all mongod processes.
|
||||
"""
|
||||
|
||||
import random
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
from buildscripts.resmokelib import errors
|
||||
from buildscripts.resmokelib.testing.fixtures import interface as fixture_interface
|
||||
from buildscripts.resmokelib.testing.fixtures import replicaset, shardedcluster, standalone
|
||||
from buildscripts.resmokelib.testing.hooks import interface
|
||||
from buildscripts.resmokelib.testing.hooks import lifecycle as lifecycle_interface
|
||||
|
||||
|
||||
class RotateConcurrencyAdjustmentAlgorithm(interface.Hook):
    """
    Periodically sets 'storageEngineConcurrencyAdjustmentAlgorithm' to a random valid value from a
    predefined list.

    The rotation itself is performed by a background `_SetConcurrencyAlgorithmThread`, which this
    hook starts in `before_suite`, resumes in `before_test`, pauses in `after_test` and stops in
    `after_suite`.
    """

    DESCRIPTION = (
        "Periodically rotates 'storageEngineConcurrencyAdjustmentAlgorithm' to a random valid value"
    )
    IS_BACKGROUND = True

    # The list of valid values to choose from.
    _ALGORITHM_OPTIONS = [
        "fixedConcurrentTransactions",
        "fixedConcurrentTransactionsWithPrioritization",
        "throughputProbing",
    ]

    def __init__(
        self,
        hook_logger,
        fixture,
        seed=None,
        auth_options=None,
    ):
        """Initialize the RotateConcurrencyAdjustmentAlgorithm hook.

        Args:
            hook_logger: the logger instance for this hook.
            fixture: the target fixture (standalone, replica set, sharded cluster, or multi-cluster fixture).
            seed: the random seed to use. When None, a fresh seed is drawn when the hook is
                constructed.
            auth_options: dictionary of auth options.
        """
        interface.Hook.__init__(
            self, hook_logger, fixture, RotateConcurrencyAdjustmentAlgorithm.DESCRIPTION
        )
        self._fixture = fixture
        self._auth_options = auth_options

        # Resolve the seed here instead of in the signature: a default written as
        # `seed=random.randrange(...)` is evaluated only once, when the module is imported, so
        # every hook instance in the process would silently share it.
        self._seed = random.randrange(sys.maxsize) if seed is None else seed
        self._rng = random.Random(self._seed)

        self._standalone_fixtures = []
        self._rs_fixtures = []
        self._set_param_thread = None

    def before_suite(self, test_report):
        """Discover every mongod fixture and start the background rotation thread."""
        self.logger.info("Finding all mongod fixtures to target...")
        # Rebuild the target lists from scratch so that running this method more than once
        # cannot register the same node twice.
        self._standalone_fixtures = []
        self._rs_fixtures = []
        for cluster in self._fixture.get_testable_clusters():
            self._add_fixture(cluster)

        self.logger.info(
            f"Found {len(self._standalone_fixtures)} standalone and {len(self._rs_fixtures)} replica set fixtures."
        )
        # Record the seed so that the sequence of chosen algorithms can be reproduced by
        # passing the same value as 'seed'.
        self.logger.info("Using random seed %s for the algorithm rotation.", self._seed)

        self._set_param_thread = _SetConcurrencyAlgorithmThread(
            self.logger,
            self._rs_fixtures,
            self._standalone_fixtures,
            self._rng,
            self._ALGORITHM_OPTIONS,
            lifecycle_interface.FlagBasedThreadLifecycle(),
            self._auth_options,
        )
        self.logger.info("Starting the concurrency adjustment algorithm rotation thread.")
        self._set_param_thread.start()

    def after_suite(self, test_report, teardown_flag=None):
        """Stop the background rotation thread, if one was started."""
        self.logger.info("Stopping the concurrency adjustment algorithm rotation thread.")
        if self._set_param_thread:
            self._set_param_thread.stop()
        self.logger.info("Concurrency adjustment algorithm rotation thread stopped.")

    def before_test(self, test, test_report):
        """Log the current parameter value on every node, then resume the rotation thread."""
        self.logger.info("Logging current parameter state before test...")
        self._log_parameter_state_on_all_nodes()

        self.logger.info("Resuming the concurrency adjustment algorithm rotation thread.")
        # pause() also verifies that the thread is alive and the replica sets are running, so
        # calling it right before resume() surfaces a dead thread before the test starts.
        self._set_param_thread.pause()
        self._set_param_thread.resume()

    def after_test(self, test, test_report):
        """Pause the rotation thread, then log the current parameter value on every node."""
        self.logger.info("Pausing the concurrency adjustment algorithm rotation thread.")
        self._set_param_thread.pause()
        self.logger.info("Paused the concurrency adjustment algorithm rotation thread.")

        self.logger.info("Logging current parameter state after test...")
        self._log_parameter_state_on_all_nodes()

    def _log_parameter_state_on_all_nodes(self):
        """Log the parameter value of every replica set node, then of every standalone."""
        for repl_set in self._rs_fixtures:
            for node in repl_set.nodes:
                self._invoke_get_parameter_and_log(node)

        for standalone_fixture in self._standalone_fixtures:
            self._invoke_get_parameter_and_log(standalone_fixture)

    def _add_fixture(self, fixture):
        """
        Recursively find and add all mongod fixtures (standalone or replicaset) to our internal lists.
        """
        if isinstance(fixture, standalone.MongoDFixture):
            self._standalone_fixtures.append(fixture)
        elif isinstance(fixture, replicaset.ReplicaSetFixture):
            self._rs_fixtures.append(fixture)
        elif isinstance(fixture, shardedcluster.ShardedClusterFixture):
            # Recurse into shards
            for shard_fixture in fixture.shards:
                self._add_fixture(shard_fixture)

            # Recurse into the config server, but only when there is no config shard.
            # NOTE(review): presumably a config shard is already part of 'fixture.shards' - confirm.
            if fixture.config_shard is None:
                self._add_fixture(fixture.configsvr)

            # We intentionally DO NOT add fixture.mongos, as the parameter is not valid on mongos.
        else:
            # This could be a direct MongoSFixture or other non-mongod fixture.
            self.logger.debug(f"Skipping fixture {fixture} as it is not a mongod.")

    def _invoke_get_parameter_and_log(self, node):
        """
        Helper to print the current state of the 'storageEngineConcurrencyAdjustmentAlgorithm' parameter.

        A failing getParameter command is logged as a warning and is not re-raised.
        """
        client = fixture_interface.build_client(node, self._auth_options)
        try:
            get_result = client.admin.command(
                "getParameter", 1, storageEngineConcurrencyAdjustmentAlgorithm=1
            )
            self.logger.info(
                "Current state of 'storageEngineConcurrencyAdjustmentAlgorithm' on node %d: %s",
                node.port,
                get_result.get("storageEngineConcurrencyAdjustmentAlgorithm", "NOT_FOUND"),
            )
        except Exception as e:
            # Logging the state is best-effort; it must not fail the test on its own.
            self.logger.warning(
                "Failed to getParameter 'storageEngineConcurrencyAdjustmentAlgorithm' from node %d: %s",
                node.port,
                e,
            )
|
||||
|
||||
|
||||
class _SetConcurrencyAlgorithmThread(threading.Thread):
    """Background thread that periodically applies a randomly chosen algorithm via setParameter."""

    def __init__(
        self,
        logger,
        rs_fixtures,
        standalone_fixtures,
        rng,
        algorithm_options,
        lifecycle,
        auth_options=None,
        setparameter_interval_secs=30,
    ):
        """Initialize _SetConcurrencyAlgorithmThread.

        Args:
            logger: the logger used for progress and error messages.
            rs_fixtures: replica set fixtures whose nodes receive the setParameter command.
            standalone_fixtures: standalone fixtures that receive the setParameter command.
            rng: random number generator used to pick the next algorithm.
            algorithm_options: sequence of valid algorithm names to choose from.
            lifecycle: object that tells this thread when it may act, idle or stop.
            auth_options: dictionary of auth options used when building clients.
            setparameter_interval_secs: minimum number of seconds between two rotations.
        """
        threading.Thread.__init__(self, name="RotateConcurrencyAlgorithmThread")
        self.daemon = True
        self.logger = logger
        self._rs_fixtures = rs_fixtures
        self._standalone_fixtures = standalone_fixtures
        self._rng = rng
        self._algorithm_options = algorithm_options
        self.__lifecycle = lifecycle
        self._auth_options = auth_options
        self._setparameter_interval_secs = setparameter_interval_secs
        # Start the clock now, so the first rotation happens one full interval after creation.
        self._last_exec = time.time()

        # Event set when the thread has been stopped using the 'stop()' method.
        self._is_stopped_evt = threading.Event()
        # Event set when the thread is not executing setParameter commands.
        self._is_idle_evt = threading.Event()
        self._is_idle_evt.set()

    def run(self):
        """Execute the thread.

        Loops until the lifecycle refuses permission to act. Any exception is logged and ends
        the loop; the idle event is set in that case so that 'pause()' does not block.
        """
        try:
            while True:
                self._is_idle_evt.set()

                permitted = self.__lifecycle.wait_for_action_permitted()
                if not permitted:
                    break  # Thread was stopped

                self._is_idle_evt.clear()

                now = time.time()
                if now - self._last_exec > self._setparameter_interval_secs:
                    self._do_set_parameter()
                    self._last_exec = time.time()

                found_idle_request = self.__lifecycle.poll_for_idle_request()
                if found_idle_request:
                    self.__lifecycle.send_idle_acknowledgement()
                    continue

                # The 'wait_secs' is used to wait 'self._setparameter_interval_secs' from the moment
                # the last setParameter command was sent.
                now = time.time()
                wait_secs = max(0, self._setparameter_interval_secs - (now - self._last_exec))
                self.__lifecycle.wait_for_action_interval(wait_secs)
        except Exception:
            # Proactively log the exception
            self.logger.exception("RotateConcurrencyAlgorithmThread threw exception")
            # Unblock any caller waiting in 'pause()'; it then detects the dead thread.
            self._is_idle_evt.set()

    def stop(self):
        """Stop the thread and wait for it to finish."""
        self.__lifecycle.stop()
        self._is_stopped_evt.set()
        # Unpause to allow the thread to finish.
        self.resume()
        self.join()

    def pause(self):
        """Pause the thread.

        Raises:
            errors.ServerFailure: if the thread is no longer alive, or if one of the replica set
                fixtures is not running.
        """
        self.__lifecycle.mark_test_finished()

        # Wait until we are no longer executing setParameter.
        self._is_idle_evt.wait()
        # Check if the thread is alive
        self._check_thread()

        # Check that fixtures are still running
        for rs_fixture in self._rs_fixtures:
            if not rs_fixture.is_running():
                raise errors.ServerFailure(
                    f"ReplicaSetFixture with pids {rs_fixture.pids()} expected to be running in"
                    " SetParameter, but wasn't."
                )

    def resume(self):
        """Resume the thread."""
        self.__lifecycle.mark_test_started()

    def _wait(self, timeout):
        """Block until 'stop()' has been called or 'timeout' seconds have elapsed."""
        # Wait until stop or timeout.
        self._is_stopped_evt.wait(timeout)

    def _check_thread(self):
        """Raise errors.ServerFailure if this thread is not alive."""
        if not self.is_alive():
            msg = "The RotateConcurrencyAlgorithmThread thread is not running."
            self.logger.error(msg)
            raise errors.ServerFailure(msg)

    def _invoke_set_parameter(self, client, params):
        """Helper to invoke setParameter on a given client."""
        client.admin.command("setParameter", 1, **params)

    def _do_set_parameter(self):
        """
        Picks a new random algorithm and applies it to all standalone and replica set nodes.

        The same value is applied to every node. A failing setParameter command is not caught
        here and propagates to the caller.
        """
        new_algorithm = self._rng.choice(self._algorithm_options)
        params_to_set = {"storageEngineConcurrencyAdjustmentAlgorithm": new_algorithm}

        for repl_set in self._rs_fixtures:
            self.logger.info(
                "Setting parameters on all nodes of replica set %s. Parameters: %s",
                repl_set.replset_name,
                params_to_set,
            )
            for node in repl_set.nodes:
                client = fixture_interface.build_client(node, self._auth_options)
                self._invoke_set_parameter(client, params_to_set)

        for standalone in self._standalone_fixtures:
            self.logger.info(
                "Setting parameters on standalone on port %d. Parameters: %s",
                standalone.port,
                params_to_set,
            )
            client = fixture_interface.build_client(standalone, self._auth_options)
            self._invoke_set_parameter(client, params_to_set)
|
||||
|
|
@ -2373,6 +2373,33 @@ tasks:
|
|||
resmoke_args: >-
|
||||
--disableFeatureFlags=featureFlagCreateViewlessTimeseriesCollections
|
||||
|
||||
- <<: *gen_task_template
|
||||
name: sharding_jscore_passthrough_with_dynamic_concurrency_adjustment_algorithm_gen
|
||||
tags: [
|
||||
"assigned_to_jira_team_server_workload_scheduling",
|
||||
"default",
|
||||
"sharding",
|
||||
"jscore",
|
||||
"common",
|
||||
# TODO (SERVER-111526): Remove the `requires_all_feature_flags` tag
|
||||
"requires_all_feature_flags",
|
||||
]
|
||||
commands:
|
||||
- func: "generate resmoke tasks"
|
||||
|
||||
- <<: *gen_task_template
|
||||
name: replica_sets_jscore_passthrough_with_dynamic_concurrency_adjustment_algorithm_gen
|
||||
tags: [
|
||||
"assigned_to_jira_team_server_workload_scheduling",
|
||||
"default",
|
||||
"jscore",
|
||||
"common",
|
||||
# TODO (SERVER-111526): Remove the `requires_all_feature_flags` tag
|
||||
"requires_all_feature_flags",
|
||||
]
|
||||
commands:
|
||||
- func: "generate resmoke tasks"
|
||||
|
||||
- <<: *gen_task_template
|
||||
name: sharding_jscore_passthrough_with_injected_catalog_metadata_gen
|
||||
tags:
|
||||
|
|
|
|||
Loading…
Reference in New Issue