SERVER-112800: Make replWriterThreadCount and replWriterMinThreadCount settable at runtime (#44087)

Co-authored-by: Pierre Turin <pierre.turin@mongodb.com>
Co-authored-by: Guillaume Racicot <guillaume.racicot@mongodb.com>
GitOrigin-RevId: 1ce17a1a537820d5fcb44cd8c68cfca84ecb3ec1
This commit is contained in:
Indy Prentice 2025-12-10 11:32:54 -06:00 committed by MongoDB Bot
parent 0b456b6711
commit 46eca3e064
25 changed files with 1195 additions and 148 deletions

View File

@ -167,7 +167,12 @@ config_fuzzer_params = {
"period": 5, "period": 5,
"fuzz_at": ["startup", "runtime"], "fuzz_at": ["startup", "runtime"],
}, },
"replWriterThreadCount": {"min": 1, "max": 256, "fuzz_at": ["startup"]}, "replWriterThreadCount": {
"min": 1,
"max": 256,
"period": 5,
"fuzz_at": ["startup", "runtime"],
},
# Default value 1000; many tests don't insert enough measurements to rollover due to count, so we enable a larger range for this parameter. # Default value 1000; many tests don't insert enough measurements to rollover due to count, so we enable a larger range for this parameter.
"timeseriesBucketMaxCount": { "timeseriesBucketMaxCount": {
"min": 20, "min": 20,

View File

@ -1,42 +0,0 @@
// This test ensures that the replWriterThreadCount server parameter:
// 1) cannot be less than 1
// 2) cannot be greater than 256
// 3) is actually set to the passed in value
// 4) cannot be altered at run time
// Case 1: too low a count. Starting mongod must throw, and the captured program output must
// contain the validator message for the lower bound.
clearRawMongoProgramOutput();
assert.throws(() => MongoRunner.runMongod({setParameter: "replWriterThreadCount=0"}));
assert(
rawMongoProgramOutput("Invalid value for parameter replWriterThreadCount: ").match(
"0 is not greater than or equal to 1",
),
"mongod started with too low a value for replWriterThreadCount",
);
// Case 2: too high a count. Starting mongod must throw, and the captured program output must
// contain the validator message for the upper bound.
clearRawMongoProgramOutput();
assert.throws(() => MongoRunner.runMongod({setParameter: "replWriterThreadCount=257"}));
assert(
rawMongoProgramOutput("Invalid value for parameter replWriterThreadCount: ").match(
"257 is not less than or equal to 256",
),
"mongod started with too high a value for replWriterThreadCount",
);
// Case 3: a value inside the accepted range. mongod must start and must not log a validation
// error for the parameter.
clearRawMongoProgramOutput();
let mongo = MongoRunner.runMongod({setParameter: "replWriterThreadCount=24"});
assert.neq(null, mongo, "mongod failed to start with a suitable replWriterThreadCount value");
assert(
rawMongoProgramOutput("Invalid value for parameter replWriterThreadCount").length == 0,
"despite accepting the replWriterThreadCount value, mongod logged an error",
);
// getParameter to confirm the value passed at startup is the one reported by the server
let result = mongo.getDB("admin").runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(24, result.replWriterThreadCount, "replWriterThreadCount was not set internally");
// Case 4: setParameter at run time is expected to be rejected by the server
assert.commandFailed(mongo.getDB("admin").runCommand({setParameter: 1, replWriterThreadCount: 1}));
MongoRunner.stopMongod(mongo);

View File

@ -0,0 +1,385 @@
// This test ensures that the replWriterThreadCount server parameter:
// 1) cannot be less than 1
// 2) cannot be greater than 256
// 3) is actually set to the passed in value
// 4) can be altered at run time
import {ReplSetTest} from "jstests/libs/replsettest.js";
/**
 * Exercises startup and runtime validation of replWriterThreadCount and
 * replWriterMinThreadCount against a single-node replica set.
 *
 * The steps are order dependent: out-of-range startup values must prevent mongod from starting;
 * a valid pair must be reflected by getParameter and by the thread pool startup log line; invalid
 * runtime updates must be rejected without changing the stored value; valid runtime updates must
 * be accepted; and setting the max above 2 * numCores must be accepted but produce a warning log.
 */
function testSettingParameter() {
// too low a count
clearRawMongoProgramOutput();
const tooLowThreadCount = 0;
assert.throws(() =>
MongoRunner.runMongod({replSet: "rs0", setParameter: "replWriterThreadCount=" + tooLowThreadCount.toString()}),
);
assert(
rawMongoProgramOutput("Invalid value for parameter replWriterThreadCount: ").match(
tooLowThreadCount.toString() + " is not greater than or equal to 1",
),
"mongod started with too low a value for replWriterThreadCount",
);
// too high a count
clearRawMongoProgramOutput();
const tooHighThreadCount = 257;
assert.throws(() =>
MongoRunner.runMongod({replSet: "rs0", setParameter: "replWriterThreadCount=" + tooHighThreadCount.toString()}),
);
assert(
rawMongoProgramOutput("Invalid value for parameter replWriterThreadCount: ").match(
tooHighThreadCount.toString() + " is not less than or equal to 256",
),
"mongod started with too high a value for replWriterThreadCount",
);
// proper counts
clearRawMongoProgramOutput();
const acceptableMinThreadCount = 4;
const acceptableThreadCount = 24;
const conn = MongoRunner.runMongod({
replSet: "rs0",
setParameter: {
replWriterMinThreadCount: acceptableMinThreadCount,
replWriterThreadCount: acceptableThreadCount,
},
});
assert.neq(null, conn, "mongod failed to start with a suitable replWriterThreadCount value");
// Initialize replica set
const adminDB = conn.getDB("admin");
assert.commandWorked(
adminDB.runCommand({
replSetInitiate: {
_id: "rs0",
members: [{_id: 0, host: conn.host}],
},
}),
);
// The test expects the pool's effective maxThreads to be the configured value capped at
// 2 * numCores, so the host's core count is needed to compute the expected log contents.
const hostInfoRes = adminDB.runCommand({hostInfo: 1});
const numCores = hostInfoRes.system.numCores;
jsTest.log.info("System info: numCores=", numCores);
const threadCountCap = numCores * 2;
const actualExpectedMaxThreads = Math.min(acceptableThreadCount, threadCountCap);
// Poll the captured output until the thread pool startup line (log id 11280000) reports the
// expected minThreads / maxThreads for the ReplWriterWorker pool.
assert.soon(
() =>
rawMongoProgramOutput("11280000.*Starting thread pool").match(
'ReplWriterWorkerThreadPool.*"minThreads":' +
acceptableMinThreadCount.toString() +
'.*"maxThreads":' +
actualExpectedMaxThreads.toString(),
),
"ReplWriterWorker thread pool did not start",
// Wait up to 10 seconds for repl set initialization to finish
10 * 1000,
);
assert(
rawMongoProgramOutput("Invalid value for parameter replWriterThreadCount").length == 0,
"despite accepting the replWriterThreadCount value, mongod logged an error",
);
assert(
rawMongoProgramOutput("Invalid value for parameter replWriterMinThreadCount").length == 0,
"despite accepting the replWriterMinThreadCount value, mongod logged an error",
);
// getParameter to confirm the server parameter value:
{
const result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1, replWriterMinThreadCount: 1});
assert.eq(acceptableThreadCount, result.replWriterThreadCount, "replWriterThreadCount was not set internally");
assert.eq(
acceptableMinThreadCount,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was not set internally",
);
}
// Make sure using setParameter with a wrong max thread count still fails:
// (after every rejected update, getParameter must still report the startup value)
{
// Can't set max to 0
assert.commandFailed(adminDB.runCommand({setParameter: 1, replWriterThreadCount: tooLowThreadCount}));
let result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
acceptableThreadCount,
result.replWriterThreadCount,
"replWriterThreadCount was overwritten by invalid value",
);
// Can't set max to less than min (4)
assert.commandFailed(
adminDB.runCommand({setParameter: 1, replWriterThreadCount: acceptableMinThreadCount - 1}),
);
result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
acceptableThreadCount,
result.replWriterThreadCount,
"replWriterThreadCount was overwritten by invalid value",
);
// Can't set max to >256
assert.commandFailed(adminDB.runCommand({setParameter: 1, replWriterThreadCount: tooHighThreadCount}));
result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
acceptableThreadCount,
result.replWriterThreadCount,
"replWriterThreadCount was overwritten by invalid value",
);
}
// Make sure using setParameter with a wrong min thread count still fails:
// (after every rejected update, getParameter must still report the startup value)
{
// Can't set min to <0
assert.commandFailed(adminDB.runCommand({setParameter: 1, replWriterMinThreadCount: -3}));
let result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(
acceptableMinThreadCount,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was overwritten by invalid value",
);
// Can't set min to >max
assert.commandFailed(
adminDB.runCommand({setParameter: 1, replWriterMinThreadCount: acceptableThreadCount + 1}),
);
result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(
acceptableMinThreadCount,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was overwritten by invalid value",
);
// Can't set min to > pool size, even if max is higher
assert.commandFailed(
adminDB.runCommand({setParameter: 1, replWriterMinThreadCount: actualExpectedMaxThreads + 1}),
);
result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(
acceptableMinThreadCount,
result.replWriterMinThreadCount,
"replWriterMinThreadCount was overwritten by invalid value",
);
}
// decrease maximum thread count
{
jsTest.log.info("Decreasing max thread count");
const lowerMax = actualExpectedMaxThreads - 1;
// If this is not the case, we cannot continue with the test
// (lowerMax is min(24, 2 * numCores) - 1 and must stay >= the min of 4, hence >= 3 cores)
assert(
lowerMax >= acceptableMinThreadCount,
"Must have at least 3 cores available to run this test (current number: " + numCores + ")",
);
assert.commandWorked(adminDB.runCommand({setParameter: 1, replWriterThreadCount: lowerMax}));
const result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(lowerMax, result.replWriterThreadCount, "replWriterThreadCount was not set internally");
}
// increase maximum thread count
{
jsTest.log.info("Increasing max thread count");
assert.commandWorked(adminDB.runCommand({setParameter: 1, replWriterThreadCount: actualExpectedMaxThreads}));
const result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
actualExpectedMaxThreads,
result.replWriterThreadCount,
"replWriterThreadCount was not set internally",
);
}
// increase minimum thread count
{
jsTest.log.info("Increasing min thread count");
const higherMin = acceptableMinThreadCount + 2;
// The assertion in the "decrease maximum thread count" step already ensured at least 3 cores,
// so higherMin (6) does not exceed the current max of min(24, 2 * numCores).
assert.commandWorked(adminDB.runCommand({setParameter: 1, replWriterMinThreadCount: higherMin}));
const result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(higherMin, result.replWriterMinThreadCount, "replWriterMinThreadCount was not set internally");
}
// decrease minimum thread count
{
jsTest.log.info("Decreasing min thread count");
const lowerMin = acceptableMinThreadCount;
assert.commandWorked(adminDB.runCommand({setParameter: 1, replWriterMinThreadCount: lowerMin}));
const result = adminDB.runCommand({getParameter: 1, replWriterMinThreadCount: 1});
assert.eq(lowerMin, result.replWriterMinThreadCount, "replWriterMinThreadCount was not set internally");
}
// increase maximum thread count above number of cores * 2
// The update is expected to succeed and be reported by getParameter, while the server logs a
// warning (log id 11280003) carrying the requested value, the cap and the core count.
{
jsTest.log.info("Increase maximum threads over number of cores * 2");
const maxAboveActualPoolMaxSize = threadCountCap + 4;
assert.commandWorked(adminDB.runCommand({setParameter: 1, replWriterThreadCount: maxAboveActualPoolMaxSize}));
const result = adminDB.runCommand({getParameter: 1, replWriterThreadCount: 1});
assert.eq(
maxAboveActualPoolMaxSize,
result.replWriterThreadCount,
"replWriterThreadCount was not set internally",
);
// The attribute values are matched with a leading double quote, i.e. the pattern expects them
// to be logged as JSON strings rather than numbers.
const matchParameters =
'.*"replWriterThreadCount":"' +
maxAboveActualPoolMaxSize.toString() +
'.*"maxThreads":"' +
threadCountCap.toString() +
'.*"numCores":"' +
numCores.toString();
jsTest.log.info("looking for " + matchParameters + " in test output");
assert.soon(
() =>
rawMongoProgramOutput(
"11280003.*replWriterThreadCount is set to higher than the max number of threads",
).match(matchParameters),
"mongod did not warn the user that the value will not take effect due to exceeding the number of cores times two",
// Wait up to 10 seconds for the log to publish
10 * 1000,
);
}
MongoRunner.stopMongod(conn);
}
/**
 * Verifies that runtime changes to replWriterThreadCount and replWriterMinThreadCount on the
 * secondaries of a two-node replica set are reflected during oplog application.
 *
 * The test runs three phases, each followed by a burst of w:2 inserts on the primary:
 *   1. min = max = 2 * numCores: the "Number of workers" log (id 11280004) must report
 *      2 * numCores and no thread may be reaped.
 *   2. min = max = numCores: the log must report numCores and exactly the threads above numCores
 *      must be reaped.
 *   3. min = max = numCores + 2: exactly two threads must be spawned and the log must report
 *      numCores + 2.
 *
 * All checks scrape the captured program output, so the secondaries' log verbosity is raised
 * first and every server command is asserted to have succeeded.
 */
