mirror of https://github.com/mongodb/mongo
SERVER-113367 Allow throuhgput probing to work with de-prioritization (#43712)
GitOrigin-RevId: 186564c7de169a684cffc13b48b017687f8fc58d
This commit is contained in:
parent
1e03a5260e
commit
c2a4341513
|
|
@ -548,12 +548,16 @@ config_fuzzer_params = {
|
|||
"executionControlConcurrencyAdjustmentAlgorithm": {
|
||||
"choices": [
|
||||
"fixedConcurrentTransactions",
|
||||
"fixedConcurrentTransactionsWithPrioritization",
|
||||
"throughputProbing",
|
||||
],
|
||||
"period": 60,
|
||||
"fuzz_at": ["startup", "runtime"],
|
||||
},
|
||||
"executionControlDeprioritizationGate": {
|
||||
"choices": [True, False],
|
||||
"period": 60,
|
||||
"fuzz_at": ["startup", "runtime"],
|
||||
},
|
||||
"executionControlHeuristicDeprioritization": {
|
||||
"choices": [True, False],
|
||||
"period": 60,
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
"""
|
||||
Hook that periodically rotates the 'storageEngineConcurrencyAdjustmentAlgorithm' server parameter to
|
||||
a new random valid value on all mongod processes.
|
||||
Hook that periodically rotates the 'executionControlConcurrencyAdjustmentAlgorithm' and deprioritization
|
||||
parameters to new random valid values on all mongod processes.
|
||||
"""
|
||||
|
||||
import random
|
||||
|
|
@ -17,19 +17,16 @@ from buildscripts.resmokelib.testing.hooks import lifecycle as lifecycle_interfa
|
|||
|
||||
class RotateConcurrencyAdjustmentAlgorithm(interface.Hook):
|
||||
"""
|
||||
Periodically sets 'storageEngineConcurrencyAdjustmentAlgorithm' to a random valid value from a
|
||||
predefined list.
|
||||
Periodically sets 'executionControlConcurrencyAdjustmentAlgorithm' and deprioritization parameters
|
||||
to random valid values.
|
||||
"""
|
||||
|
||||
DESCRIPTION = (
|
||||
"Periodically rotates 'storageEngineConcurrencyAdjustmentAlgorithm' to a random valid value"
|
||||
)
|
||||
DESCRIPTION = "Periodically rotates 'executionControlConcurrencyAdjustmentAlgorithm' and deprioritization parameters to random valid values"
|
||||
IS_BACKGROUND = True
|
||||
|
||||
# The list of valid values to choose from.
|
||||
_ALGORITHM_OPTIONS = [
|
||||
"fixedConcurrentTransactions",
|
||||
"fixedConcurrentTransactionsWithPrioritization",
|
||||
"throughputProbing",
|
||||
]
|
||||
|
||||
|
|
@ -140,21 +137,43 @@ class RotateConcurrencyAdjustmentAlgorithm(interface.Hook):
|
|||
|
||||
def _invoke_get_parameter_and_log(self, node):
|
||||
"""
|
||||
Helper to print the current state of the 'storageEngineConcurrencyAdjustmentAlgorithm' parameter.
|
||||
Helper to print the current state of the execution control parameters.
|
||||
"""
|
||||
client = fixture_interface.build_client(node, self._auth_options)
|
||||
try:
|
||||
get_result = client.admin.command(
|
||||
"getParameter", 1, storageEngineConcurrencyAdjustmentAlgorithm=1
|
||||
algorithm_result = client.admin.command(
|
||||
"getParameter",
|
||||
1,
|
||||
executionControlConcurrencyAdjustmentAlgorithm=1,
|
||||
)
|
||||
heuristic_result = client.admin.command(
|
||||
"getParameter",
|
||||
1,
|
||||
executionControlHeuristicDeprioritization=1,
|
||||
)
|
||||
background_result = client.admin.command(
|
||||
"getParameter",
|
||||
1,
|
||||
executionControlBackgroundTasksDeprioritization=1,
|
||||
)
|
||||
deprioritization_result = client.admin.command(
|
||||
"getParameter",
|
||||
1,
|
||||
executionControlDeprioritizationGate=1,
|
||||
)
|
||||
self.logger.info(
|
||||
"Current state of 'storageEngineConcurrencyAdjustmentAlgorithm' on node %d: %s",
|
||||
"Current state on node %d: algorithm=%s, heuristic=%s, background=%s, deprio=%s",
|
||||
node.port,
|
||||
get_result.get("storageEngineConcurrencyAdjustmentAlgorithm", "NOT_FOUND"),
|
||||
algorithm_result.get("executionControlConcurrencyAdjustmentAlgorithm", "NOT_FOUND"),
|
||||
heuristic_result.get("executionControlHeuristicDeprioritization", "NOT_FOUND"),
|
||||
background_result.get(
|
||||
"executionControlBackgroundTasksDeprioritization", "NOT_FOUND"
|
||||
),
|
||||
deprioritization_result.get("executionControlDeprioritizationGate", "NOT_FOUND"),
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.warning(
|
||||
"Failed to getParameter 'storageEngineConcurrencyAdjustmentAlgorithm' from node %d: %s",
|
||||
"Failed to getParameter from node %d: %s",
|
||||
node.port,
|
||||
e,
|
||||
)
|
||||
|
|
@ -261,16 +280,23 @@ class _SetConcurrencyAlgorithmThread(threading.Thread):
|
|||
self.logger.error(msg)
|
||||
raise errors.ServerFailure(msg)
|
||||
|
||||
def _invoke_set_parameter(self, client, params):
|
||||
"""Helper to invoke setParameter on a given client."""
|
||||
client.admin.command("setParameter", 1, **params)
|
||||
def _invoke_set_parameter(self, client, param_name, param_value):
|
||||
"""Helper to invoke setParameter on a given client for a single parameter."""
|
||||
client.admin.command("setParameter", 1, **{param_name: param_value})
|
||||
|
||||
def _do_set_parameter(self):
|
||||
"""
|
||||
Picks a new random algorithm and applies it to all standalone and replica set nodes.
|
||||
Picks a new random algorithm and random boolean values for the deprioritization parameters,
|
||||
then applies them to all standalone and replica set nodes.
|
||||
"""
|
||||
new_algorithm = self._rng.choice(self._algorithm_options)
|
||||
params_to_set = {"storageEngineConcurrencyAdjustmentAlgorithm": new_algorithm}
|
||||
params_to_set = {
|
||||
"executionControlConcurrencyAdjustmentAlgorithm": self._rng.choice(
|
||||
self._algorithm_options
|
||||
),
|
||||
"executionControlHeuristicDeprioritization": self._rng.choice([True, False]),
|
||||
"executionControlBackgroundTasksDeprioritization": self._rng.choice([True, False]),
|
||||
"executionControlDeprioritizationGate": self._rng.choice([True, False]),
|
||||
}
|
||||
|
||||
for repl_set in self._rs_fixtures:
|
||||
self.logger.info(
|
||||
|
|
@ -280,7 +306,8 @@ class _SetConcurrencyAlgorithmThread(threading.Thread):
|
|||
)
|
||||
for node in repl_set.nodes:
|
||||
client = fixture_interface.build_client(node, self._auth_options)
|
||||
self._invoke_set_parameter(client, params_to_set)
|
||||
for param_name, param_value in params_to_set.items():
|
||||
self._invoke_set_parameter(client, param_name, param_value)
|
||||
|
||||
for standalone in self._standalone_fixtures:
|
||||
self.logger.info(
|
||||
|
|
@ -289,4 +316,5 @@ class _SetConcurrencyAlgorithmThread(threading.Thread):
|
|||
params_to_set,
|
||||
)
|
||||
client = fixture_interface.build_client(standalone, self._auth_options)
|
||||
self._invoke_set_parameter(client, params_to_set)
|
||||
for param_name, param_value in params_to_set.items():
|
||||
self._invoke_set_parameter(client, param_name, param_value)
|
||||
|
|
|
|||
|
|
@ -366,7 +366,7 @@ buildvariants:
|
|||
<<: *amazon_linux2023_arm64_all_feature_flags_dynamic_expansions
|
||||
test_flags: >-
|
||||
--runAllFeatureFlagTests
|
||||
--mongodSetParameters="{executionControlConcurrencyAdjustmentAlgorithm: fixedConcurrentTransactionsWithPrioritization}"
|
||||
--mongodSetParameters="{executionControlDeprioritizationGate: true}"
|
||||
--excludeWithAnyTags=incompatible_with_execution_control_with_prioritization,incompatible_with_amazon_linux,requires_ldap_pool,requires_external_data_source,incompatible_with_atlas_environment
|
||||
tasks:
|
||||
- name: .development_critical !.requires_large_host !.requires_compile_variant !.incompatible_development_variant !.incompatible_all_feature_flags !.suggested_excluding_required__for_devprod_mitigation_only !.requires_all_feature_flags !.multiversion
|
||||
|
|
|
|||
|
|
@ -12,56 +12,94 @@ import {extendWorkload} from "jstests/concurrency/fsm_libs/extend_workload.js";
|
|||
import {$config as $baseConfig} from "jstests/concurrency/fsm_workloads/crud/crud_and_commands.js";
|
||||
|
||||
export const $config = extendWorkload($baseConfig, function ($config, $super) {
|
||||
$config.data.algorithms = [
|
||||
"fixedConcurrentTransactions",
|
||||
"fixedConcurrentTransactionsWithPrioritization",
|
||||
"throughputProbing",
|
||||
];
|
||||
$config.data.algorithms = ["fixedConcurrentTransactions", "throughputProbing"];
|
||||
$config.data.originalParams = {};
|
||||
|
||||
$config.data.originalAlgorithmValue = "";
|
||||
const getServerParameterValue = (cluster, paramName) => {
|
||||
let value;
|
||||
cluster.executeOnMongodNodes((db) => {
|
||||
value = assert.commandWorked(db.adminCommand({getParameter: 1, [paramName]: 1}))[paramName];
|
||||
});
|
||||
return value;
|
||||
};
|
||||
|
||||
const setServerParameterOnAllNodes = (cluster, paramName, value) => {
|
||||
cluster.executeOnMongodNodes((db) => {
|
||||
assert.commandWorked(db.adminCommand({setParameter: 1, [paramName]: value}));
|
||||
});
|
||||
};
|
||||
|
||||
const setServerParameter = (db, paramName, value) => {
|
||||
jsTest.log.info(`Attempting to set ${paramName} to: ${value}`);
|
||||
|
||||
assert.commandWorked(db.adminCommand({setParameter: 1, [paramName]: value}));
|
||||
|
||||
jsTest.log.info(`Successfully set ${paramName} to: ${value}`);
|
||||
};
|
||||
|
||||
$config.setup = function (db, collName, cluster) {
|
||||
cluster.executeOnMongodNodes(function (db) {
|
||||
$config.data.originalAlgorithmValue = assert.commandWorked(
|
||||
db.adminCommand({
|
||||
getParameter: 1,
|
||||
executionControlConcurrencyAdjustmentAlgorithm: 1,
|
||||
}),
|
||||
).executionControlConcurrencyAdjustmentAlgorithm;
|
||||
});
|
||||
this.originalParams = {
|
||||
algorithm: getServerParameterValue(cluster, "executionControlConcurrencyAdjustmentAlgorithm"),
|
||||
deprioritizationGate: getServerParameterValue(cluster, "executionControlDeprioritizationGate"),
|
||||
heuristicDeprioritization: getServerParameterValue(cluster, "executionControlHeuristicDeprioritization"),
|
||||
backgroundTasksDeprioritization: getServerParameterValue(
|
||||
cluster,
|
||||
"executionControlBackgroundTasksDeprioritization",
|
||||
),
|
||||
};
|
||||
};
|
||||
|
||||
$config.teardown = function (db, collName, cluster) {
|
||||
cluster.executeOnMongodNodes(function (db) {
|
||||
assert.commandWorked(
|
||||
db.adminCommand({
|
||||
setParameter: 1,
|
||||
executionControlConcurrencyAdjustmentAlgorithm: $config.data.originalAlgorithmValue,
|
||||
}),
|
||||
);
|
||||
});
|
||||
setServerParameterOnAllNodes(
|
||||
cluster,
|
||||
"executionControlConcurrencyAdjustmentAlgorithm",
|
||||
this.originalParams.algorithm,
|
||||
);
|
||||
setServerParameterOnAllNodes(
|
||||
cluster,
|
||||
"executionControlDeprioritizationGate",
|
||||
this.originalParams.deprioritizationGate,
|
||||
);
|
||||
setServerParameterOnAllNodes(
|
||||
cluster,
|
||||
"executionControlHeuristicDeprioritization",
|
||||
this.originalParams.heuristicDeprioritization,
|
||||
);
|
||||
setServerParameterOnAllNodes(
|
||||
cluster,
|
||||
"executionControlBackgroundTasksDeprioritization",
|
||||
this.originalParams.backgroundTasksDeprioritization,
|
||||
);
|
||||
};
|
||||
|
||||
$config.states.setConcurrencyAlgorithm = function (db, collName) {
|
||||
const targetAlgorithm = $config.data.algorithms[Random.randInt($config.data.algorithms.length)];
|
||||
const targetAlgorithm = this.algorithms[Random.randInt(this.algorithms.length)];
|
||||
setServerParameter(db, "executionControlConcurrencyAdjustmentAlgorithm", targetAlgorithm);
|
||||
};
|
||||
|
||||
jsTest.log.info(`Attempting to set executionControlConcurrencyAdjustmentAlgorithm to: ${targetAlgorithm}`);
|
||||
$config.states.setDeprioritizationGate = function (db, collName) {
|
||||
setServerParameter(db, "executionControlDeprioritizationGate", Random.randInt(2) === 1);
|
||||
};
|
||||
|
||||
assert.commandWorked(
|
||||
db.adminCommand({setParameter: 1, executionControlConcurrencyAdjustmentAlgorithm: targetAlgorithm}),
|
||||
);
|
||||
$config.states.setHeuristicDeprioritization = function (db, collName) {
|
||||
setServerParameter(db, "executionControlHeuristicDeprioritization", Random.randInt(2) === 1);
|
||||
};
|
||||
|
||||
jsTest.log.info(`Successfully set executionControlConcurrencyAdjustmentAlgorithm to: ${targetAlgorithm}`);
|
||||
$config.states.setBackgroundTasksDeprioritization = function (db, collName) {
|
||||
setServerParameter(db, "executionControlBackgroundTasksDeprioritization", Random.randInt(2) === 1);
|
||||
};
|
||||
|
||||
const statesProbability = {
|
||||
insertDocs: 0.16,
|
||||
updateDocs: 0.16,
|
||||
findAndModifyDocs: 0.16,
|
||||
readDocs: 0.16,
|
||||
deleteDocs: 0.16,
|
||||
dropCollection: 0.16,
|
||||
insertDocs: 0.14,
|
||||
updateDocs: 0.14,
|
||||
findAndModifyDocs: 0.14,
|
||||
readDocs: 0.14,
|
||||
deleteDocs: 0.14,
|
||||
dropCollection: 0.14,
|
||||
setConcurrencyAlgorithm: 0.04,
|
||||
setDeprioritizationGate: 0.04,
|
||||
setHeuristicDeprioritization: 0.04,
|
||||
setBackgroundTasksDeprioritization: 0.04,
|
||||
};
|
||||
$config.transitions = {
|
||||
init: statesProbability,
|
||||
|
|
@ -72,6 +110,9 @@ export const $config = extendWorkload($baseConfig, function ($config, $super) {
|
|||
deleteDocs: statesProbability,
|
||||
dropCollection: statesProbability,
|
||||
setConcurrencyAlgorithm: statesProbability,
|
||||
setDeprioritizationGate: statesProbability,
|
||||
setHeuristicDeprioritization: statesProbability,
|
||||
setBackgroundTasksDeprioritization: statesProbability,
|
||||
};
|
||||
|
||||
return $config;
|
||||
|
|
|
|||
|
|
@ -16,8 +16,8 @@ import {
|
|||
getExecutionControlAlgorithm,
|
||||
getExecutionControlStats,
|
||||
kFixedConcurrentTransactionsAlgorithm,
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
kThroughputProbingAlgorithm,
|
||||
setDeprioritizationGate,
|
||||
setExecutionControlAlgorithm,
|
||||
setExecutionControlTickets,
|
||||
} from "jstests/noPassthrough/admission/execution_control/libs/execution_control_helper.js";
|
||||
|
|
@ -134,40 +134,19 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
assert.eq(kFixedConcurrentTransactionsAlgorithm, getExecutionControlAlgorithm(mongod));
|
||||
});
|
||||
|
||||
it("should not override to fixed concurrency adjustment algorithm when prioritization is set at startup", function () {
|
||||
it("should allow resizing all tickets with deprioritization enabled", function () {
|
||||
replTest = new ReplSetTest({
|
||||
nodes: 1,
|
||||
nodeOptions: {
|
||||
setParameter: {
|
||||
executionControlConcurrencyAdjustmentAlgorithm:
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
executionControlConcurrentReadTransactions: 20,
|
||||
executionControlDeprioritizationGate: true,
|
||||
},
|
||||
},
|
||||
});
|
||||
replTest.startSet();
|
||||
replTest.initiate();
|
||||
const mongod = replTest.getPrimary();
|
||||
|
||||
assert.eq(kFixedConcurrentTransactionsWithPrioritizationAlgorithm, getExecutionControlAlgorithm(mongod));
|
||||
|
||||
assertTicketSizing(mongod);
|
||||
});
|
||||
|
||||
it("should allow resizing all ticket types with fixed concurrency adjustment algorithm and prioritization", function () {
|
||||
replTest = new ReplSetTest({
|
||||
nodes: 1,
|
||||
nodeOptions: {
|
||||
setParameter: {
|
||||
executionControlConcurrencyAdjustmentAlgorithm:
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
},
|
||||
},
|
||||
});
|
||||
replTest.startSet();
|
||||
replTest.initiate();
|
||||
const mongod = replTest.getPrimary();
|
||||
assertTicketSizing(mongod);
|
||||
assertTicketSizing(mongod, {expectDynamicAdjustmentWarnings: true});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -233,14 +212,14 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
it("should allow transitions to other algorithms and enabling/disabling prioritization", function () {
|
||||
assertTicketSizing(mongod, {expectPrioritizationWarnings: true});
|
||||
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
assertTicketSizing(mongod);
|
||||
|
||||
setExecutionControlAlgorithm(mongod, kThroughputProbingAlgorithm);
|
||||
assertTicketSizing(mongod, {expectDynamicAdjustmentWarnings: true, expectPrioritizationWarnings: true});
|
||||
assertTicketSizing(mongod, {expectDynamicAdjustmentWarnings: true});
|
||||
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
assertTicketSizing(mongod, {expectPrioritizationWarnings: true});
|
||||
assertTicketSizing(mongod);
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -252,8 +231,7 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
nodes: 1,
|
||||
nodeOptions: {
|
||||
setParameter: {
|
||||
executionControlConcurrencyAdjustmentAlgorithm:
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
executionControlDeprioritizationGate: true,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
|
@ -272,15 +250,13 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
});
|
||||
|
||||
it("should allow transitions to other algorithms and back", function () {
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
assertTicketSizing(mongod);
|
||||
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
assertTicketSizing(mongod, {expectPrioritizationWarnings: true});
|
||||
|
||||
setExecutionControlAlgorithm(mongod, kThroughputProbingAlgorithm);
|
||||
assertTicketSizing(mongod, {expectDynamicAdjustmentWarnings: true, expectPrioritizationWarnings: true});
|
||||
assertTicketSizing(mongod, {expectDynamicAdjustmentWarnings: true});
|
||||
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
assertTicketSizing(mongod);
|
||||
});
|
||||
});
|
||||
|
|
@ -317,11 +293,11 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
assertTicketSizing(mongod, {expectPrioritizationWarnings: true});
|
||||
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
assertTicketSizing(mongod);
|
||||
|
||||
setExecutionControlAlgorithm(mongod, kThroughputProbingAlgorithm);
|
||||
assertTicketSizing(mongod, {expectDynamicAdjustmentWarnings: true, expectPrioritizationWarnings: true});
|
||||
assertTicketSizing(mongod, {expectDynamicAdjustmentWarnings: true});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -365,11 +341,11 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
|
||||
it("should preserve low priority ticket values across prioritization changes", function () {
|
||||
const customTicketCount = 40;
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
setExecutionControlTickets(mongod, {readLowPriority: customTicketCount});
|
||||
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setDeprioritizationGate(mongod, false);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
|
||||
const res = assert.commandWorked(
|
||||
mongod.adminCommand({
|
||||
|
|
@ -419,7 +395,7 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
});
|
||||
|
||||
beforeEach(function () {
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
});
|
||||
|
||||
afterEach(function () {
|
||||
|
|
@ -434,16 +410,7 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
assert.commandFailedWithCode(
|
||||
mongod.adminCommand({
|
||||
setParameter: 1,
|
||||
executionControlConcurrencyAdjustmentAlgorithm: kFixedConcurrentTransactionsAlgorithm,
|
||||
}),
|
||||
ErrorCodes.IllegalOperation,
|
||||
);
|
||||
|
||||
// Attempt to transition to kThroughputProbing should also fail.
|
||||
assert.commandFailedWithCode(
|
||||
mongod.adminCommand({
|
||||
setParameter: 1,
|
||||
executionControlConcurrencyAdjustmentAlgorithm: kThroughputProbingAlgorithm,
|
||||
executionControlDeprioritizationGate: false,
|
||||
}),
|
||||
ErrorCodes.IllegalOperation,
|
||||
);
|
||||
|
|
@ -456,40 +423,31 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
assert.commandFailedWithCode(
|
||||
mongod.adminCommand({
|
||||
setParameter: 1,
|
||||
executionControlConcurrencyAdjustmentAlgorithm: kFixedConcurrentTransactionsAlgorithm,
|
||||
}),
|
||||
ErrorCodes.IllegalOperation,
|
||||
);
|
||||
|
||||
// Attempt to transition to kThroughputProbing should also fail.
|
||||
assert.commandFailedWithCode(
|
||||
mongod.adminCommand({
|
||||
setParameter: 1,
|
||||
executionControlConcurrencyAdjustmentAlgorithm: kThroughputProbingAlgorithm,
|
||||
executionControlDeprioritizationGate: false,
|
||||
}),
|
||||
ErrorCodes.IllegalOperation,
|
||||
);
|
||||
});
|
||||
|
||||
it("should allow staying in prioritization mode regardless of low-priority ticket values", function () {
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
|
||||
// Set low-priority tickets to 0.
|
||||
setExecutionControlTickets(mongod, {readLowPriority: 0, writeLowPriority: 0});
|
||||
|
||||
// Staying in prioritization mode should succeed.
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
});
|
||||
|
||||
it("should allow enabling deprioritization regardless of low-priority ticket values", function () {
|
||||
// Start with non-prioritization mode.
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
setDeprioritizationGate(mongod, false);
|
||||
|
||||
// Set low-priority tickets to 0 (this is allowed in non-prioritization modes).
|
||||
setExecutionControlTickets(mongod, {readLowPriority: 0, writeLowPriority: 0});
|
||||
|
||||
// Transition to prioritization should succeed even with 0 low-priority tickets.
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -535,7 +493,7 @@ describe("Execution control concurrency adjustment algorithm", function () {
|
|||
});
|
||||
|
||||
it("should align tickets when transitioning from throughput probing to fixed concurrency adjustment algorithm with prioritization", function () {
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
|
||||
const customReadTickets = 35;
|
||||
const customWriteTickets = 45;
|
||||
|
|
|
|||
|
|
@ -20,9 +20,11 @@ import {
|
|||
getTotalDeprioritizationCount,
|
||||
insertTestDocuments,
|
||||
kFixedConcurrentTransactionsAlgorithm,
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
kThroughputProbingAlgorithm,
|
||||
setBackgroundTaskDeprioritization,
|
||||
setDeprioritizationGate,
|
||||
setExecutionControlAlgorithm,
|
||||
setHeuristicDeprioritization,
|
||||
} from "jstests/noPassthrough/admission/execution_control/libs/execution_control_helper.js";
|
||||
|
||||
describe("Execution control statistics and observability", function () {
|
||||
|
|
@ -53,6 +55,30 @@ describe("Execution control statistics and observability", function () {
|
|||
assertFunc(obj1.totalTickets, obj2.totalTickets);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that execution control stats from serverStatus match the expected values.
|
||||
* Checks algorithm type, prioritization settings, and ticket aggregation stats.
|
||||
*
|
||||
* The 'expected' object should contain:
|
||||
* - usesThroughputProbing: whether throughput probing algorithm is active.
|
||||
* - usesPrioritization: whether any form of prioritization is enabled.
|
||||
* - deprioritizationGate: the state of the deprioritization gate.
|
||||
* - heuristicDeprioritization: whether heuristic deprioritization is enabled.
|
||||
* - backgroundTasksDeprioritization: whether background task deprioritization is enabled.
|
||||
* - ticketAssertFunc: assertion function (assert.eq or assert.gt) for comparing aggregated
|
||||
* ticket stats to normal priority ticket stats.
|
||||
*/
|
||||
function verifyExecutionControlStats(expected) {
|
||||
const stats = getExecutionControlStats(mongod);
|
||||
assert.eq(expected.usesThroughputProbing, stats.usesThroughputProbing);
|
||||
assert.eq(expected.usesPrioritization, stats.usesPrioritization);
|
||||
assert.eq(expected.deprioritizationGate, stats.deprioritizationGate);
|
||||
assert.eq(expected.heuristicDeprioritization, stats.heuristicDeprioritization);
|
||||
assert.eq(expected.backgroundTasksDeprioritization, stats.backgroundTasksDeprioritization);
|
||||
verifyTicketAggregationStats(expected.ticketAssertFunc, stats.read, stats.read.normalPriority);
|
||||
verifyTicketAggregationStats(expected.ticketAssertFunc, stats.write, stats.write.normalPriority);
|
||||
}
|
||||
|
||||
before(function () {
|
||||
replTest = new ReplSetTest({
|
||||
nodes: 1,
|
||||
|
|
@ -77,36 +103,154 @@ describe("Execution control statistics and observability", function () {
|
|||
|
||||
it("should report throughput probing correctly", function () {
|
||||
setExecutionControlAlgorithm(mongod, kThroughputProbingAlgorithm);
|
||||
setDeprioritizationGate(mongod, false);
|
||||
|
||||
const stats = getExecutionControlStats(mongod);
|
||||
assert.eq(2, stats.executionControlConcurrencyAdjustmentAlgorithm);
|
||||
assert.eq(false, stats.prioritizationEnabled);
|
||||
verifyTicketAggregationStats(assert.eq, stats.read, stats.read.normalPriority);
|
||||
verifyTicketAggregationStats(assert.eq, stats.write, stats.write.normalPriority);
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: true,
|
||||
usesPrioritization: false,
|
||||
deprioritizationGate: false,
|
||||
heuristicDeprioritization: true,
|
||||
backgroundTasksDeprioritization: true,
|
||||
ticketAssertFunc: assert.eq,
|
||||
});
|
||||
});
|
||||
|
||||
it("should report fixed algorithm correctly", function () {
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
setDeprioritizationGate(mongod, false);
|
||||
|
||||
const stats = getExecutionControlStats(mongod);
|
||||
assert.eq(0, stats.executionControlConcurrencyAdjustmentAlgorithm);
|
||||
assert.eq(false, stats.prioritizationEnabled);
|
||||
verifyTicketAggregationStats(assert.eq, stats.read, stats.read.normalPriority);
|
||||
verifyTicketAggregationStats(assert.eq, stats.write, stats.write.normalPriority);
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: false,
|
||||
usesPrioritization: false,
|
||||
deprioritizationGate: false,
|
||||
heuristicDeprioritization: true,
|
||||
backgroundTasksDeprioritization: true,
|
||||
ticketAssertFunc: assert.eq,
|
||||
});
|
||||
});
|
||||
|
||||
it("should report throughput probing with prioritization correctly", function () {
|
||||
setExecutionControlAlgorithm(mongod, kThroughputProbingAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: true,
|
||||
usesPrioritization: true,
|
||||
deprioritizationGate: true,
|
||||
heuristicDeprioritization: true,
|
||||
backgroundTasksDeprioritization: true,
|
||||
ticketAssertFunc: assert.gt,
|
||||
});
|
||||
});
|
||||
|
||||
it("should report fixed algorithm with prioritization correctly", function () {
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsWithPrioritizationAlgorithm);
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
|
||||
const stats = getExecutionControlStats(mongod);
|
||||
assert.eq(1, stats.executionControlConcurrencyAdjustmentAlgorithm);
|
||||
assert.eq(true, stats.prioritizationEnabled);
|
||||
verifyTicketAggregationStats(assert.gt, stats.read, stats.read.normalPriority);
|
||||
verifyTicketAggregationStats(assert.gt, stats.write, stats.write.normalPriority);
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: false,
|
||||
usesPrioritization: true,
|
||||
deprioritizationGate: true,
|
||||
heuristicDeprioritization: true,
|
||||
backgroundTasksDeprioritization: true,
|
||||
ticketAssertFunc: assert.gt,
|
||||
});
|
||||
});
|
||||
|
||||
// TODO SERVER-113367: Once we can enable the heuristic with throughput probing, we need to
|
||||
// add cases of the heuristic and algorithms combinations.
|
||||
it("should report heuristic deprioritization disabled correctly", function () {
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
setHeuristicDeprioritization(mongod, false);
|
||||
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: false,
|
||||
usesPrioritization: true,
|
||||
deprioritizationGate: true,
|
||||
heuristicDeprioritization: false,
|
||||
backgroundTasksDeprioritization: true,
|
||||
ticketAssertFunc: assert.gt,
|
||||
});
|
||||
});
|
||||
|
||||
it("should report background tasks deprioritization disabled correctly", function () {
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
setHeuristicDeprioritization(mongod, true);
|
||||
setBackgroundTaskDeprioritization(mongod, false);
|
||||
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: false,
|
||||
usesPrioritization: true,
|
||||
deprioritizationGate: true,
|
||||
heuristicDeprioritization: true,
|
||||
backgroundTasksDeprioritization: false,
|
||||
ticketAssertFunc: assert.gt,
|
||||
});
|
||||
});
|
||||
|
||||
it("should report both heuristic and background tasks disabled correctly", function () {
|
||||
setExecutionControlAlgorithm(mongod, kFixedConcurrentTransactionsAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
setHeuristicDeprioritization(mongod, false);
|
||||
setBackgroundTaskDeprioritization(mongod, false);
|
||||
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: false,
|
||||
usesPrioritization: false,
|
||||
deprioritizationGate: true,
|
||||
heuristicDeprioritization: false,
|
||||
backgroundTasksDeprioritization: false,
|
||||
ticketAssertFunc: assert.eq,
|
||||
});
|
||||
});
|
||||
|
||||
it("should report prioritization with throughput probing and only heuristic enabled", function () {
|
||||
setExecutionControlAlgorithm(mongod, kThroughputProbingAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
setHeuristicDeprioritization(mongod, true);
|
||||
setBackgroundTaskDeprioritization(mongod, false);
|
||||
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: true,
|
||||
usesPrioritization: true,
|
||||
deprioritizationGate: true,
|
||||
heuristicDeprioritization: true,
|
||||
backgroundTasksDeprioritization: false,
|
||||
ticketAssertFunc: assert.gt,
|
||||
});
|
||||
});
|
||||
|
||||
it("should report prioritization with throughput probing and only background tasks enabled", function () {
|
||||
setExecutionControlAlgorithm(mongod, kThroughputProbingAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
setHeuristicDeprioritization(mongod, false);
|
||||
setBackgroundTaskDeprioritization(mongod, true);
|
||||
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: true,
|
||||
usesPrioritization: true,
|
||||
deprioritizationGate: true,
|
||||
heuristicDeprioritization: false,
|
||||
backgroundTasksDeprioritization: true,
|
||||
ticketAssertFunc: assert.gt,
|
||||
});
|
||||
});
|
||||
|
||||
it("should report no prioritization when gate is open but both flags are disabled", function () {
|
||||
setExecutionControlAlgorithm(mongod, kThroughputProbingAlgorithm);
|
||||
setDeprioritizationGate(mongod, true);
|
||||
setHeuristicDeprioritization(mongod, false);
|
||||
setBackgroundTaskDeprioritization(mongod, false);
|
||||
|
||||
verifyExecutionControlStats({
|
||||
usesThroughputProbing: true,
|
||||
usesPrioritization: false,
|
||||
deprioritizationGate: true,
|
||||
heuristicDeprioritization: false,
|
||||
backgroundTasksDeprioritization: false,
|
||||
ticketAssertFunc: assert.eq,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("Deprioritized operation statistics and observability", function () {
|
||||
|
|
@ -121,8 +265,7 @@ describe("Execution control statistics and observability", function () {
|
|||
// Force the query to yield frequently to better expose the low-priority
|
||||
// behavior.
|
||||
internalQueryExecYieldIterations: 1,
|
||||
executionControlConcurrencyAdjustmentAlgorithm:
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
executionControlDeprioritizationGate: true,
|
||||
executionControlHeuristicNumAdmissionsDeprioritizeThreshold: 1,
|
||||
logComponentVerbosity: {command: 2},
|
||||
},
|
||||
|
|
@ -262,8 +405,7 @@ describe("Execution control statistics and observability", function () {
|
|||
nodes: 1,
|
||||
nodeOptions: {
|
||||
setParameter: {
|
||||
executionControlConcurrencyAdjustmentAlgorithm:
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
executionControlDeprioritizationGate: true,
|
||||
executionControlHeuristicNumAdmissionsDeprioritizeThreshold: 1,
|
||||
internalQueryExecYieldIterations: 1,
|
||||
featureFlagRecordDelinquentMetrics: true,
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import {
|
|||
getLowPriorityReadCount,
|
||||
getLowPriorityWriteCount,
|
||||
insertTestDocuments,
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
setBackgroundTaskDeprioritization,
|
||||
} from "jstests/noPassthrough/admission/execution_control/libs/execution_control_helper.js";
|
||||
import {IndexBuildTest} from "jstests/noPassthrough/libs/index_builds/index_build.js";
|
||||
|
|
@ -41,8 +40,7 @@ describe("Execution control deprioritization mechanisms", function () {
|
|||
// Force the query to yield frequently to better expose the low-priority
|
||||
// behavior.
|
||||
internalQueryExecYieldIterations: 1,
|
||||
executionControlConcurrencyAdjustmentAlgorithm:
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
executionControlDeprioritizationGate: true,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
|
@ -202,8 +200,7 @@ describe("Execution control deprioritization mechanisms", function () {
|
|||
nodes: 2,
|
||||
nodeOptions: {
|
||||
setParameter: {
|
||||
executionControlConcurrencyAdjustmentAlgorithm:
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
executionControlDeprioritizationGate: true,
|
||||
executionControlHeuristicDeprioritization: false,
|
||||
},
|
||||
},
|
||||
|
|
@ -265,8 +262,7 @@ describe("Execution control deprioritization mechanisms", function () {
|
|||
setParameter: {
|
||||
ttlMonitorSleepSecs: 1,
|
||||
ttlMonitorEnabled: false,
|
||||
executionControlConcurrencyAdjustmentAlgorithm:
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
executionControlDeprioritizationGate: true,
|
||||
executionControlHeuristicDeprioritization: false,
|
||||
},
|
||||
},
|
||||
|
|
@ -344,8 +340,7 @@ describe("Execution control deprioritization mechanisms", function () {
|
|||
other: {
|
||||
rsOptions: {
|
||||
setParameter: {
|
||||
executionControlConcurrencyAdjustmentAlgorithm:
|
||||
kFixedConcurrentTransactionsWithPrioritizationAlgorithm,
|
||||
executionControlDeprioritizationGate: true,
|
||||
executionControlHeuristicDeprioritization: false,
|
||||
},
|
||||
},
|
||||
|
|
|
|||
|
|
@ -43,6 +43,13 @@ export function getExecutionControlAlgorithm(node) {
|
|||
return getExecutionControlParameter(node, "executionControlConcurrencyAdjustmentAlgorithm");
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables or disables the global deprioritization gate.
|
||||
*/
|
||||
export function setDeprioritizationGate(node, enabled) {
|
||||
setExecutionControlParameter(node, "executionControlDeprioritizationGate", enabled);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables or disables heuristic deprioritization for long-running operations.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ rst.initiate();
|
|||
assert.commandWorked(
|
||||
rst.getPrimary().adminCommand({
|
||||
setParameter: 1,
|
||||
executionControlHeuristicDeprioritization: false,
|
||||
executionControlDeprioritizationGate: false,
|
||||
}),
|
||||
);
|
||||
assert.commandWorked(
|
||||
|
|
|
|||
|
|
@ -28,18 +28,12 @@ const delinquentIntervalMs = waitPerIterationMs - 20;
|
|||
const findComment = "delinquent_ops.js-COMMENT";
|
||||
|
||||
function disableDeprioritizationHeuristic(conn) {
|
||||
let algorithm = assert.commandWorked(
|
||||
conn.adminCommand({getParameter: 1, executionControlConcurrencyAdjustmentAlgorithm: 1}),
|
||||
assert.commandWorked(
|
||||
conn.adminCommand({
|
||||
setParameter: 1,
|
||||
executionControlHeuristicDeprioritization: false,
|
||||
}),
|
||||
);
|
||||
if (algorithm.executionControlConcurrencyAdjustmentAlgorithm == "fixedConcurrentTransactionsWithPrioritization") {
|
||||
// Set a very high number of admissions, effectively disabling the heuristic.
|
||||
assert.commandWorked(
|
||||
conn.adminCommand({
|
||||
setParameter: 1,
|
||||
executionControlHeuristicNumAdmissionsDeprioritizeThreshold: 999999,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function assertDelinquentStats(metrics, count, msg, previousOperationMetrics) {
|
||||
|
|
|
|||
|
|
@ -39,18 +39,10 @@ enums:
|
|||
# executionControlConcurrentWriteTransactions/executionControlConcurrentReadTransactions and
|
||||
# will not be adjusted automatically based on overall system throughput.
|
||||
#
|
||||
# 'kFixedConcurrentTransactionsWithPrioritization': The number of concurrent transactions is
|
||||
# controlled by server parameters and divided into two pools based on operation priority
|
||||
# (normal and low priority). These values are not adjusted automatically based on overall
|
||||
# system throughput. Instead, this algorithm is intended for use with an external server-side
|
||||
# policy (e.g., mongotune) that dynamically adjusts the number of concurrent transactions in
|
||||
# each pool.
|
||||
#
|
||||
# 'kThroughputProbing': Number of concurrent transactions are dynamically adjusted, either
|
||||
# increasing or decreasing concurrency in the execution control, based on system throughput.
|
||||
values:
|
||||
kFixedConcurrentTransactions: "fixedConcurrentTransactions"
|
||||
kFixedConcurrentTransactionsWithPrioritization: "fixedConcurrentTransactionsWithPrioritization"
|
||||
kThroughputProbing: "throughputProbing"
|
||||
|
||||
server_parameters:
|
||||
|
|
@ -190,15 +182,27 @@ server_parameters:
|
|||
callback: TicketingSystem::validateConcurrencyAdjustmentAlgorithm
|
||||
redact: false
|
||||
|
||||
executionControlDeprioritizationGate:
|
||||
description: >-
|
||||
Global gate for enabling deprioritization in the ticketing system. When true, feature-specific
|
||||
deprioritization controls (such as heuristic and background task deprioritization) are
|
||||
permitted to take effect. When false, all deprioritization mechanisms are blocked,
|
||||
regardless of their individual settings.
|
||||
set_at: [startup, runtime]
|
||||
cpp_vartype: AtomicWord<bool>
|
||||
cpp_varname: gDeprioritizationGate
|
||||
on_update: "TicketingSystem::updateDeprioritizationGate"
|
||||
default: false
|
||||
redact: false
|
||||
|
||||
executionControlHeuristicDeprioritization:
|
||||
description: >-
|
||||
Enables a heuristic that will evaluate long-running operations and deprioritize them to
|
||||
the low priority ticket pool. This parameter only takes effect if the execution
|
||||
admission control with prioritization is enabled. This only takes effect if
|
||||
`executionControlConcurrencyAdjustmentAlgorithm` is "fixedConcurrentTransactionsWithPrioritization".
|
||||
the low priority ticket pool.
|
||||
set_at: [startup, runtime]
|
||||
cpp_vartype: AtomicWord<bool>
|
||||
cpp_varname: gHeuristicDeprioritization
|
||||
on_update: "TicketingSystem::updateHeuristicDeprioritization"
|
||||
default: true
|
||||
redact: false
|
||||
|
||||
|
|
@ -206,7 +210,7 @@ server_parameters:
|
|||
description: >-
|
||||
The number of admissions an operation must perform before it is deprioritized to the low-priority
|
||||
ticket pool. This parameter only takes effect when `executionControlHeuristicDeprioritization`
|
||||
is true and `executionControlConcurrencyAdjustmentAlgorithm` is "fixedConcurrentTransactionsWithPrioritization".
|
||||
is true.
|
||||
set_at: [startup, runtime]
|
||||
cpp_vartype: Atomic<int>
|
||||
cpp_varname: gHeuristicNumAdmissionsDeprioritizeThreshold
|
||||
|
|
@ -221,10 +225,10 @@ server_parameters:
|
|||
TTL deletions) are automatically deprioritized to the low-priority pool. If set to false,
|
||||
these tasks will not be automatically deprioritized upon starting. However, they can
|
||||
still be deprioritized later by the heuristic if the operation exceeds the admissions
|
||||
threshold. This only takes effect if `executionControlConcurrencyAdjustmentAlgorithm` is
|
||||
"fixedConcurrentTransactionsWithPrioritization".
|
||||
threshold.
|
||||
set_at: [startup, runtime]
|
||||
cpp_vartype: AtomicWord<bool>
|
||||
cpp_varname: gBackgroundTasksDeprioritization
|
||||
on_update: "TicketingSystem::updateBackgroundTasksDeprioritization"
|
||||
default: true
|
||||
redact: false
|
||||
|
|
|
|||
|
|
@ -118,6 +118,21 @@ bool wasOperationDowngradedToLowPriority(OperationContext* opCtx,
|
|||
return admCtx->getAdmissions() >= gHeuristicNumAdmissionsDeprioritizeThreshold.load();
|
||||
}
|
||||
|
||||
Status checkPrioritizationTransition(bool oldStateUsesPrioritization,
|
||||
bool newStateUsesPrioritization) {
|
||||
const bool transitioningAwayFromPrioritization =
|
||||
oldStateUsesPrioritization && !newStateUsesPrioritization;
|
||||
|
||||
if (transitioningAwayFromPrioritization &&
|
||||
(gConcurrentReadLowPriorityTransactions.load() == 0 ||
|
||||
gConcurrentWriteLowPriorityTransactions.load() == 0)) {
|
||||
return Status{ErrorCodes::IllegalOperation,
|
||||
"Cannot transition to not use prioritization when low-priority read or write "
|
||||
"tickets are set to 0"};
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Status TicketingSystem::NormalPrioritySettings::updateWriteMaxQueueDepth(
|
||||
|
|
@ -221,7 +236,6 @@ Status TicketingSystem::LowPrioritySettings::updateConcurrentWriteTransactions(
|
|||
const int32_t& newWriteTransactions) {
|
||||
const auto spName = "low priority concurrent write transactions limit";
|
||||
return updateSettings(spName, [&](Client* client, TicketingSystem* ticketingSystem) {
|
||||
warnIfDynamicAdjustmentEnabled(ticketingSystem, spName);
|
||||
warnIfPrioritizationDisabled(ticketingSystem, spName);
|
||||
|
||||
ticketingSystem->setConcurrentTransactions(client->getOperationContext(),
|
||||
|
|
@ -236,7 +250,6 @@ Status TicketingSystem::LowPrioritySettings::updateConcurrentReadTransactions(
|
|||
const int32_t& newReadTransactions) {
|
||||
const auto spName = "low priority concurrent read transactions limit";
|
||||
return updateSettings(spName, [&](Client* client, TicketingSystem* ticketingSystem) {
|
||||
warnIfDynamicAdjustmentEnabled(ticketingSystem, spName);
|
||||
warnIfPrioritizationDisabled(ticketingSystem, spName);
|
||||
|
||||
ticketingSystem->setConcurrentTransactions(client->getOperationContext(),
|
||||
|
|
@ -272,8 +285,30 @@ Status TicketingSystem::LowPrioritySettings::validateConcurrentReadTransactions(
|
|||
Status TicketingSystem::updateConcurrencyAdjustmentAlgorithm(std::string algorithmName) {
|
||||
return updateSettings("concurrency adjustment algorithm",
|
||||
[&](Client* client, TicketingSystem* ticketingSystem) {
|
||||
return ticketingSystem->setConcurrencyAdjustmentAlgorithm(
|
||||
ticketingSystem->setConcurrencyAdjustmentAlgorithm(
|
||||
client->getOperationContext(), algorithmName);
|
||||
return Status::OK();
|
||||
});
|
||||
}
|
||||
|
||||
Status TicketingSystem::updateDeprioritizationGate(bool enabled) {
|
||||
return updateSettings("deprioritization gate",
|
||||
[&](Client* client, TicketingSystem* ticketingSystem) {
|
||||
return ticketingSystem->setDeprioritizationGate(enabled);
|
||||
});
|
||||
}
|
||||
|
||||
Status TicketingSystem::updateHeuristicDeprioritization(bool enabled) {
|
||||
return updateSettings("heuristic deprioritization",
|
||||
[&](Client* client, TicketingSystem* ticketingSystem) {
|
||||
return ticketingSystem->setHeuristicDeprioritization(enabled);
|
||||
});
|
||||
}
|
||||
|
||||
Status TicketingSystem::updateBackgroundTasksDeprioritization(bool enabled) {
|
||||
return updateSettings("background tasks deprioritization",
|
||||
[&](Client* client, TicketingSystem* ticketingSystem) {
|
||||
return ticketingSystem->setBackgroundTasksDeprioritization(enabled);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -299,7 +334,12 @@ TicketingSystem::TicketingSystem(
|
|||
RWTicketHolder normal,
|
||||
RWTicketHolder low,
|
||||
ExecutionControlConcurrencyAdjustmentAlgorithmEnum concurrencyAdjustmentAlgorithm)
|
||||
: _state({concurrencyAdjustmentAlgorithm}),
|
||||
: _state({.usesThroughputProbing = concurrencyAdjustmentAlgorithm ==
|
||||
ExecutionControlConcurrencyAdjustmentAlgorithmEnum::kThroughputProbing,
|
||||
|
||||
.deprioritization = {.gate = gDeprioritizationGate.load(),
|
||||
.heuristic = gHeuristicDeprioritization.load(),
|
||||
.backgroundTasks = gBackgroundTasksDeprioritization.load()}}),
|
||||
_throughputProbing(svcCtx,
|
||||
normal.read.get(),
|
||||
normal.write.get(),
|
||||
|
|
@ -309,7 +349,7 @@ TicketingSystem::TicketingSystem(
|
|||
};
|
||||
|
||||
bool TicketingSystem::isRuntimeResizable() const {
|
||||
return _state.loadRelaxed().isRuntimeResizable();
|
||||
return !_state.loadRelaxed().usesThroughputProbing;
|
||||
}
|
||||
|
||||
bool TicketingSystem::usesPrioritization() const {
|
||||
|
|
@ -333,39 +373,28 @@ void TicketingSystem::setConcurrentTransactions(OperationContext* opCtx,
|
|||
holder->resize(opCtx, transactions, Date_t::max());
|
||||
}
|
||||
|
||||
Status TicketingSystem::setConcurrencyAdjustmentAlgorithm(OperationContext* opCtx,
|
||||
std::string algorithmName) {
|
||||
void TicketingSystem::setConcurrencyAdjustmentAlgorithm(OperationContext* opCtx,
|
||||
std::string algorithmName) {
|
||||
const auto parsedAlgorithm = ExecutionControlConcurrencyAdjustmentAlgorithm_parse(
|
||||
algorithmName, IDLParserContext{"executionControlConcurrencyAdjustmentAlgorithm"});
|
||||
|
||||
const TicketingState oldState = _state.loadRelaxed();
|
||||
const TicketingState newState = {parsedAlgorithm};
|
||||
|
||||
const bool wasThroughputProbingRunning = oldState.usesThroughputProbing();
|
||||
const bool isThroughputProbingRunningNow = newState.usesThroughputProbing();
|
||||
const bool transitioningAwayFromPrioritization =
|
||||
oldState.usesPrioritization() && !newState.usesPrioritization();
|
||||
|
||||
if (transitioningAwayFromPrioritization &&
|
||||
(gConcurrentReadLowPriorityTransactions.load() == 0 ||
|
||||
gConcurrentWriteLowPriorityTransactions.load() == 0)) {
|
||||
return Status{ErrorCodes::IllegalOperation,
|
||||
"Cannot transition to not use prioritization when low-priority read or write "
|
||||
"tickets are set to 0"};
|
||||
}
|
||||
TicketingState newState = oldState;
|
||||
newState.usesThroughputProbing =
|
||||
(parsedAlgorithm == ExecutionControlConcurrencyAdjustmentAlgorithmEnum::kThroughputProbing);
|
||||
|
||||
_state.store(newState);
|
||||
|
||||
if (wasThroughputProbingRunning == isThroughputProbingRunningNow) {
|
||||
// There has been a change in the algorithm related to the prioritization of operations, but
|
||||
// no change in the throughput probing state. There is nothing more to update.
|
||||
return Status::OK();
|
||||
if (oldState.usesThroughputProbing == newState.usesThroughputProbing) {
|
||||
// No-op. Nothing has changed.
|
||||
return;
|
||||
}
|
||||
|
||||
if (isThroughputProbingRunningNow) {
|
||||
if (newState.usesThroughputProbing) {
|
||||
// Throughput probing needs to start. Apart from that, there is nothing more to update.
|
||||
_throughputProbing.start();
|
||||
return Status::OK();
|
||||
return;
|
||||
}
|
||||
|
||||
// There has been a change in the algorithm from throughput probing to fixed concurrency. We
|
||||
|
|
@ -381,20 +410,41 @@ Status TicketingSystem::setConcurrencyAdjustmentAlgorithm(OperationContext* opCt
|
|||
AdmissionContext::Priority::kNormal,
|
||||
OperationType::kWrite,
|
||||
gConcurrentWriteTransactions.load());
|
||||
setConcurrentTransactions(
|
||||
opCtx,
|
||||
AdmissionContext::Priority::kLow,
|
||||
OperationType::kRead,
|
||||
TicketingSystem::resolveLowPriorityTickets(gConcurrentReadLowPriorityTransactions));
|
||||
setConcurrentTransactions(
|
||||
opCtx,
|
||||
AdmissionContext::Priority::kLow,
|
||||
OperationType::kWrite,
|
||||
TicketingSystem::resolveLowPriorityTickets(gConcurrentWriteLowPriorityTransactions));
|
||||
}
|
||||
|
||||
Status TicketingSystem::_setDeprioritizationFlag(bool enabled,
|
||||
std::function<void(TicketingState&)> setFlag) {
|
||||
const TicketingState oldState = _state.loadRelaxed();
|
||||
|
||||
TicketingState newState = oldState;
|
||||
setFlag(newState);
|
||||
|
||||
auto status =
|
||||
checkPrioritizationTransition(oldState.usesPrioritization(), newState.usesPrioritization());
|
||||
if (!status.isOK()) {
|
||||
return status;
|
||||
}
|
||||
|
||||
_state.store(newState);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status TicketingSystem::setDeprioritizationGate(bool enabled) {
|
||||
return _setDeprioritizationFlag(
|
||||
enabled, [enabled](TicketingState& state) { state.deprioritization.gate = enabled; });
|
||||
}
|
||||
|
||||
Status TicketingSystem::setHeuristicDeprioritization(bool enabled) {
|
||||
return _setDeprioritizationFlag(
|
||||
enabled, [enabled](TicketingState& state) { state.deprioritization.heuristic = enabled; });
|
||||
}
|
||||
|
||||
Status TicketingSystem::setBackgroundTasksDeprioritization(bool enabled) {
|
||||
return _setDeprioritizationFlag(enabled, [enabled](TicketingState& state) {
|
||||
state.deprioritization.backgroundTasks = enabled;
|
||||
});
|
||||
}
|
||||
|
||||
void TicketingSystem::appendStats(BSONObjBuilder& b) const {
|
||||
boost::optional<BSONObjBuilder> readStats;
|
||||
boost::optional<BSONObjBuilder> writeStats;
|
||||
|
|
@ -402,7 +452,6 @@ void TicketingSystem::appendStats(BSONObjBuilder& b) const {
|
|||
int32_t writeOut = 0, writeAvailable = 0, writeTotalTickets = 0;
|
||||
_state.loadRelaxed().appendStats(b);
|
||||
b.append("totalDeprioritizations", _opsDeprioritized.loadRelaxed());
|
||||
b.append("prioritizationEnabled", usesPrioritization());
|
||||
|
||||
for (size_t i = 0; i < _holders.size(); ++i) {
|
||||
const auto priority = static_cast<AdmissionContext::Priority>(i);
|
||||
|
|
@ -558,7 +607,7 @@ void TicketingSystem::startThroughputProbe() {
|
|||
tassert(11132202,
|
||||
"The throughput probing parameter should be enabled. This is only safe to use for "
|
||||
"initialization purposes.",
|
||||
_state.loadRelaxed().usesThroughputProbing());
|
||||
_state.loadRelaxed().usesThroughputProbing);
|
||||
|
||||
_throughputProbing.start();
|
||||
}
|
||||
|
|
@ -577,21 +626,15 @@ TicketHolder* TicketingSystem::_getHolder(AdmissionContext::Priority p, Operatio
|
|||
}
|
||||
|
||||
bool TicketingSystem::TicketingState::usesPrioritization() const {
|
||||
return algorithm ==
|
||||
ExecutionControlConcurrencyAdjustmentAlgorithmEnum::
|
||||
kFixedConcurrentTransactionsWithPrioritization;
|
||||
}
|
||||
|
||||
bool TicketingSystem::TicketingState::usesThroughputProbing() const {
|
||||
return algorithm == ExecutionControlConcurrencyAdjustmentAlgorithmEnum::kThroughputProbing;
|
||||
}
|
||||
|
||||
bool TicketingSystem::TicketingState::isRuntimeResizable() const {
|
||||
return !usesThroughputProbing();
|
||||
return deprioritization.isActive();
|
||||
}
|
||||
|
||||
void TicketingSystem::TicketingState::appendStats(BSONObjBuilder& b) const {
|
||||
b.append("executionControlConcurrencyAdjustmentAlgorithm", algorithm);
|
||||
b.append("usesThroughputProbing", usesThroughputProbing);
|
||||
b.append("usesPrioritization", usesPrioritization());
|
||||
b.append("deprioritizationGate", deprioritization.gate);
|
||||
b.append("heuristicDeprioritization", deprioritization.heuristic);
|
||||
b.append("backgroundTasksDeprioritization", deprioritization.backgroundTasks);
|
||||
}
|
||||
|
||||
} // namespace mongo::admission::execution_control
|
||||
|
|
|
|||
|
|
@ -117,6 +117,12 @@ public:
|
|||
|
||||
static Status updateConcurrencyAdjustmentAlgorithm(std::string newAlgorithm);
|
||||
|
||||
static Status updateDeprioritizationGate(bool enabled);
|
||||
|
||||
static Status updateHeuristicDeprioritization(bool enabled);
|
||||
|
||||
static Status updateBackgroundTasksDeprioritization(bool enabled);
|
||||
|
||||
/**
|
||||
* Resolves the configured number of low-priority tickets.
|
||||
*
|
||||
|
|
@ -160,7 +166,33 @@ public:
|
|||
* adjust the number of concurrent transactions. This includes switching between fixed
|
||||
* concurrency and throughput probing-based algorithms.
|
||||
*/
|
||||
Status setConcurrencyAdjustmentAlgorithm(OperationContext* opCtx, std::string algorithmName);
|
||||
void setConcurrencyAdjustmentAlgorithm(OperationContext* opCtx, std::string algorithmName);
|
||||
|
||||
/**
|
||||
* Opens or closes the global deprioritization gate in the ticketing system.
|
||||
*
|
||||
* When opened (`enabled = true`), feature-specific deprioritization controls (such as heuristic
|
||||
* and background task deprioritization) are allowed to take effect. When closed (`enabled =
|
||||
* false`), all deprioritization mechanisms are blocked, regardless of their individual
|
||||
* settings.
|
||||
*/
|
||||
Status setDeprioritizationGate(bool enabled);
|
||||
|
||||
/**
|
||||
* Enables or disables automatic deprioritization of operations based on a heuristic.
|
||||
*
|
||||
* When enabled, operations that yield frequently are automatically assigned to the low-priority
|
||||
* ticket pool.
|
||||
*/
|
||||
Status setHeuristicDeprioritization(bool enabled);
|
||||
|
||||
/**
|
||||
* Enables or disables automatic deprioritization of background tasks.
|
||||
*
|
||||
* When enabled, certain background operations (index builds, range deletions, and TTL
|
||||
* deletions) are automatically assigned to the low-priority ticket pool upon starting.
|
||||
*/
|
||||
Status setBackgroundTasksDeprioritization(bool enabled);
|
||||
|
||||
/**
|
||||
* Appends statistics about the ticketing system's state to a BSON.
|
||||
|
|
@ -201,11 +233,53 @@ private:
|
|||
* Encapsulates the ticketing system's concurrency mode and the logic that defines its behavior.
|
||||
*/
|
||||
struct TicketingState {
|
||||
ExecutionControlConcurrencyAdjustmentAlgorithmEnum algorithm;
|
||||
/**
|
||||
* If true, the system uses the throughput probing algorithm (dynamic). Otherwise, it uses
|
||||
* the fixed concurrency algorithm (static).
|
||||
*/
|
||||
bool usesThroughputProbing = false;
|
||||
|
||||
/**
|
||||
* Logically groups the settings related to operation deprioritization.
|
||||
*/
|
||||
struct DeprioritizationPolicy {
|
||||
/**
|
||||
* The global switch. If false, NO deprioritization occurs, regardless of the heuristic
|
||||
* or background flags.
|
||||
*/
|
||||
bool gate = false;
|
||||
|
||||
/**
|
||||
* If true (and gate is true), operations may be deprioritized based on admissions
|
||||
* frequency.
|
||||
*/
|
||||
bool heuristic = false;
|
||||
|
||||
/**
|
||||
* If true (and gate is true), specific background tasks (e.g., index builds) start as
|
||||
* low priority.
|
||||
*/
|
||||
bool backgroundTasks = false;
|
||||
|
||||
/**
|
||||
* Returns true if the gate is open and at least one specific policy is enabled.
|
||||
*/
|
||||
bool isActive() const {
|
||||
return gate && (heuristic || backgroundTasks);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Holds the current configuration for all deprioritization mechanisms (heuristic,
|
||||
* background tasks).
|
||||
*/
|
||||
DeprioritizationPolicy deprioritization;
|
||||
|
||||
/**
|
||||
* Returns true if the system is configured to use multiple ticket pools (prioritization).
|
||||
*/
|
||||
bool usesPrioritization() const;
|
||||
bool usesThroughputProbing() const;
|
||||
bool isRuntimeResizable() const;
|
||||
|
||||
void appendStats(BSONObjBuilder& b) const;
|
||||
};
|
||||
|
||||
|
|
@ -215,6 +289,12 @@ private:
|
|||
*/
|
||||
AtomicWord<TicketingState> _state;
|
||||
|
||||
/**
|
||||
* Helper method to set a deprioritization flag with proper state transition checking.
|
||||
* The setFlag function is invoked to modify the new state before storing it.
|
||||
*/
|
||||
Status _setDeprioritizationFlag(bool enabled, std::function<void(TicketingState&)> setFlag);
|
||||
|
||||
/**
|
||||
* Holds the ticket pools for different priority levels.
|
||||
*
|
||||
|
|
|
|||
Loading…
Reference in New Issue