function testOplogApplication() {
    clearRawMongoProgramOutput();
    const rst = new ReplSetTest({name: jsTest.name(), nodes: 2});
    rst.startSet();
    rst.initiate();

    const primary = rst.getPrimary();
    const secondaries = rst.getSecondaries();
    const dbName = "testDB";
    const collName = jsTest.name();
    const primaryDB = primary.getDB(dbName);
    const primaryColl = primaryDB[collName];

    // Fail with the command error rather than a TypeError if hostInfo is unavailable.
    const hostInfoRes = assert.commandWorked(primaryDB.runCommand({hostInfo: 1}));
    const numCores = hostInfoRes.system.numCores;
    jsTest.log.info("System info: numCores=", numCores);
    const twoThreadsPerCore = numCores * 2;

    jsTest.log.info("Set thread pool size to two threads per core");
    for (const secondary of secondaries) {
        // Raise the max before the min so that min never exceeds max.
        assert.commandWorked(secondary.adminCommand({setParameter: 1, replWriterThreadCount: twoThreadsPerCore}));
        let result = secondary.adminCommand({getParameter: 1, replWriterThreadCount: 1});
        assert.eq(twoThreadsPerCore, result.replWriterThreadCount, "replWriterThreadCount was not set internally");

        assert.commandWorked(secondary.adminCommand({setParameter: 1, replWriterMinThreadCount: twoThreadsPerCore}));
        result = secondary.adminCommand({getParameter: 1, replWriterMinThreadCount: 1});
        assert.eq(
            twoThreadsPerCore,
            result.replWriterMinThreadCount,
            "replWriterMinThreadCount was not set internally",
        );

        // The assertions below depend on debug-level log lines. If raising the verbosity failed
        // silently, they would fail later with a misleading message, so check the result here.
        assert.commandWorked(
            secondary.adminCommand({
                setParameter: 1,
                logComponentVerbosity: {replication: {verbosity: 2}},
            }),
        );
        assert.commandWorked(
            secondary.adminCommand({
                setParameter: 1,
                logComponentVerbosity: {executor: {verbosity: 1}},
            }),
        );
    }

    assert.commandWorked(primaryDB.createCollection(collName, {writeConcern: {w: "majority"}}));

    const numWrites = 500;
    assert.commandWorked(primaryColl.insert({_id: "writeAllDurable"}, {writeConcern: {w: 2}}));
    for (let i = 0; i < numWrites; i++) {
        assert.commandWorked(primaryColl.insert({_id: "majority2" + i}, {writeConcern: {w: 2}}));
    }

    assert(
        rawMongoProgramOutput("11280004.*Number of workers for oplog application").match(
            '.*"nWorkers":' + twoThreadsPerCore.toString(),
        ),
        "mongod did not print the correct thread pool size (2*number of cores) when applying oplog",
    );
    assert(
        rawMongoProgramOutput("Reaping this thread as we are above maxThreads").length == 0,
        "Threads were reaped unexpectedly before the pool size was modified",
    );

    // Drop the output captured so far so the next phase only sees its own log lines.
    clearRawMongoProgramOutput();

    jsTest.log.info("Decrease thread pool size to one thread per core");
    for (const secondary of secondaries) {
        // Lower the min before the max so that min never exceeds max.
        assert.commandWorked(secondary.adminCommand({setParameter: 1, replWriterMinThreadCount: numCores}));
        let result = secondary.adminCommand({getParameter: 1, replWriterMinThreadCount: 1});
        assert.eq(numCores, result.replWriterMinThreadCount, "replWriterMinThreadCount was not set internally");

        assert.commandWorked(secondary.adminCommand({setParameter: 1, replWriterThreadCount: numCores}));
        result = secondary.adminCommand({getParameter: 1, replWriterThreadCount: 1});
        assert.eq(numCores, result.replWriterThreadCount, "replWriterThreadCount was not set internally");
    }

    // Ensure thread pool size has changed
    for (let i = numWrites; i < numWrites * 2; i++) {
        assert.commandWorked(primaryColl.insert({_id: "majority2" + i}, {writeConcern: {w: 2}}));
    }
    assert(
        rawMongoProgramOutput("11280004.*Number of workers for oplog application").match(
            '.*"nWorkers":' + numCores.toString(),
        ),
        "mongod did not print the correct thread pool size (1*number of cores) when applying oplog",
    );

    // Check that threads above numCores were reaped
    for (let i = numCores; i < twoThreadsPerCore; i++) {
        const threadNum = i + 1;
        assert(
            rawMongoProgramOutput("Reaping this thread as we are above maxThreads").match(
                '"numThreads":' + threadNum + ',"maxThreads":' + numCores,
            ),
            "the right number of threads were not reaped to meet maxThreads (" + threadNum + " was not present)",
        );
    }
    // The pool must stop reaping once it is down to numCores threads
    assert(
        !rawMongoProgramOutput("Reaping this thread as we are above maxThreads").match(
            '"numThreads":' + numCores + ',"maxThreads":' + numCores,
        ),
        "the right number of threads were not reaped to meet maxThreads",
    );

    clearRawMongoProgramOutput();

    jsTest.log.info("Increase thread pool size");
    const newThreads = numCores + 2;
    for (const secondary of secondaries) {
        // Raise the max before the min so that min never exceeds max.
        assert.commandWorked(secondary.adminCommand({setParameter: 1, replWriterThreadCount: newThreads}));
        let result = secondary.adminCommand({getParameter: 1, replWriterThreadCount: 1});
        assert.eq(newThreads, result.replWriterThreadCount, "replWriterThreadCount was not set internally");

        assert.commandWorked(secondary.adminCommand({setParameter: 1, replWriterMinThreadCount: newThreads}));
        result = secondary.adminCommand({getParameter: 1, replWriterMinThreadCount: 1});
        assert.eq(newThreads, result.replWriterMinThreadCount, "replWriterMinThreadCount was not set internally");
    }

    // check that new threads were spawned to meet minThreads
    for (let i = numCores; i < newThreads; i++) {
        assert(
            rawMongoProgramOutput("Spawning new thread as we are below minThreads").match(
                '"numThreads":' + i + ',"minThreads":' + newThreads,
            ),
            "the right number of threads were not spawned to meet minThreads",
        );
    }
    // Check that we did not launch any extra threads
    assert(
        !rawMongoProgramOutput("Spawning new thread as we are below minThreads").match(
            '"numThreads":' + newThreads + ',"minThreads":' + newThreads,
        ),
        "the right number of threads were not spawned to meet minThreads",
    );

    // Ensure thread pool size has changed
    for (let i = numWrites * 2; i < numWrites * 3 + 1; i++) {
        assert.commandWorked(primaryColl.insert({_id: "majority2" + i}, {writeConcern: {w: 2}}));
    }
    assert(
        rawMongoProgramOutput("11280004.*Number of workers for oplog application").match(
            '.*"nWorkers":' + newThreads.toString(),
        ),
        // This phase expects numCores + 2 workers; the earlier message was a copy-paste of the
        // "1*number of cores" phase and pointed at the wrong step on failure.
        "mongod did not print the correct thread pool size (number of cores + 2) when applying oplog",
    );

    rst.stopSet();
}
testSettingParameter();
testOplogApplication();

View File

@ -1027,7 +1027,7 @@ replication.configs:
- src/mongo/db/repl/*member_config* - src/mongo/db/repl/*member_config*
- src/mongo/db/repl/*member_data* - src/mongo/db/repl/*member_data*
- src/mongo/db/repl/*member_id* - src/mongo/db/repl/*member_id*
- src/mongo/db/repl/repl_server_parameters.idl - src/mongo/db/repl/repl_*server_parameters.idl
- src/mongo/db/repl/*member_state* - src/mongo/db/repl/*member_state*
- src/mongo/db/repl/*repl_settings* - src/mongo/db/repl/*repl_settings*
- src/mongo/db/repl/*repl_set_tag* - src/mongo/db/repl/*repl_set_tag*
@ -1039,28 +1039,29 @@ replication.oplog:
jira: Replication jira: Replication
fully_marked: true fully_marked: true
files: files:
- src/mongo/db/commands/apply_ops*
- src/mongo/db/commands/oplog*
- src/mongo/db/op_observer/op_observer* - src/mongo/db/op_observer/op_observer*
- src/mongo/db/op_observer/operation_logger* - src/mongo/db/op_observer/operation_logger*
- src/mongo/db/repl/local_oplog_info* - src/mongo/db/repl/*apply_ops*
- src/mongo/db/commands/oplog*
- src/mongo/db/commands/apply_ops*
- src/mongo/db/repl/*oplog*
- src/mongo/db/repl/*noop_writer*
- src/mongo/db/repl/*bgsync* - src/mongo/db/repl/*bgsync*
- src/mongo/db/repl/*consistency_markers*
- src/mongo/db/repl/*get_next_optimes_test*
- src/mongo/db/repl/*idempotency_test_fixture*
- src/mongo/db/repl/*insert_group*
- src/mongo/db/repl/*noop_writer*
- src/mongo/db/repl/*oplog*
- src/mongo/db/repl/*oplog_visibility_manager*
- src/mongo/db/repl/*optime*
- src/mongo/db/repl/*reporter*
- src/mongo/db/repl/*session_update_tracker*
- src/mongo/db/repl/*slotted_timestamp_list*
- src/mongo/db/repl/*sync_source* - src/mongo/db/repl/*sync_source*
- src/mongo/db/repl/apply_container_ops_test.cpp
- src/mongo/db/repl/local_oplog_info*
- src/mongo/db/repl/repl_worker_pool_thread_count*
- src/mongo/s/commands/cluster_oplog_note_cmd.cpp - src/mongo/s/commands/cluster_oplog_note_cmd.cpp
- src/mongo/s/commands/cluster_repl_set_get_status_cmd.cpp - src/mongo/s/commands/cluster_repl_set_get_status_cmd.cpp
- src/mongo/db/repl/*insert_group*
- src/mongo/db/repl/*apply_ops*
- src/mongo/db/repl/*oplog_visibility_manager*
- src/mongo/db/repl/*slotted_timestamp_list*
- src/mongo/db/repl/*session_update_tracker*
- src/mongo/db/repl/*consistency_markers*
- src/mongo/db/repl/*optime*
- src/mongo/db/repl/apply_container_ops_test.cpp
- src/mongo/db/repl/*reporter*
- src/mongo/db/repl/*idempotency_test_fixture*
- src/mongo/db/repl/*get_next_optimes_test*
replication.server_rw_concerns: replication.server_rw_concerns:
meta: meta:

View File

@ -318,6 +318,27 @@ mongo_cc_library(
], ],
) )
idl_generator(
name = "repl_writer_thread_pool_server_parameters_gen",
src = "repl_writer_thread_pool_server_parameters.idl",
deps = [
"//src/mongo/db:basic_types_gen",
],
)
mongo_cc_library(
name = "repl_writer_thread_pool_server_parameters",
srcs = [
"repl_worker_pool_thread_count.cpp",
":repl_writer_thread_pool_server_parameters_gen",
],
deps = [
":repl_coordinator_interface",
"//src/mongo/util:processinfo",
"//src/mongo/util/concurrency:thread_pool",
],
)
mongo_cc_library( mongo_cc_library(
name = "oplog_write_interface", name = "oplog_write_interface",
srcs = [ srcs = [
@ -906,7 +927,7 @@ mongo_cc_library(
":oplog", ":oplog",
":oplog_entry", ":oplog_entry",
":repl_coordinator_interface", ":repl_coordinator_interface",
":repl_server_parameters", ":repl_writer_thread_pool_server_parameters",
":truncate_range_oplog_entry_idl", ":truncate_range_oplog_entry_idl",
"//src/mongo/db:change_stream_pre_image_util", "//src/mongo/db:change_stream_pre_image_util",
"//src/mongo/db:server_base", "//src/mongo/db:server_base",
@ -1856,6 +1877,7 @@ mongo_cc_unit_test(
":repl_coordinator_impl", ":repl_coordinator_impl",
":repl_coordinator_test_fixture", ":repl_coordinator_test_fixture",
":repl_server_parameters", ":repl_server_parameters",
":repl_writer_thread_pool_server_parameters",
":task_executor_mock", ":task_executor_mock",
"//src/mongo/db:change_stream_pre_images_collection_manager", "//src/mongo/db:change_stream_pre_images_collection_manager",
"//src/mongo/db:server_base", "//src/mongo/db:server_base",
@ -2106,6 +2128,7 @@ mongo_cc_benchmark(
":repl_coordinator_impl", ":repl_coordinator_impl",
":repl_coordinator_interface", ":repl_coordinator_interface",
":repl_server_parameters", ":repl_server_parameters",
":repl_writer_thread_pool_server_parameters",
":replication_consistency_markers_impl", ":replication_consistency_markers_impl",
":replication_recovery", ":replication_recovery",
":replmocks", ":replmocks",

View File

@ -32,20 +32,17 @@
#include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h" #include "mongo/db/client.h"
#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/repl_worker_pool_thread_count.h"
#include "mongo/logv2/log.h" #include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h" #include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/thread_name.h" #include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/debug_util.h" #include "mongo/util/debug_util.h"
#include "mongo/util/fail_point.h" #include "mongo/util/fail_point.h"
#include "mongo/util/future_impl.h" #include "mongo/util/future_impl.h"
#include "mongo/util/processinfo.h"
#include <algorithm> #include <algorithm>
#include <functional>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <type_traits>
#include <utility> #include <utility>
#include <boost/move/utility_core.hpp> #include <boost/move/utility_core.hpp>
@ -158,34 +155,17 @@ void OplogApplier::setMinValid(const OpTime& minValid) {
_minValid = minValid; _minValid = minValid;
} }
std::unique_ptr<ThreadPool> makeReplWorkerPool() {
// Reduce content pinned in cache by single oplog batch on small machines by reducing the number
// of threads of ReplWriter to reduce the number of concurrent open WT transactions.
if (replWriterThreadCount < replWriterMinThreadCount) {
LOGV2_FATAL_NOTRACE(
5605400,
"replWriterMinThreadCount must be less than or equal to replWriterThreadCount",
"replWriterMinThreadCount"_attr = replWriterMinThreadCount,
"replWriterThreadCount"_attr = replWriterThreadCount);
}
auto numberOfThreads =
std::min(replWriterThreadCount, 2 * static_cast<int>(ProcessInfo::getNumAvailableCores()));
return makeReplWorkerPool(numberOfThreads);
}
std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount) { namespace {
return makeReplWorkerPool(threadCount, "ReplWriterWorker"_sd);
}
std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount, std::unique_ptr<ThreadPool> makeReplWorkerPool(size_t threadCount,
StringData name, StringData name,
bool isKillableByStepdown) { bool isKillableByStepdown) {
ThreadPool::Options options; ThreadPool::Options options;
options.threadNamePrefix = name + "-"; options.threadNamePrefix = name + "-";
options.poolName = name + "ThreadPool"; options.poolName = name + "ThreadPool";
options.minThreads = options.minThreads = std::min(getMinThreadCountForReplWorkerPool(), threadCount);
replWriterMinThreadCount < threadCount ? replWriterMinThreadCount : threadCount; options.maxThreads = threadCount;
options.maxThreads = static_cast<size_t>(threadCount);
options.onCreateThread = [isKillableByStepdown](const std::string&) { options.onCreateThread = [isKillableByStepdown](const std::string&) {
Client::initThread(getThreadName(), Client::initThread(getThreadName(),
getGlobalServiceContext()->getService(ClusterRole::ShardServer), getGlobalServiceContext()->getService(ClusterRole::ShardServer),
@ -199,5 +179,15 @@ std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount,
return pool; return pool;
} }
} // namespace
std::unique_ptr<ThreadPool> makeReplWorkerPool() {
return makeReplWorkerPool(getThreadCountForReplWorkerPool());
}
std::unique_ptr<ThreadPool> makeReplWorkerPool(size_t threadCount) {
return makeReplWorkerPool(threadCount, "ReplWriterWorker"_sd, false);
}
} // namespace repl } // namespace repl
} // namespace mongo } // namespace mongo

View File

@ -31,7 +31,6 @@
#pragma once #pragma once
#include "mongo/base/status_with.h" #include "mongo/base/status_with.h"
#include "mongo/base/string_data.h"
#include "mongo/db/operation_context.h" #include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_applier_batcher.h" #include "mongo/db/repl/oplog_applier_batcher.h"
@ -40,7 +39,6 @@
#include "mongo/db/repl/optime.h" #include "mongo/db/repl/optime.h"
#include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor.h"
#include "mongo/stdx/mutex.h" #include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/duration.h" #include "mongo/util/duration.h"
#include "mongo/util/future.h" #include "mongo/util/future.h"
#include "mongo/util/modules.h" #include "mongo/util/modules.h"
@ -270,17 +268,10 @@ public:
extern NoopOplogApplierObserver noopOplogApplierObserver; extern NoopOplogApplierObserver noopOplogApplierObserver;
/** /**
* Creates the default thread pool for writer tasks. * Creates the thread pool for writer tasks.
*/ */
std::unique_ptr<ThreadPool> makeReplWorkerPool(); std::unique_ptr<ThreadPool> makeReplWorkerPool();
std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount); std::unique_ptr<ThreadPool> makeReplWorkerPool(size_t threadCount);
/**
* Creates a thread pool suitable for writer tasks, with the specified name
*/
std::unique_ptr<ThreadPool> makeReplWorkerPool(int threadCount,
StringData name,
bool isKillableByStepdown = false);
} // namespace MONGO_MOD_PUB repl } // namespace MONGO_MOD_PUB repl
} // namespace mongo } // namespace mongo

View File

@ -642,7 +642,12 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
// Increment the batch size stat. // Increment the batch size stat.
oplogApplicationBatchSize.increment(ops.size()); oplogApplicationBatchSize.increment(ops.size());
std::vector<WorkerMultikeyPathInfo> multikeyVector(_workerPool->getStats().options.maxThreads); // The worker pool's maxThreads can be updated at any point in time. So we record its current
// value here and use it to construct our different work vectors for the next batch of work to
// schedule.
const size_t nWorkers = _workerPool->getStats().options.maxThreads;
LOGV2_DEBUG(11280004, 2, "Number of workers for oplog application", "nWorkers"_attr = nWorkers);
std::vector<WorkerMultikeyPathInfo> multikeyVector(nWorkers);
{ {
// Each node records cumulative batch application stats for itself using this timer. // Each node records cumulative batch application stats for itself using this timer.
TimerHolder timer(&applyBatchStats); TimerHolder timer(&applyBatchStats);
@ -668,8 +673,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
// and create a pseudo oplog. // and create a pseudo oplog.
std::vector<std::vector<OplogEntry>> derivedOps; std::vector<std::vector<OplogEntry>> derivedOps;
std::vector<std::vector<ApplierOperation>> writerVectors( std::vector<std::vector<ApplierOperation>> writerVectors(nWorkers);
_workerPool->getStats().options.maxThreads);
_fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); _fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
// Wait for oplog writes to finish. // Wait for oplog writes to finish.
@ -712,8 +716,7 @@ StatusWith<OpTime> OplogApplierImpl::_applyOplogBatch(OperationContext* opCtx,
const bool isDataConsistent = getMinValid() < ops.front().getOpTime(); const bool isDataConsistent = getMinValid() < ops.front().getOpTime();
{ {
std::vector<Status> statusVector(_workerPool->getStats().options.maxThreads, std::vector<Status> statusVector(nWorkers, Status::OK());
Status::OK());
// Doles out all the work to the writer pool threads. writerVectors is not modified, // Doles out all the work to the writer pool threads. writerVectors is not modified,
// but applyOplogBatchPerWorker will modify the vectors that it contains. // but applyOplogBatchPerWorker will modify the vectors that it contains.
invariant(writerVectors.size() == statusVector.size()); invariant(writerVectors.size() == statusVector.size());

View File

@ -58,6 +58,7 @@
#include "mongo/db/repl/oplog_writer_impl.h" #include "mongo/db/repl/oplog_writer_impl.h"
#include "mongo/db/repl/optime.h" #include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/repl_writer_thread_pool_server_parameters_gen.h"
#include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/replication_coordinator_mock.h"

View File

@ -34,6 +34,7 @@ global:
cpp_includes: cpp_includes:
- "mongo/client/read_preference.h" - "mongo/client/read_preference.h"
- "mongo/client/read_preference_validators.h" - "mongo/client/read_preference_validators.h"
- "mongo/db/repl/repl_worker_pool_thread_count.h"
imports: imports:
- "mongo/db/basic_types.idl" - "mongo/db/basic_types.idl"
@ -287,29 +288,6 @@ server_parameters:
gte: 0 gte: 0
redact: false redact: false
# From oplog_applier.cpp
replWriterThreadCount:
description: The number of threads in the thread pool used to apply the oplog
set_at: startup
cpp_vartype: int
cpp_varname: replWriterThreadCount
default: 16
validator:
gte: 1
lte: 256
redact: false
replWriterMinThreadCount:
description: The minimum number of threads in the thread pool used to apply the oplog
set_at: startup
cpp_vartype: int
cpp_varname: replWriterMinThreadCount
default: 0
validator:
gte: 0
lte: 256
redact: false
replBatchLimitOperations: replBatchLimitOperations:
description: The maximum number of operations to apply in a single batch description: The maximum number of operations to apply in a single batch
set_at: [startup, runtime] set_at: [startup, runtime]

View File

@ -0,0 +1,129 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/repl/repl_worker_pool_thread_count.h"
#include "mongo/db/repl/repl_writer_thread_pool_server_parameters_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/logv2/log.h"
#include "mongo/util/processinfo.h"
#include <algorithm>
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
namespace mongo {
namespace repl {
/**
 * Returns the current value of the replWriterMinThreadCount server parameter, converted to the
 * size_t that ThreadPool expects for its minimum thread count.
 */
size_t getMinThreadCountForReplWorkerPool() {
    const int configuredMinimum = replWriterMinThreadCount;
    return static_cast<size_t>(configuredMinimum);
}
/**
 * Returns the effective maximum size of the replication writer thread pool: the value of the
 * replWriterThreadCount server parameter, capped at twice the number of available cores.
 */
size_t getThreadCountForReplWorkerPool() {
    const auto requestedThreads = static_cast<size_t>(replWriterThreadCount);
    const auto coreBasedLimit = static_cast<size_t>(2 * ProcessInfo::getNumAvailableCores());
    return std::min(requestedThreads, coreBasedLimit);
}
/**
 * Validation callback for the replWriterThreadCount server parameter.
 *
 * Rejects (BadValue) a count that is below the current replWriterMinThreadCount or that is
 * negative. A count above 2 * the number of available cores is accepted, but a warning is logged
 * because the pool size is capped at that limit.
 *
 * The tenant id argument is unused.
 */
Status validateUpdateReplWriterThreadCount(const int count, const boost::optional<TenantId>&) {
    // Read the minimum once so the comparison and the reported value cannot disagree.
    const int currentMinThreads = replWriterMinThreadCount;
    if (count < currentMinThreads) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "replWriterThreadCount must be greater or equal to '"
                                    << currentMinThreads
                                    << "', which is the current value of replWriterMinThreadCount");
    }
    // std::in_range<size_t> fails only for negative values of an int.
    if (!std::in_range<size_t>(count)) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "replWriterThreadCount must be greater than 0");
    }
    size_t newCount = static_cast<size_t>(count);
    size_t numCores = ProcessInfo::getNumAvailableCores();
    size_t maxThreads = 2 * numCores;
    if (newCount > maxThreads) {
        // Not an error: the value is stored as requested, only the effective pool size is capped.
        LOGV2_WARNING(11280003,
                      "replWriterThreadCount is set to higher than the max number of threads for "
                      "the writer pool, which is 2 * the number of cores available. The pool size "
                      "will be capped at 2 * the number of cores.",
                      "replWriterThreadCount"_attr = std::to_string(newCount),
                      "maxThreads"_attr = std::to_string(maxThreads),
                      "numCores"_attr = std::to_string(numCores));
    }
    return Status::OK();
}
/**
 * Validation callback for the replWriterMinThreadCount server parameter.
 *
 * Rejects (BadValue) a negative count, and a count larger than the effective maximum size of the
 * writer pool as computed by getThreadCountForReplWorkerPool(). Zero is a valid minimum.
 *
 * The tenant id argument is unused.
 */
Status validateUpdateReplWriterMinThreadCount(const int count, const boost::optional<TenantId>&) {
    // std::in_range<size_t> fails only for negative values of an int, so 0 is accepted here.
    if (!std::in_range<size_t>(count)) {
        return Status(ErrorCodes::BadValue,
                      str::stream()
                          << "replWriterMinThreadCount must be greater than or equal to 0");
    }
    size_t newCount = static_cast<size_t>(count);
    // May be replWriterThreadCount, or may be capped by the number of CPUs
    size_t poolActualSize = getThreadCountForReplWorkerPool();
    if (newCount > poolActualSize) {
        return Status(ErrorCodes::BadValue,
                      str::stream() << "replWriterMinThreadCount must be less than or equal to '"
                                    << poolActualSize
                                    << "', which is the current max threads for the thread pool");
    }
    return Status::OK();
}
/**
 * on_update hook for replWriterThreadCount.
 *
 * When a global service context exists (i.e. we are past startup), pushes the new effective
 * maximum, as computed by getThreadCountForReplWorkerPool(), into the replication worker thread
 * pool. Does nothing if there is no global service context or the pool does not exist yet.
 * Always returns OK.
 */
Status onUpdateReplWriterThreadCount(const int) {
    if (!hasGlobalServiceContext()) {
        // No service context yet, so there is no running pool to resize.
        return Status::OK();
    }

    auto* const workerPool =
        ReplicationCoordinator::get(getGlobalServiceContext())->getDbWorkThreadPool();
    if (workerPool != nullptr) {
        workerPool->setMaxThreads(getThreadCountForReplWorkerPool());
    }
    return Status::OK();
}
/**
 * on_update hook for replWriterMinThreadCount.
 *
 * When a global service context exists (i.e. we are past startup), pushes the new minimum, as
 * returned by getMinThreadCountForReplWorkerPool(), into the replication worker thread pool.
 * Does nothing if there is no global service context or the pool does not exist yet.
 * Always returns OK.
 */
Status onUpdateReplWriterMinThreadCount(const int) {
    if (!hasGlobalServiceContext()) {
        // No service context yet, so there is no running pool to resize.
        return Status::OK();
    }

    auto* const workerPool =
        ReplicationCoordinator::get(getGlobalServiceContext())->getDbWorkThreadPool();
    if (workerPool != nullptr) {
        workerPool->setMinThreads(getMinThreadCountForReplWorkerPool());
    }
    return Status::OK();
}
} // namespace repl
} // namespace mongo

View File

@ -0,0 +1,50 @@
/**
* Copyright (C) 2025-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/base/status.h"
#include "mongo/db/tenant_id.h"
#include "mongo/util/modules.h"
#include <boost/optional/optional.hpp>
namespace mongo {
namespace MONGO_MOD_PARENT_PRIVATE repl {
/**
 * Returns the value of the replWriterMinThreadCount server parameter as a size_t.
 */
size_t getMinThreadCountForReplWorkerPool();

/**
 * Returns the effective maximum number of writer threads: the replWriterThreadCount server
 * parameter, capped at 2 * the number of available cores.
 */
size_t getThreadCountForReplWorkerPool();

/**
 * Validator callback for replWriterThreadCount. Returns BadValue if 'count' is negative or below
 * the current replWriterMinThreadCount; logs a warning if it exceeds the core-based cap.
 */
Status validateUpdateReplWriterThreadCount(int count, const boost::optional<TenantId>&);

/**
 * on_update hook for replWriterThreadCount. Applies the new effective maximum to the replication
 * worker thread pool when a global service context and the pool are available.
 */
Status onUpdateReplWriterThreadCount(int);

/**
 * Validator callback for replWriterMinThreadCount. Returns BadValue if 'count' is negative or
 * larger than the value returned by getThreadCountForReplWorkerPool().
 */
Status validateUpdateReplWriterMinThreadCount(int count, const boost::optional<TenantId>&);

/**
 * on_update hook for replWriterMinThreadCount. Applies the new minimum to the replication worker
 * thread pool when a global service context and the pool are available.
 */
Status onUpdateReplWriterMinThreadCount(int);
}  // namespace MONGO_MOD_PARENT_PRIVATE repl
}  // namespace mongo

View File

@ -0,0 +1,66 @@
# Copyright (C) 2025-present MongoDB, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Server Side Public License, version 1,
# as published by MongoDB, Inc.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Server Side Public License for more details.
#
# You should have received a copy of the Server Side Public License
# along with this program. If not, see
# <http://www.mongodb.com/licensing/server-side-public-license>.
#
# As a special exception, the copyright holders give permission to link the
# code of portions of this program with the OpenSSL library under certain
# conditions as described in each individual source file and distribute
# linked combinations including the program with the OpenSSL library. You
# must comply with the Server Side Public License in all respects for
# all of the code used other than as permitted herein. If you modify file(s)
# with this exception, you may extend this exception to your version of the
# file(s), but you are not obligated to do so. If you do not wish to do so,
# delete this exception statement from your version. If you delete this
# exception statement from all source files in the program, then also delete
# it in the license file.
#
# Server parameters related to the replication thread pool used by oplog appliers/writers
global:
cpp_namespace: "mongo::repl"
mod_visibility: pub
cpp_includes:
- "mongo/db/repl/repl_worker_pool_thread_count.h"
imports:
- "mongo/db/basic_types.idl"
server_parameters:
# From repl_worker_pool_thread_count.cpp
# Requested upper bound for the writer pool. The effective pool size is additionally capped at
# 2 * the number of available cores by getThreadCountForReplWorkerPool().
# The callback also rejects values below the current replWriterMinThreadCount; on_update resizes
# the live pool when one exists.
replWriterThreadCount:
description: The number of threads in the thread pool used to apply the oplog
set_at: [startup, runtime]
cpp_vartype: int
cpp_varname: replWriterThreadCount
on_update: onUpdateReplWriterThreadCount
default: 16
validator:
gte: 1
lte: 256
callback: validateUpdateReplWriterThreadCount
redact: false
# Lower bound for the writer pool; 0 is allowed.
# The callback rejects values above the effective (core-capped) maximum of the pool; on_update
# applies the new minimum to the live pool when one exists.
replWriterMinThreadCount:
description: The minimum number of threads in the thread pool used to apply the oplog
set_at: [startup, runtime]
cpp_vartype: int
cpp_varname: replWriterMinThreadCount
on_update: onUpdateReplWriterMinThreadCount
default: 0
validator:
gte: 0
lte: 256
callback: validateUpdateReplWriterMinThreadCount
redact: false

View File

@ -59,6 +59,7 @@
#include "mongo/idl/generic_argument_gen.h" #include "mongo/idl/generic_argument_gen.h"
#include "mongo/platform/compiler.h" #include "mongo/platform/compiler.h"
#include "mongo/rpc/topology_version_gen.h" #include "mongo/rpc/topology_version_gen.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/duration.h" #include "mongo/util/duration.h"
#include "mongo/util/future.h" #include "mongo/util/future.h"
#include "mongo/util/interruptible.h" #include "mongo/util/interruptible.h"
@ -705,6 +706,13 @@ public:
*/ */
virtual void appendSecondaryInfoData(BSONObjBuilder* result) = 0; virtual void appendSecondaryInfoData(BSONObjBuilder* result) = 0;
/**
* Returns the ThreadPool used by replication to apply the sync source's operations in parallel
* (in OplogApplier) and to clone the databases and collections during initial sync.
* Note: the returned pointer can be null if called before the replication logic was started.
*/
virtual ThreadPool* getDbWorkThreadPool() const = 0;
/** /**
* Returns the current ReplSetConfig. * Returns the current ReplSetConfig.
*/ */

View File

@ -102,7 +102,7 @@ public:
virtual std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const = 0; virtual std::shared_ptr<executor::TaskExecutor> getSharedTaskExecutor() const = 0;
/** /**
* Returns shared db worker thread pool for collection cloning. * Returns shared db worker thread pool for collection cloning and oplog applier.
*/ */
virtual ThreadPool* getDbWorkThreadPool() const = 0; virtual ThreadPool* getDbWorkThreadPool() const = 0;

View File

@ -253,7 +253,7 @@ private:
// Task executor used to run replication tasks. // Task executor used to run replication tasks.
std::shared_ptr<executor::TaskExecutor> _taskExecutor; std::shared_ptr<executor::TaskExecutor> _taskExecutor;
// Used by repl::applyOplogBatch() to apply the sync source's operations in parallel. // Used by OplogApplier::applyOplogBatch() to apply the sync source's operations in parallel.
// Also used by database and collection cloners to perform storage operations. // Also used by database and collection cloners to perform storage operations.
// Cloners and oplog application run in separate phases of initial sync so it is fine to share // Cloners and oplog application run in separate phases of initial sync so it is fine to share
// this thread pool. // this thread pool.

View File

@ -3201,6 +3201,10 @@ void ReplicationCoordinatorImpl::appendSecondaryInfoData(BSONObjBuilder* result)
_topCoord->fillMemberData(result); _topCoord->fillMemberData(result);
} }
// Exposes the worker thread pool owned by the external state. The pointer is passed through
// unchanged, so it is null whenever the external state reports no pool.
ThreadPool* ReplicationCoordinatorImpl::getDbWorkThreadPool() const noexcept {
    auto* const workerPool = _externalState->getDbWorkThreadPool();
    return workerPool;
}
ReplSetConfig ReplicationCoordinatorImpl::getConfig() const { ReplSetConfig ReplicationCoordinatorImpl::getConfig() const {
return _getReplSetConfig(); return _getReplSetConfig();
} }

View File

@ -342,6 +342,8 @@ public:
void appendSecondaryInfoData(BSONObjBuilder* result) override; void appendSecondaryInfoData(BSONObjBuilder* result) override;
ThreadPool* getDbWorkThreadPool() const noexcept override;
ReplSetConfig getConfig() const override; ReplSetConfig getConfig() const override;
ConnectionString getConfigConnectionString() const override; ConnectionString getConfigConnectionString() const override;

View File

@ -458,6 +458,10 @@ StatusWith<BSONObj> ReplicationCoordinatorMock::prepareReplSetUpdatePositionComm
return cmdBuilder.obj(); return cmdBuilder.obj();
} }
// The mock has no worker thread pool; callers must handle the null result.
ThreadPool* ReplicationCoordinatorMock::getDbWorkThreadPool() const noexcept {
    return nullptr;
}
ReplSetConfig ReplicationCoordinatorMock::getConfig() const { ReplSetConfig ReplicationCoordinatorMock::getConfig() const {
stdx::lock_guard<stdx::mutex> lk(_mutex); stdx::lock_guard<stdx::mutex> lk(_mutex);

View File

@ -252,6 +252,8 @@ public:
void appendConnectionStats(executor::ConnectionPoolStats* stats) const override; void appendConnectionStats(executor::ConnectionPoolStats* stats) const override;
ThreadPool* getDbWorkThreadPool() const noexcept override;
ReplSetConfig getConfig() const override; ReplSetConfig getConfig() const override;
ConnectionString getConfigConnectionString() const override; ConnectionString getConfigConnectionString() const override;

View File

@ -316,6 +316,10 @@ void ReplicationCoordinatorNoOp::appendSecondaryInfoData(BSONObjBuilder*) {
MONGO_UNREACHABLE; MONGO_UNREACHABLE;
} }
// Not supported by the no-op coordinator: reaching this call is treated as a programming error.
ThreadPool* ReplicationCoordinatorNoOp::getDbWorkThreadPool() const noexcept {
    MONGO_UNREACHABLE;
}
ReplSetConfig ReplicationCoordinatorNoOp::getConfig() const { ReplSetConfig ReplicationCoordinatorNoOp::getConfig() const {
MONGO_UNREACHABLE; MONGO_UNREACHABLE;
} }

View File

@ -228,6 +228,8 @@ public:
void appendSecondaryInfoData(BSONObjBuilder*) final; void appendSecondaryInfoData(BSONObjBuilder*) final;
ThreadPool* getDbWorkThreadPool() const noexcept final;
ReplSetConfig getConfig() const final; ReplSetConfig getConfig() const final;
ConnectionString getConfigConnectionString() const final; ConnectionString getConfigConnectionString() const final;

View File

@ -73,6 +73,26 @@ std::string threadIdToString(stdx::thread::id id) {
return oss.str(); return oss.str();
} }
/**
 * Verifies that the thread limits in "options" are coherent and terminates the process with a
 * fatal log message if they are not:
 *   - maxThreads must be at least 1;
 *   - minThreads must not exceed maxThreads.
 * The maxThreads check is performed first.
 */
void checkOptionsLimits(const ThreadPool::Options& options) {
    const bool maxTooSmall = options.maxThreads < 1;
    if (maxTooSmall) {
        LOGV2_FATAL(28702,
                    "Cannot configure pool with maximum number of threads less than 1",
                    "poolName"_attr = options.poolName,
                    "maxThreads"_attr = options.maxThreads);
    }

    const bool minAboveMax = options.minThreads > options.maxThreads;
    if (minAboveMax) {
        LOGV2_FATAL(28686,
                    "Cannot configure pool with minimum number of threads larger than the "
                    "maximum",
                    "poolName"_attr = options.poolName,
                    "minThreads"_attr = options.minThreads,
                    "maxThreads"_attr = options.maxThreads);
    }
}
/** /**
* Sets defaults and checks bounds limits on "options", and returns it. * Sets defaults and checks bounds limits on "options", and returns it.
* *
@ -85,20 +105,7 @@ ThreadPool::Options cleanUpOptions(ThreadPool::Options&& options) {
if (options.threadNamePrefix.empty()) { if (options.threadNamePrefix.empty()) {
options.threadNamePrefix = fmt::format("{}-", options.poolName); options.threadNamePrefix = fmt::format("{}-", options.poolName);
} }
if (options.maxThreads < 1) { checkOptionsLimits(options);
LOGV2_FATAL(28702,
"Cannot create pool with maximum number of threads less than 1",
"poolName"_attr = options.poolName,
"maxThreads"_attr = options.maxThreads);
}
if (options.minThreads > options.maxThreads) {
LOGV2_FATAL(28686,
"Cannot create pool with minimum number of threads larger than the "
"configured maximum",
"poolName"_attr = options.poolName,
"minThreads"_attr = options.minThreads,
"maxThreads"_attr = options.maxThreads);
}
return {std::move(options)}; return {std::move(options)};
} }
@ -116,6 +123,8 @@ public:
void schedule(Task task); void schedule(Task task);
void waitForIdle(); void waitForIdle();
Stats getStats() const; Stats getStats() const;
void setMaxThreads(size_t maxThreads);
void setMinThreads(size_t minThreads);
private: private:
/** /**
@ -195,8 +204,8 @@ private:
*/ */
void _joinRetired_inlock(); void _joinRetired_inlock();
// These are the options with which the pool was configured at construction time. // These are the options with which the pool is configured.
const Options _options; Options _options;
// Mutex guarding all non-const member variables. // Mutex guarding all non-const member variables.
mutable stdx::mutex _mutex; mutable stdx::mutex _mutex;
@ -249,7 +258,9 @@ ThreadPool::Impl::~Impl() {
} }
if (_state != shutdownComplete) { if (_state != shutdownComplete) {
LOGV2_FATAL(28704, "Failed to shutdown pool during destruction"); LOGV2_FATAL(28704,
"Failed to shutdown pool during destruction",
"poolName"_attr = _options.poolName);
} }
invariant(_threads.empty()); invariant(_threads.empty());
invariant(_pendingTasks.empty()); invariant(_pendingTasks.empty());
@ -262,6 +273,12 @@ void ThreadPool::Impl::startup() {
"Attempted to start pool that has already started", "Attempted to start pool that has already started",
"poolName"_attr = _options.poolName); "poolName"_attr = _options.poolName);
} }
LOGV2(11280000,
"Starting thread pool",
"poolName"_attr = _options.poolName,
"numThreads"_attr = _threads.size(),
"minThreads"_attr = _options.minThreads,
"maxThreads"_attr = _options.maxThreads);
_setState_inlock(running); _setState_inlock(running);
invariant(_threads.empty()); invariant(_threads.empty());
size_t numToStart = std::clamp(_pendingTasks.size(), _options.minThreads, _options.maxThreads); size_t numToStart = std::clamp(_pendingTasks.size(), _options.minThreads, _options.maxThreads);
@ -299,6 +316,8 @@ void ThreadPool::Impl::_joinRetired_inlock() {
while (!_retiredThreads.empty()) { while (!_retiredThreads.empty()) {
auto& t = _retiredThreads.front(); auto& t = _retiredThreads.front();
t.join(); t.join();
if (_options.onJoinRetiredThread)
_options.onJoinRetiredThread(t);
_retiredThreads.pop_front(); _retiredThreads.pop_front();
} }
} }
@ -377,7 +396,7 @@ void ThreadPool::Impl::schedule(Task task) {
if (_state == preStart) { if (_state == preStart) {
return; return;
} }
if (_numIdleThreads < _pendingTasks.size()) { if (_numIdleThreads < _pendingTasks.size() && _threads.size() < _options.maxThreads) {
_startWorkerThread_inlock(); _startWorkerThread_inlock();
} }
if (_numIdleThreads <= _pendingTasks.size()) { if (_numIdleThreads <= _pendingTasks.size()) {
@ -431,6 +450,21 @@ void ThreadPool::Impl::_workerThreadBody(const std::string& threadName) noexcept
void ThreadPool::Impl::_consumeTasks(const std::string& threadName) { void ThreadPool::Impl::_consumeTasks(const std::string& threadName) {
stdx::unique_lock<stdx::mutex> lk(_mutex); stdx::unique_lock<stdx::mutex> lk(_mutex);
while (_state == running) { while (_state == running) {
if (_threads.size() > _options.maxThreads) {
LOGV2_DEBUG(23114,
1,
"Reaping this thread as we are above maxThreads",
"poolName"_attr = _options.poolName,
"threadName"_attr = threadName,
"numThreads"_attr = _threads.size(),
"maxThreads"_attr = _options.maxThreads);
// Wake up someone else if there is work to do, as we will be exiting without doing it.
if (!_pendingTasks.empty()) {
_workAvailable.notify_one();
}
break;
}
if (!_pendingTasks.empty()) { if (!_pendingTasks.empty()) {
_doOneTask(&lk); _doOneTask(&lk);
continue; continue;
@ -455,6 +489,7 @@ void ThreadPool::Impl::_consumeTasks(const std::string& threadName) {
1, 1,
"Reaping this thread", "Reaping this thread",
"threadName"_attr = threadName, "threadName"_attr = threadName,
"poolName"_attr = _options.poolName,
"nextThreadRetirementDate"_attr = "nextThreadRetirementDate"_attr =
_lastFullUtilizationDate + _options.maxIdleThreadAge); _lastFullUtilizationDate + _options.maxIdleThreadAge);
break; break;
@ -463,6 +498,7 @@ void ThreadPool::Impl::_consumeTasks(const std::string& threadName) {
LOGV2_DEBUG(23107, LOGV2_DEBUG(23107,
3, 3,
"Not reaping this thread", "Not reaping this thread",
"poolName"_attr = _options.poolName,
"threadName"_attr = threadName, "threadName"_attr = threadName,
"nextThreadRetirementDate"_attr = nextRetirement); "nextThreadRetirementDate"_attr = nextRetirement);
waitDeadline = nextRetirement; waitDeadline = nextRetirement;
@ -474,6 +510,7 @@ void ThreadPool::Impl::_consumeTasks(const std::string& threadName) {
LOGV2_DEBUG(23108, LOGV2_DEBUG(23108,
3, 3,
"Waiting for work", "Waiting for work",
"poolName"_attr = _options.poolName,
"threadName"_attr = threadName, "threadName"_attr = threadName,
"numThreads"_attr = _threads.size(), "numThreads"_attr = _threads.size(),
"minThreads"_attr = _options.minThreads); "minThreads"_attr = _options.minThreads);
@ -511,7 +548,7 @@ void ThreadPool::Impl::_consumeTasks(const std::string& threadName) {
"expectedState"_attr = static_cast<int32_t>(running)); "expectedState"_attr = static_cast<int32_t>(running));
} }
// This thread is ending because it was idle for too long. // This thread is ending because it was idle for too long, or we were over maxThreads.
// Move self from _threads to _retiredThreads. // Move self from _threads to _retiredThreads.
auto selfId = stdx::this_thread::get_id(); auto selfId = stdx::this_thread::get_id();
auto pos = std::find_if( auto pos = std::find_if(
@ -601,6 +638,47 @@ void ThreadPool::Impl::_setState_inlock(const LifecycleState newState) {
_stateChange.notify_all(); _stateChange.notify_all();
} }
/**
 * Changes the pool's minimum thread count under _mutex.
 *
 * The new value is stored and then checked with checkOptionsLimits(), which is fatal if the
 * minimum exceeds the current maximum. Afterwards, worker threads are started until the pool
 * holds at least "minThreads" threads, or until no further thread can be started.
 */
void ThreadPool::Impl::setMinThreads(size_t minThreads) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    const auto oldMinThreads = _options.minThreads;
    _options.minThreads = minThreads;
    checkOptionsLimits(_options);
    LOGV2(11280001,
          "setting thread pool minThreads",
          "poolName"_attr = _options.poolName,
          "numThreads"_attr = _threads.size(),
          "minThreads"_attr = _options.minThreads,
          "maxThreads"_attr = _options.maxThreads,
          "old minThreads"_attr = oldMinThreads);
    // Check if we need to create new threads
    while (_threads.size() < _options.minThreads) {
        const auto threadsBeforeSpawn = _threads.size();
        // NOTE(review): this log id does not follow the 112800xx pattern of the other message in
        // this function -- confirm it is intended and unique.
        LOGV2_DEBUG(1280005,
                    1,
                    "Spawning new thread as we are below minThreads",
                    "poolName"_attr = _options.poolName,
                    "numThreads"_attr = _threads.size(),
                    "minThreads"_attr = _options.minThreads,
                    "maxThreads"_attr = _options.maxThreads);
        _startWorkerThread_inlock();
        if (_threads.size() == threadsBeforeSpawn) {
            // No thread was added (presumably the pool is not running or the spawn failed --
            // confirm against _startWorkerThread_inlock()). Stop instead of spinning forever
            // while holding _mutex.
            break;
        }
    }
}
/**
 * Changes the pool's maximum thread count under _mutex.
 *
 * The new value is stored and then checked with checkOptionsLimits(), which is fatal if the
 * maximum is below 1 or below the current minimum. No thread is stopped here; workers above the
 * new maximum exit on their own from _consumeTasks().
 */
void ThreadPool::Impl::setMaxThreads(size_t maxThreads) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    const auto previousMaxThreads = _options.maxThreads;
    _options.maxThreads = maxThreads;
    checkOptionsLimits(_options);

    LOGV2(11280002,
          "setting thread pool maxThreads",
          "poolName"_attr = _options.poolName,
          "numThreads"_attr = _threads.size(),
          "minThreads"_attr = _options.minThreads,
          "maxThreads"_attr = _options.maxThreads,
          "old maxThreads"_attr = previousMaxThreads);
    // Surplus threads are not reaped here; _consumeTasks() takes care of that.
}
// ======================================== // ========================================
// ThreadPool public functions that simply forward to the `_impl`. // ThreadPool public functions that simply forward to the `_impl`.
@ -632,4 +710,12 @@ ThreadPool::Stats ThreadPool::getStats() const {
return _impl->getStats(); return _impl->getStats();
} }
// Public entry point; forwards to the implementation object.
void ThreadPool::setMinThreads(size_t minThreads) {
    _impl->setMinThreads(minThreads);
}
// Public entry point; forwards to the implementation object.
void ThreadPool::setMaxThreads(size_t maxThreads) {
    _impl->setMaxThreads(maxThreads);
}
} // namespace mongo } // namespace mongo

View File

@ -105,6 +105,16 @@ public:
/** If callable, called before each worker thread begins consuming tasks. */ /** If callable, called before each worker thread begins consuming tasks. */
std::function<void(const std::string&)> onCreateThread; std::function<void(const std::string&)> onCreateThread;
/**
* If callable, called after joining each retired thread.
* These joins happen when a thread completes a task, and there is no more work in the
* thread pool. That is, they will be done by a single thread (the function does not need to
* be thread safe unless it will also be called in other places).
* Since there could be multiple calls to this function in a single critical section, avoid
* complex logic in the callback.
*/
std::function<void(const stdx::thread&)> onJoinRetiredThread;
}; };
/** /**
@ -169,6 +179,20 @@ public:
*/ */
Stats getStats() const; Stats getStats() const;
/**
* Set the minimum number of threads for this ThreadPool.
* Calling this method will spin up new threads if the new minimum is greater than the current
* number of threads.
*/
void setMinThreads(size_t minThreads);
/**
* Set the maximum number of threads for this ThreadPool.
* Calling this method will cause threads to be reaped once they finish their tasks if more than
* the maximum are running.
*/
void setMaxThreads(size_t maxThreads);
private: private:
class Impl; class Impl;
std::unique_ptr<Impl> _impl; std::unique_ptr<Impl> _impl;

View File

@ -224,7 +224,7 @@ TEST_F(ThreadPoolTest, MaxPoolSize20MinPoolSize15) {
DEATH_TEST_REGEX_F(ThreadPoolDeathTest, DEATH_TEST_REGEX_F(ThreadPoolDeathTest,
MaxThreadsTooFewDies, MaxThreadsTooFewDies,
"Cannot create pool.*with maximum number of threads.*less than 1") { "Cannot configure pool.*with maximum number of threads.*less than 1") {
ThreadPool::Options options; ThreadPool::Options options;
options.maxThreads = 0; options.maxThreads = 0;
makePool(options); makePool(options);
@ -233,7 +233,7 @@ DEATH_TEST_REGEX_F(ThreadPoolDeathTest,
DEATH_TEST_REGEX_F( DEATH_TEST_REGEX_F(
ThreadPoolDeathTest, ThreadPoolDeathTest,
MinThreadsTooManyDies, MinThreadsTooManyDies,
R"re(.*Cannot create pool.*with minimum number of threads.*larger than the configured maximum.*minThreads":6,"maxThreads":5)re") { R"re(.*Cannot configure pool.*with minimum number of threads.*larger than the maximum.*minThreads":6,"maxThreads":5)re") {
ThreadPool::Options options; ThreadPool::Options options;
options.maxThreads = 5; options.maxThreads = 5;
options.minThreads = 6; options.minThreads = 6;
@ -316,6 +316,337 @@ TEST_F(ThreadPoolTest, ThreadPoolRunsOnCreateThreadFunctionBeforeConsumingTasks)
ASSERT_EQ(journal, "[onCreate(mythread0)][Call(OK)]"); ASSERT_EQ(journal, "[onCreate(mythread0)][Call(OK)]");
} }
// Checks that onJoinRetiredThread is invoked once for every thread that retires when the pool
// shrinks from maxThreads back down to minThreads.
TEST_F(ThreadPoolTest, JoinAllRetiredThreads) {
    // Incremented by the callback each time a retired thread is joined.
    Atomic<int> retiredThreads(0);
    ThreadPool::Options options;
    options.minThreads = 4;
    options.maxThreads = 8;
    options.maxIdleThreadAge = Milliseconds(100);
    options.onJoinRetiredThread = [&](const stdx::thread& t) {
        retiredThreads.addAndFetch(1);
    };
    // maxThreads tasks plus the test thread all rendezvous on this barrier.
    unittest::Barrier barrier(options.maxThreads + 1);

    auto& pool = makePool(options);
    for (auto i = size_t{0}; i < options.maxThreads; ++i) {
        pool.schedule([&](auto status) {
            ASSERT_OK(status);
            barrier.countDownAndWait();
        });
    }
    // Tasks were scheduled before startup(), so no thread exists yet.
    ASSERT_EQ(pool.getStats().numThreads, 0);
    pool.startup();
    barrier.countDownAndWait();

    // Poll until the pool has shrunk back to minThreads.
    while (pool.getStats().numThreads > options.minThreads) {
        sleepFor(Microseconds{1});
    }

    pool.shutdown();
    pool.join();

    ASSERT_EQ(retiredThreads.load(), options.maxThreads - options.minThreads);
    ASSERT_EQ(pool.getStats().numIdleThreads, 0);
}
// Raising minThreads above the current maxThreads (7 > 5) must be fatal.
DEATH_TEST_REGEX_F(
    ThreadPoolDeathTest,
    ModifyMinThreadsGreaterThanMax,
    R"re(.*Cannot configure pool.*with minimum number of threads.*larger than the maximum.*minThreads":7,"maxThreads":5)re") {
    ThreadPool::Options options;
    options.maxThreads = 5;
    options.minThreads = 3;
    auto& pool = makePool(options);
    const size_t newMinThreads = 7;
    pool.setMinThreads(newMinThreads);
}
// Lowering maxThreads below the current minThreads (2 < 3) must be fatal.
DEATH_TEST_REGEX_F(
    ThreadPoolDeathTest,
    ModifyMaxLessThanMin,
    R"re(.*Cannot configure pool.*with minimum number of threads.*larger than the maximum.*minThreads":3,"maxThreads":2)re") {
    ThreadPool::Options options;
    options.maxThreads = 5;
    options.minThreads = 3;
    auto& pool = makePool(options);
    const size_t newMaxThreads = 2;
    pool.setMaxThreads(newMaxThreads);
}
// Setting maxThreads to 0 must be fatal even when minThreads is 0.
DEATH_TEST_REGEX_F(ThreadPoolDeathTest,
                   ModifyMaxToZero,
                   "Cannot configure pool.*with maximum number of threads.*less than 1") {
    ThreadPool::Options options;
    options.maxThreads = 5;
    options.minThreads = 0;
    auto& pool = makePool(options);
    const size_t newMaxThreads = 0;
    pool.setMaxThreads(newMaxThreads);
}
// Exercises setMinThreads() on a running pool in three phases: the initial minimum (4), a lower
// minimum (2) and a higher minimum (6). In each phase the pool is saturated with maxThreads
// tasks, and the test then checks how many threads stay idle and how many were retired.
TEST_F(ThreadPoolTest, ModifyMinThreads) {
    // Incremented by the callback each time a retired thread is joined.
    Atomic<int> retiredThreads(0);
    ThreadPool::Options options;
    options.minThreads = 4;
    options.maxThreads = 8;
    options.maxIdleThreadAge = Milliseconds(100);
    options.onJoinRetiredThread = [&](const stdx::thread& t) {
        retiredThreads.addAndFetch(1);
    };
    // maxThreads tasks plus the test thread all rendezvous on this barrier; it is reused by
    // every phase below.
    unittest::Barrier barrier(options.maxThreads + 1);

    // Phase 1: initial limits.
    auto& pool = makePool(options);
    for (auto i = size_t{0}; i < options.maxThreads; ++i) {
        pool.schedule([&](auto status) {
            ASSERT_OK(status);
            barrier.countDownAndWait();
        });
    }
    ASSERT_EQ(pool.getStats().numThreads, 0);
    pool.startup();
    barrier.countDownAndWait();

    // Poll until the pool has shrunk back to minThreads.
    while (pool.getStats().numThreads > options.minThreads) {
        sleepFor(Microseconds{1});
    }

    ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
    // Give the retired threads time to be joined before reading the counter.
    sleepFor(Milliseconds{100});
    ASSERT_EQ(retiredThreads.load(), options.maxThreads - options.minThreads);

    // Phase 2: modify to lower value
    // reset # of retired threads
    retiredThreads.store(0);
    // barrier was reset when counter reached 0
    const size_t newMinThreads = 2;
    pool.setMinThreads(newMinThreads);
    for (auto i = size_t{0}; i < options.maxThreads; ++i) {
        pool.schedule([&](auto status) {
            ASSERT_OK(status);
            barrier.countDownAndWait();
        });
    }
    barrier.countDownAndWait();

    while (pool.getStats().numThreads > newMinThreads) {
        sleepFor(Microseconds{1});
    }

    ASSERT_EQ(pool.getStats().numIdleThreads, newMinThreads);
    sleepFor(Milliseconds{100});
    ASSERT_EQ(retiredThreads.load(), options.maxThreads - newMinThreads);

    // Phase 3: modify to higher value
    // reset # of retired threads
    retiredThreads.store(0);
    // barrier was reset when counter reached 0
    const size_t higherMinThreads = 6;
    pool.setMinThreads(higherMinThreads);
    for (auto i = size_t{0}; i < options.maxThreads; ++i) {
        pool.schedule([&](auto status) {
            ASSERT_OK(status);
            barrier.countDownAndWait();
        });
    }
    barrier.countDownAndWait();

    while (pool.getStats().numThreads > higherMinThreads) {
        sleepFor(Microseconds{1});
    }

    ASSERT_EQ(pool.getStats().numIdleThreads, higherMinThreads);
    sleepFor(Milliseconds{100});
    ASSERT_EQ(retiredThreads.load(), options.maxThreads - higherMinThreads);

    pool.shutdown();
    pool.join();
}
// Lowers maxThreads from 8 to 4 while 8 threads exist, then schedules a single task and checks
// that the four surplus threads are retired even though the pool is not saturated again.
TEST_F(ThreadPoolTest, DecreaseMaxThreadsAndDoLessWork) {
    // Incremented by the callback each time a retired thread is joined.
    Atomic<int> retiredThreads(0);
    ThreadPool::Options options;
    const size_t originalMaxThreads = 8;
    options.minThreads = 4;
    options.maxThreads = originalMaxThreads;
    // Long enough that the idle timeout does not retire threads before the first check below.
    options.maxIdleThreadAge = Milliseconds(1000);
    options.onJoinRetiredThread = [&](const stdx::thread& t) {
        retiredThreads.addAndFetch(1);
    };

    auto& pool = makePool(options);
    ASSERT_EQ(pool.getStats().numThreads, 0);
    pool.startup();

    // maxThreads tasks plus the test thread all rendezvous on this barrier.
    unittest::Barrier barrier(options.maxThreads + 1);
    for (auto i = size_t{0}; i < options.maxThreads; ++i) {
        pool.schedule([&](auto status) {
            ASSERT_OK(status);
            barrier.countDownAndWait();
        });
    }
    barrier.countDownAndWait();

    // No threads should have retired yet.
    ASSERT_EQ(retiredThreads.load(), 0);

    // Modify maxThreads to a lower value;
    const size_t lowerMaxThreads = 4;
    pool.setMaxThreads(lowerMaxThreads);

    // One task plus the test thread rendezvous on this second barrier.
    unittest::Barrier barrier2(2);
    // Schedule only one task
    pool.schedule([&](auto status) {
        ASSERT_OK(status);
        barrier2.countDownAndWait();
    });
    barrier2.countDownAndWait();

    // Cleaning up the retired threads happens after task execution, so wait briefly for this to
    // complete.
    sleepFor(Milliseconds{100});

    // Four threads should have retired due to lowering max from 8 to 4.
    ASSERT_EQ(retiredThreads.load(), originalMaxThreads - lowerMaxThreads);

    pool.shutdown();
    pool.join();
}
// Exercises setMaxThreads() on a running pool in three phases: the initial maximum (8), a higher
// maximum (12) and a lower maximum (6). In each phase the pool is saturated with as many tasks
// as the current maximum, and the test then checks that it shrinks back to minThreads and that
// (maximum - minThreads) threads were retired.
TEST_F(ThreadPoolTest, ModifyMaxThreads) {
    // Incremented by the callback each time a retired thread is joined.
    Atomic<int> retiredThreads(0);
    ThreadPool::Options options;
    options.minThreads = 4;
    options.maxThreads = 8;
    options.maxIdleThreadAge = Milliseconds(100);
    options.onJoinRetiredThread = [&](const stdx::thread& t) {
        retiredThreads.addAndFetch(1);
    };
    // maxThreads tasks plus the test thread all rendezvous on this barrier.
    unittest::Barrier barrier(options.maxThreads + 1);

    // Phase 1: initial limits.
    auto& pool = makePool(options);
    for (auto i = size_t{0}; i < options.maxThreads; ++i) {
        pool.schedule([&](auto status) {
            ASSERT_OK(status);
            barrier.countDownAndWait();
        });
    }
    ASSERT_EQ(pool.getStats().numThreads, 0);
    pool.startup();
    barrier.countDownAndWait();

    // Poll until the pool has shrunk back to minThreads.
    while (pool.getStats().numThreads > options.minThreads) {
        sleepFor(Microseconds{1});
    }

    ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
    // Give the retired threads time to be joined before reading the counter.
    sleepFor(Milliseconds{100});
    ASSERT_EQ(retiredThreads.load(), options.maxThreads - options.minThreads);

    // Phase 2: modify to higher value
    // reset # of retired threads
    retiredThreads.store(0);
    const size_t newMaxThreads = 12;
    pool.setMaxThreads(newMaxThreads);
    // create new barrier to reflect new number of threads
    unittest::Barrier barrier2(newMaxThreads + 1);
    for (auto i = size_t{0}; i < newMaxThreads; ++i) {
        pool.schedule([&](auto status) {
            ASSERT_OK(status);
            barrier2.countDownAndWait();
        });
    }
    barrier2.countDownAndWait();

    while (pool.getStats().numThreads > options.minThreads) {
        sleepFor(Microseconds{1});
    }

    ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
    sleepFor(Milliseconds{100});
    ASSERT_EQ(retiredThreads.load(), newMaxThreads - options.minThreads);

    // Phase 3: modify to lower value
    // reset # of retired threads
    retiredThreads.store(0);
    const size_t lowerMaxThreads = 6;
    pool.setMaxThreads(lowerMaxThreads);
    // create new barrier to reflect new number of threads
    unittest::Barrier barrier3(lowerMaxThreads + 1);
    for (auto i = size_t{0}; i < lowerMaxThreads; ++i) {
        pool.schedule([&](auto status) {
            ASSERT_OK(status);
            barrier3.countDownAndWait();
        });
    }
    barrier3.countDownAndWait();

    while (pool.getStats().numThreads > options.minThreads) {
        sleepFor(Microseconds{1});
    }

    ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
    sleepFor(Milliseconds{100});
    ASSERT_EQ(retiredThreads.load(), lowerMaxThreads - options.minThreads);

    pool.shutdown();
    pool.join();
}
// Exercises changing both limits of a running pool: setMaxThreads() raises the maximum and
// setMinThreads() lowers the minimum, after which the test checks that the pool shrinks to the
// new minimum and that the expected number of threads were retired.
TEST_F(ThreadPoolTest, ModifyMaxAndMinThreads) {
    // Incremented by the pool every time it joins a retired thread.
    Atomic<int> joinedRetiredCount(0);

    ThreadPool::Options options;
    options.minThreads = 4;
    options.maxThreads = 8;
    options.maxIdleThreadAge = Milliseconds(100);
    options.onJoinRetiredThread = [&joinedRetiredCount](const stdx::thread&) {
        joinedRetiredCount.addAndFetch(1);
    };

    auto& pool = makePool(options);

    // Queues 'taskCount' tasks that each assert an OK status and then block on 'gate'.
    auto scheduleGatedTasks = [&pool](unittest::Barrier& gate, const auto taskCount) {
        for (auto remaining = taskCount; remaining > 0; --remaining) {
            pool.schedule([&gate](auto status) {
                ASSERT_OK(status);
                gate.countDownAndWait();
            });
        }
    };

    // Polls the pool stats until the thread count is no longer above 'limit'.
    auto awaitThreadCountAtMost = [&pool](const auto limit) {
        while (pool.getStats().numThreads > limit) {
            sleepFor(Microseconds{1});
        }
    };

    // Phase 1: saturate the pool at its initial maximum. The barrier has one extra slot for the
    // test thread itself.
    unittest::Barrier initialGate(options.maxThreads + 1);
    scheduleGatedTasks(initialGate, options.maxThreads);
    // Nothing runs before startup(), so no threads exist yet.
    ASSERT_EQ(pool.getStats().numThreads, 0);
    pool.startup();
    initialGate.countDownAndWait();
    awaitThreadCountAtMost(options.minThreads);
    ASSERT_EQ(pool.getStats().numIdleThreads, options.minThreads);
    // Pause before counting so that retired threads have a chance to be joined.
    sleepFor(Milliseconds{100});
    ASSERT_EQ(joinedRetiredCount.load(), options.maxThreads - options.minThreads);

    // Phase 2: start counting retirements from zero again, then widen the range on both ends
    // (maximum first, then minimum).
    joinedRetiredCount.store(0);
    const size_t raisedMaxThreads = 12;
    const size_t loweredMinThreads = 2;
    pool.setMaxThreads(raisedMaxThreads);
    pool.setMinThreads(loweredMinThreads);
    // A fresh barrier sized for the new number of tasks plus the test thread.
    unittest::Barrier raisedGate(raisedMaxThreads + 1);
    scheduleGatedTasks(raisedGate, raisedMaxThreads);
    raisedGate.countDownAndWait();
    awaitThreadCountAtMost(loweredMinThreads);
    ASSERT_EQ(pool.getStats().numIdleThreads, loweredMinThreads);
    sleepFor(Milliseconds{100});
    ASSERT_EQ(joinedRetiredCount.load(), raisedMaxThreads - loweredMinThreads);

    pool.shutdown();
    pool.join();
}
TEST_F(ThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) { TEST_F(ThreadPoolTest, SafeToCallWaitForIdleBeforeShutdown) {
ThreadPool::Options options; ThreadPool::Options options;
options.minThreads = 1; options.minThreads = 1;