mirror of https://github.com/mongodb/mongo
SERVER-90588 Allow changing `maxIncomingConnectionsOverride` at runtime (#31643)
GitOrigin-RevId: a662d13c8e8d0f3d003a2018a18221efe4a07bd9
parent 0e29529162
commit d01064c94b
@@ -0,0 +1,54 @@
// Tests using a server parameter to set `maxIncomingConnectionsOverride` at runtime.

function runTest(args, testFunc) {
    // Run tests in isolation to make sure we always start with a clean slate.
    var mongo = MongoRunner.runMongod(args);
    testFunc(mongo);
    MongoRunner.stopMongod(mongo);
}

function setMaxIncomingConnectionsOverride(conn, newValue) {
    conn.adminCommand({setParameter: 1, maxIncomingConnectionsOverride: newValue});
}

function getMaxIncomingConnectionsOverride(conn) {
    const res = conn.adminCommand({getParameter: 1, maxIncomingConnectionsOverride: 1});
    return res.maxIncomingConnectionsOverride;
}

function setMaxIncomingConnectionsOverrideAndVerify(conn, newValue) {
    setMaxIncomingConnectionsOverride(conn, newValue);
    assert.eq(getMaxIncomingConnectionsOverride(conn), newValue);
}

/**
 * Verify that there are no exemptions set by default, and that retrieving them works.
 */
runTest({}, function(conn) {
    assert.eq(getMaxIncomingConnectionsOverride(conn).ranges, []);
});

/**
 * Reset the list of exemptions and then clear it out -- verify that both operations succeed.
 */
runTest({}, function(conn) {
    setMaxIncomingConnectionsOverrideAndVerify(conn, {ranges: ["localhost"]});
    setMaxIncomingConnectionsOverrideAndVerify(conn, {ranges: []});
});

/**
 * Verify that passing a mix of CIDR and HostAndPort ranges works.
 */
runTest({}, function(conn) {
    const ranges = {ranges: ["127.0.0.1/8", "/tmp/mongodb.sock", "8.8.8.8/8", "localhost"]};
    setMaxIncomingConnectionsOverrideAndVerify(conn, ranges);
});

/**
 * Verify the behavior of the server parameter when `net.maxIncomingConnectionsOverride` is set at
 * startup and can then be modified at runtime.
 */
runTest({config: "jstests/noPassthrough/libs/max_conns_override_config.yaml"}, function(conn) {
    assert.eq(getMaxIncomingConnectionsOverride(conn).ranges, ["127.0.0.1/32"]);
    setMaxIncomingConnectionsOverrideAndVerify(conn, {ranges: ["localhost"]});
});
@@ -168,7 +168,8 @@ namespace mongo {
namespace repl {

namespace {
-thread_local ReplicationCoordinatorImpl::SharedReplSetConfig::Lease _cachedRsConfigLease;
+using VersionedConfigType = VersionedValue<ReplSetConfig, WriteRarelyRWMutex>;
+thread_local VersionedConfigType::Snapshot cachedRsConfig;
}  // namespace

MONGO_FAIL_POINT_DEFINE(stepdownHangBeforePerformingPostMemberStateUpdateActions);
@@ -342,38 +343,6 @@ void ReplicationCoordinatorImpl::WaiterList::setErrorAll_inlock(Status status) {
    _updateMetric_inlock();
}

-ReplicationCoordinatorImpl::SharedReplSetConfig::SharedReplSetConfig()
-    : _version(1), _current(std::make_shared<ReplSetConfig>()) {}
-
-MONGO_COMPILER_NOINLINE MONGO_COMPILER_COLD_FUNCTION
-ReplicationCoordinatorImpl::SharedReplSetConfig::Lease
-ReplicationCoordinatorImpl::SharedReplSetConfig::renew() const {
-    auto readLock = _rwMutex.readLock();
-    return Lease{_version.load(), _current};
-}
-
-bool ReplicationCoordinatorImpl::SharedReplSetConfig::isStale(const Lease& lease) const {
-    return _version.load() != lease.version;
-}
-
-ReplSetConfig& ReplicationCoordinatorImpl::SharedReplSetConfig::getConfig() const {
-    if (MONGO_unlikely(isStale(_cachedRsConfigLease))) {
-        _cachedRsConfigLease = renew();
-    }
-    return *(_cachedRsConfigLease.config);
-}
-
-ReplSetConfig& ReplicationCoordinatorImpl::SharedReplSetConfig::getConfig(WithLock) const {
-    return *_current;
-}
-
-void ReplicationCoordinatorImpl::SharedReplSetConfig::setConfig(
-    std::shared_ptr<ReplSetConfig> newConfig) {
-    auto writeLock = _rwMutex.writeLock();
-    _version.fetchAndAdd(1);
-    _current = std::move(newConfig);
-}
-
namespace {
InitialSyncerInterface::Options createInitialSyncerOptions(
    ReplicationCoordinator* replCoord, ReplicationCoordinatorExternalState* externalState) {
@@ -430,6 +399,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
      _inShutdown(false),
      _memberState(MemberState::RS_STARTUP),
      _rsConfigState(kConfigPreStart),
+     _rsConfig(std::make_shared<ReplSetConfig>()),  // Initialize with empty configuration.
      _selfIndex(-1),
      _sleptLastElection(false),
      _readWriteAbility(std::make_unique<ReadWriteAbility>(!settings.isReplSet())),
@@ -465,9 +435,9 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(

    _externalState->setupNoopWriter(Seconds(periodicNoopIntervalSecs.load()));

-    // Reset this so that tests that run back-to-back on the same thread don't
-    // re-use the value from the previous test.
-    _cachedRsConfigLease = {};
+    // Reset this so that tests that run back-to-back on the same thread don't re-use the value from
+    // the previous test.
+    cachedRsConfig = {};
}

ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() = default;
@@ -476,6 +446,11 @@ void ReplicationCoordinatorImpl::waitForStartUpComplete_forTest() {
    _waitForStartUpComplete();
}

+const ReplSetConfig& ReplicationCoordinatorImpl::_getReplSetConfig() const {
+    _rsConfig.refreshSnapshot(cachedRsConfig);
+    return *cachedRsConfig;
+}
+
void ReplicationCoordinatorImpl::_waitForStartUpComplete() {
    CallbackHandle handle;
    {
@@ -491,7 +466,8 @@ void ReplicationCoordinatorImpl::_waitForStartUpComplete() {
}

ReplSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() {
-    return _rsConfig.getConfig();
+    stdx::unique_lock lk(_mutex);
+    return _rsConfig.unsafePeek();
}

Date_t ReplicationCoordinatorImpl::getElectionTimeout_forTest() const {
@@ -671,7 +647,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(
}

void ReplicationCoordinatorImpl::_createHorizonTopologyChangePromiseMapping(WithLock lk) {
-    auto horizonMappings = _rsConfig.getConfig(lk).getMemberAt(_selfIndex).getHorizonMappings();
+    auto horizonMappings = _rsConfig.unsafePeek().getMemberAt(_selfIndex).getHorizonMappings();
    // Create a new horizon to promise mapping since it is possible for the horizons
    // to change after a replica set reconfig.
    _horizonToTopologyChangePromiseMap.clear();
@@ -1068,7 +1044,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* opCtx,
    // _finishLoadLocalConfig.
    {
        stdx::lock_guard<Latch> lk(_mutex);
-        invariant(!_rsConfig.getConfig(lk).isInitialized());
+        invariant(!_rsConfig.unsafePeek().isInitialized());
        _setConfigState_inlock(kConfigUninitialized);
    }
    if (_settings.shouldAutoInitiate()) {
@@ -1092,7 +1068,7 @@ void ReplicationCoordinatorImpl::_setImplicitDefaultWriteConcern(OperationContex
                                                                  WithLock lk) {
    auto& rwcDefaults = ReadWriteConcernDefaults::get(opCtx);
    bool isImplicitDefaultWriteConcernMajority =
-        _rsConfig.getConfig(lk).isImplicitDefaultWriteConcernMajority();
+        _rsConfig.unsafePeek().isImplicitDefaultWriteConcernMajority();
    rwcDefaults.setImplicitDefaultWriteConcernMajority(isImplicitDefaultWriteConcernMajority);
}

@@ -1293,13 +1269,13 @@ Status ReplicationCoordinatorImpl::waitForMemberState(Interruptible* interruptib

Seconds ReplicationCoordinatorImpl::getSecondaryDelaySecs() const {
    stdx::lock_guard<Latch> lk(_mutex);
-    invariant(_rsConfig.getConfig(lk).isInitialized());
+    invariant(_rsConfig.unsafePeek().isInitialized());
    if (_selfIndex == -1) {
        // We aren't currently in the set. Return 0 seconds so we can clear out the applier's
        // queue of work.
        return Seconds(0);
    }
-    return _rsConfig.getConfig(lk).getMemberAt(_selfIndex).getSecondaryDelay();
+    return _rsConfig.unsafePeek().getMemberAt(_selfIndex).getSecondaryDelay();
}

void ReplicationCoordinatorImpl::clearSyncSourceDenylist() {
@@ -1474,8 +1450,8 @@ void ReplicationCoordinatorImpl::signalApplierDrainComplete(OperationContext* op
    {
        // If the config doesn't have a term, don't change it.
        auto needBumpConfigTerm =
-            _rsConfig.getConfig(lk).getConfigTerm() != OpTime::kUninitializedTerm;
-        auto currConfigVersionAndTerm = _rsConfig.getConfig(lk).getConfigVersionAndTerm();
+            _rsConfig.unsafePeek().getConfigTerm() != OpTime::kUninitializedTerm;
+        auto currConfigVersionAndTerm = _rsConfig.unsafePeek().getConfigVersionAndTerm();
        lk.unlock();

        if (needBumpConfigTerm) {
@@ -1692,7 +1668,7 @@ void ReplicationCoordinatorImpl::_setMyLastWrittenOpTimeAndWallTime(
        opTime);

    // If we are using written times to calculate the commit level, update it now.
-    if (!_rsConfig.getConfig(lk).getWriteConcernMajorityShouldJournal()) {
+    if (!_rsConfig.unsafePeek().getWriteConcernMajorityShouldJournal()) {
        _updateLastCommittedOpTimeAndWallTime(lk);
    }
}
@@ -1752,7 +1728,7 @@ void ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTime(
    _topCoord->setMyLastDurableOpTimeAndWallTime(
        opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed);
    // If we are using durable times to calculate the commit level, update it now.
-    if (_rsConfig.getConfig(lk).getWriteConcernMajorityShouldJournal()) {
+    if (_rsConfig.unsafePeek().getWriteConcernMajorityShouldJournal()) {
        _updateLastCommittedOpTimeAndWallTime(lk);
    }
    // There could be replication waiters waiting for our lastDurable for {j: true}, wake up those
@@ -1779,7 +1755,7 @@ bool ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTimeForward(
    }

    if (_readWriteAbility->canAcceptNonLocalWrites(lk) &&
-        _rsConfig.getConfig(lk).getWriteMajority() == 1) {
+        _rsConfig.unsafePeek().getWriteMajority() == 1) {
        // Single vote primaries may have a lagged stable timestamp due to paring back the
        // stable timestamp to the all committed timestamp.
        _setStableTimestampForStorage(lk);
@@ -1896,8 +1872,7 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForReadUntil(OperationContext*
    {
        stdx::lock_guard lock(_mutex);
        if (_rsConfigState == kConfigUninitialized || _rsConfigState == kConfigInitiating ||
-            (_rsConfigState == kConfigHBReconfiguring &&
-             !_rsConfig.getConfig(lock).isInitialized())) {
+            (_rsConfigState == kConfigHBReconfiguring && !_rsConfig.unsafePeek().isInitialized())) {
            return {
                ErrorCodes::NotYetInitialized,
                "Cannot use non-local read concern until replica set is finished initializing."};
@@ -2008,8 +1983,7 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeWrittenUntil(OperationContext*
    {
        stdx::lock_guard lock(_mutex);
        if (_rsConfigState == kConfigUninitialized || _rsConfigState == kConfigInitiating ||
-            (_rsConfigState == kConfigHBReconfiguring &&
-             !_rsConfig.getConfig(lock).isInitialized())) {
+            (_rsConfigState == kConfigHBReconfiguring && !_rsConfig.unsafePeek().isInitialized())) {
            return {
                ErrorCodes::NotYetInitialized,
                "Cannot use non-local read concern until replica set is finished initializing."};
@@ -2243,14 +2217,14 @@ bool ReplicationCoordinatorImpl::isCommitQuorumSatisfied(
        patternName = commitQuorum.mode;
    }

-    auto tagPattern = uassertStatusOK(_rsConfig.getConfig(lock).findCustomWriteMode(patternName));
+    auto tagPattern = uassertStatusOK(_rsConfig.unsafePeek().findCustomWriteMode(patternName));
    return _haveTaggedNodesSatisfiedCommitQuorum(lock, tagPattern, members);
}

bool ReplicationCoordinatorImpl::_haveNumNodesSatisfiedCommitQuorum(
    WithLock lk, int numNodes, const std::vector<mongo::HostAndPort>& members) const {
    for (auto&& member : members) {
-        auto memberConfig = _rsConfig.getConfig(lk).findMemberByHostAndPort(member);
+        auto memberConfig = _rsConfig.unsafePeek().findMemberByHostAndPort(member);
        // We do not count arbiters and members that aren't part of replica set config,
        // towards the commit quorum.
        if (!memberConfig || memberConfig->isArbiter())
@@ -2272,7 +2246,7 @@ bool ReplicationCoordinatorImpl::_haveTaggedNodesSatisfiedCommitQuorum(
    ReplSetTagMatch matcher(tagPattern);

    for (auto&& member : members) {
-        auto memberConfig = _rsConfig.getConfig(lk).findMemberByHostAndPort(member);
+        auto memberConfig = _rsConfig.unsafePeek().findMemberByHostAndPort(member);
        // We do not count arbiters and members that aren't part of replica set config,
        // towards the commit quorum.
        if (!memberConfig || memberConfig->isArbiter())
@@ -2294,7 +2268,7 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
    const bool useDurableOpTime = writeConcern.syncMode == WriteConcernOptions::SyncMode::JOURNAL;
    if (!holds_alternative<std::string>(writeConcern.w)) {
        if (auto wTags = std::get_if<WTags>(&writeConcern.w)) {
-            auto tagPattern = uassertStatusOK(_rsConfig.getConfig(lk).makeCustomWriteMode(*wTags));
+            auto tagPattern = uassertStatusOK(_rsConfig.unsafePeek().makeCustomWriteMode(*wTags));
            return _topCoord->haveTaggedNodesReachedOpTime(opTime, tagPattern, useDurableOpTime);
        }

@@ -2348,7 +2322,7 @@ bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
        patternName = wMode;
    }

-    auto tagPattern = uassertStatusOK(_rsConfig.getConfig(lk).findCustomWriteMode(patternName));
+    auto tagPattern = uassertStatusOK(_rsConfig.unsafePeek().findCustomWriteMode(patternName));
    if (writeConcern.checkCondition == WriteConcernOptions::CheckCondition::OpTime) {
        return _topCoord->haveTaggedNodesReachedOpTime(opTime, tagPattern, useDurableOpTime);
    } else {
@@ -2723,10 +2697,10 @@ ReplicationCoordinatorImpl::getHelloResponseFuture(
boost::optional<std::string> ReplicationCoordinatorImpl::_getHorizonString(
    WithLock lk, const SplitHorizon::Parameters& horizonParams) const {
    const auto myState = _topCoord->getMemberState();
-    const bool hasValidConfig = _rsConfig.getConfig(lk).isInitialized() && !myState.removed();
+    const bool hasValidConfig = _rsConfig.unsafePeek().isInitialized() && !myState.removed();
    boost::optional<std::string> horizonString;
    if (hasValidConfig) {
-        const auto& self = _rsConfig.getConfig(lk).getMemberAt(_selfIndex);
+        const auto& self = _rsConfig.unsafePeek().getMemberAt(_selfIndex);
        horizonString = self.determineHorizon(horizonParams);
    }
    // A horizonString that is boost::none indicates that we do not have a valid config.
@@ -3321,7 +3295,7 @@ void ReplicationCoordinatorImpl::_performElectionHandoff() {
        return;
    }

-    auto target = _rsConfig.getConfig(lock).getMemberAt(candidateIndex).getHostAndPort();
+    auto target = _rsConfig.unsafePeek().getMemberAt(candidateIndex).getHostAndPort();
    executor::RemoteCommandRequest request(
        target, DatabaseName::kAdmin, BSON("replSetStepUp" << 1 << "skipDryRun" << true), nullptr);
    LOGV2(21347, "Handing off election", "target"_attr = target);
@@ -3588,11 +3562,11 @@ HostAndPort ReplicationCoordinatorImpl::getMyHostAndPort() const {
    if (_selfIndex == -1) {
        return HostAndPort();
    }
-    return _rsConfig.getConfig(lk).getMemberAt(_selfIndex).getHostAndPort();
+    return _rsConfig.unsafePeek().getMemberAt(_selfIndex).getHostAndPort();
}

int ReplicationCoordinatorImpl::_getMyId_inlock(WithLock lk) const {
-    const MemberConfig& self = _rsConfig.getConfig(lk).getMemberAt(_selfIndex);
+    const MemberConfig& self = _rsConfig.unsafePeek().getMemberAt(_selfIndex);
    return self.getId().getData();
}

@@ -3684,70 +3658,70 @@ void ReplicationCoordinatorImpl::appendSecondaryInfoData(BSONObjBuilder* result)
}

ReplSetConfig ReplicationCoordinatorImpl::getConfig() const {
-    return _rsConfig.getConfig();
+    return _getReplSetConfig();
}

ReplSetConfig ReplicationCoordinatorImpl::getConfig(WithLock lk) const {
-    return _rsConfig.getConfig(lk);
+    return _rsConfig.unsafePeek();
}

ConnectionString ReplicationCoordinatorImpl::getConfigConnectionString() const {
-    return _rsConfig.getConfig().getConnectionString();
+    return _getReplSetConfig().getConnectionString();
}

Milliseconds ReplicationCoordinatorImpl::getConfigElectionTimeoutPeriod() const {
-    return _rsConfig.getConfig().getElectionTimeoutPeriod();
+    return _getReplSetConfig().getElectionTimeoutPeriod();
}

std::vector<MemberConfig> ReplicationCoordinatorImpl::getConfigVotingMembers() const {
-    return _rsConfig.getConfig().votingMembers();
+    return _getReplSetConfig().votingMembers();
}

size_t ReplicationCoordinatorImpl::getNumConfigVotingMembers() const {
-    return _rsConfig.getConfig().getCountOfVotingMembers();
+    return _getReplSetConfig().getCountOfVotingMembers();
}

std::int64_t ReplicationCoordinatorImpl::getConfigTerm() const {
-    return _rsConfig.getConfig().getConfigTerm();
+    return _getReplSetConfig().getConfigTerm();
}

std::int64_t ReplicationCoordinatorImpl::getConfigVersion() const {
-    return _rsConfig.getConfig().getConfigVersion();
+    return _getReplSetConfig().getConfigVersion();
}

ConfigVersionAndTerm ReplicationCoordinatorImpl::getConfigVersionAndTerm() const {
-    return _rsConfig.getConfig().getConfigVersionAndTerm();
+    return _getReplSetConfig().getConfigVersionAndTerm();
}

int ReplicationCoordinatorImpl::getConfigNumMembers() const {
-    return _rsConfig.getConfig().getNumMembers();
+    return _getReplSetConfig().getNumMembers();
}

Milliseconds ReplicationCoordinatorImpl::getConfigHeartbeatTimeoutPeriodMillis() const {
-    return _rsConfig.getConfig().getHeartbeatTimeoutPeriodMillis();
+    return _getReplSetConfig().getHeartbeatTimeoutPeriodMillis();
}

BSONObj ReplicationCoordinatorImpl::getConfigBSON() const {
-    return _rsConfig.getConfig().toBSON();
+    return _getReplSetConfig().toBSON();
}

boost::optional<MemberConfig> ReplicationCoordinatorImpl::findConfigMemberByHostAndPort_deprecated(
    const HostAndPort& hap) const {
-    const MemberConfig* result = _rsConfig.getConfig().findMemberByHostAndPort(hap);
+    const MemberConfig* result = _getReplSetConfig().findMemberByHostAndPort(hap);
    return result ? boost::make_optional(*result) : boost::none;
}

bool ReplicationCoordinatorImpl::isConfigLocalHostAllowed() const {
-    return _rsConfig.getConfig().isLocalHostAllowed();
+    return _getReplSetConfig().isLocalHostAllowed();
}

Milliseconds ReplicationCoordinatorImpl::getConfigHeartbeatInterval() const {
-    return _rsConfig.getConfig().getHeartbeatInterval();
+    return _getReplSetConfig().getHeartbeatInterval();
}

Status ReplicationCoordinatorImpl::validateWriteConcern(
    const WriteConcernOptions& writeConcern) const {
-    return _rsConfig.getConfig().validateWriteConcern(writeConcern);
+    return _getReplSetConfig().validateWriteConcern(writeConcern);
}

WriteConcernOptions ReplicationCoordinatorImpl::_getOplogCommitmentWriteConcern(WithLock lk) {
@@ -3772,9 +3746,9 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result,
                                                          bool includeNewlyAdded) {
    stdx::lock_guard<Latch> lock(_mutex);
    if (includeNewlyAdded) {
-        result->append("config", _rsConfig.getConfig(lock).toBSON());
+        result->append("config", _rsConfig.unsafePeek().toBSON());
    } else {
-        result->append("config", _rsConfig.getConfig(lock).toBSONWithoutNewlyAdded());
+        result->append("config", _rsConfig.unsafePeek().toBSONWithoutNewlyAdded());
    }

    if (commitmentStatus) {
@@ -3874,7 +3848,7 @@ bool ReplicationCoordinatorImpl::shouldDropSyncSourceAfterShardSplit(const OID r
    if (!_settings.isServerless()) {
        return false;
    }
-    return replicaSetId != _rsConfig.getConfig().getReplicaSetId();
+    return replicaSetId != _getReplSetConfig().getReplicaSetId();
}

Status ReplicationCoordinatorImpl::processReplSetSyncFrom(OperationContext* opCtx,
@@ -4064,7 +4038,7 @@ Status ReplicationCoordinatorImpl::_doReplSetReconfig(OperationContext* opCtx,
    }

    LOGV2(6015313, "Replication config state is Steady, starting reconfig");
-    invariant(_rsConfig.getConfig(lk).isInitialized());
+    invariant(_rsConfig.unsafePeek().isInitialized());

    if (!force && !_readWriteAbility->canAcceptNonLocalWrites(lk) && !skipSafetyChecks) {
        return Status(
@@ -4079,8 +4053,8 @@ Status ReplicationCoordinatorImpl::_doReplSetReconfig(OperationContext* opCtx,
        // For safety of reconfig, since we must commit a config in our own term before executing a
        // reconfig, we should never have a config in an older term. If the current config was
        // installed via a force reconfig, we aren't concerned about this safety guarantee.
-        invariant(_rsConfig.getConfig(lk).getConfigTerm() == OpTime::kUninitializedTerm ||
-                  _rsConfig.getConfig(lk).getConfigTerm() == topCoordTerm);
+        invariant(_rsConfig.unsafePeek().getConfigTerm() == OpTime::kUninitializedTerm ||
+                  _rsConfig.unsafePeek().getConfigTerm() == topCoordTerm);
    }

    auto configWriteConcern = _getConfigReplicationWriteConcern();
@@ -4092,7 +4066,7 @@ Status ReplicationCoordinatorImpl::_doReplSetReconfig(OperationContext* opCtx,
            return Status(ErrorCodes::CurrentConfigNotCommittedYet,
                          str::stream()
                              << "Cannot run replSetReconfig because the current config: "
-                             << _rsConfig.getConfig(lk).getConfigVersionAndTerm().toString()
+                             << _rsConfig.unsafePeek().getConfigVersionAndTerm().toString()
                              << " is not "
                              << "majority committed.");
        }
@@ -4101,10 +4075,10 @@ Status ReplicationCoordinatorImpl::_doReplSetReconfig(OperationContext* opCtx,
    // current config. If this is the initial reconfig, then we don't need to check this
    // condition, since there were no prior configs. Also, for force reconfigs we bypass this
    // safety check condition.
-    auto isInitialReconfig = (_rsConfig.getConfig(lk).getConfigVersion() == 1);
+    auto isInitialReconfig = (_rsConfig.unsafePeek().getConfigVersion() == 1);
    // If our config was installed via a "force" reconfig, we bypass the oplog commitment check.
    auto leavingForceConfig =
-        (_rsConfig.getConfig(lk).getConfigTerm() == OpTime::kUninitializedTerm);
+        (_rsConfig.unsafePeek().getConfigTerm() == OpTime::kUninitializedTerm);
    auto configOplogCommitmentOpTime = _topCoord->getConfigOplogCommitmentOpTime();
    auto oplogWriteConcern = _getOplogCommitmentWriteConcern(lk);

@@ -4126,7 +4100,7 @@ Status ReplicationCoordinatorImpl::_doReplSetReconfig(OperationContext* opCtx,
    auto configStateGuard =
        ScopeGuard([&] { lockAndCall(&lk, [=, this] { _setConfigState_inlock(kConfigSteady); }); });

-    ReplSetConfig oldConfig = _rsConfig.getConfig(lk);
+    ReplSetConfig oldConfig = _rsConfig.unsafePeek();
    int myIndex = _selfIndex;
    lk.unlock();

@@ -4430,9 +4404,9 @@ void ReplicationCoordinatorImpl::_finishReplSetReconfig(OperationContext* opCtx,

    invariant(_rsConfigState == kConfigReconfiguring);

-    invariant(_rsConfig.getConfig(lk).isInitialized());
+    invariant(_rsConfig.unsafePeek().isInitialized());

-    const ReplSetConfig oldConfig = _rsConfig.getConfig(lk);
+    const ReplSetConfig oldConfig = _rsConfig.unsafePeek();
    const PostMemberStateUpdateAction action = _setCurrentRSConfig(lk, opCtx, newConfig, myIndex);

    // Record the latest committed optime in the current config atomically with the new config
@@ -4485,7 +4459,7 @@ Status ReplicationCoordinatorImpl::awaitConfigCommitment(OperationContext* opCtx
    }
    auto configOplogCommitmentOpTime = _topCoord->getConfigOplogCommitmentOpTime();
    auto oplogWriteConcern = _getOplogCommitmentWriteConcern(lk);
-    auto currConfig = _rsConfig.getConfig(lk);
+    auto currConfig = _rsConfig.unsafePeek();
    lk.unlock();

    OpTime fakeOpTime(Timestamp(1, 1), term);
@@ -4690,7 +4664,7 @@ Status ReplicationCoordinatorImpl::_runReplSetInitiate(const BSONObj& configObj,
        resultObj->append("info", "try querying local.system.replset to see current configuration");
        return Status(ErrorCodes::AlreadyInitialized, "already initialized");
    }
-    invariant(!_rsConfig.getConfig(lk).isInitialized());
+    invariant(!_rsConfig.unsafePeek().isInitialized());
    _setConfigState_inlock(kConfigInitiating);

    ScopeGuard configStateGuard = [&] {
@@ -4814,7 +4788,7 @@ void ReplicationCoordinatorImpl::_finishReplSetInitiate(OperationContext* opCtx,
                                                         int myIndex) {
    stdx::unique_lock<Latch> lk(_mutex);
    invariant(_rsConfigState == kConfigInitiating);
-    invariant(!_rsConfig.getConfig(lk).isInitialized());
+    invariant(!_rsConfig.unsafePeek().isInitialized());
    auto action = _setCurrentRSConfig(lk, opCtx, newConfig, myIndex);
    lk.unlock();
    ReplicaSetAwareServiceRegistry::get(_service).onSetCurrentConfig(opCtx);
@@ -4909,7 +4883,7 @@ void ReplicationCoordinatorImpl::_fulfillTopologyChangePromise(WithLock lock) {
    _cachedTopologyVersionCounter.store(_topCoord->getTopologyVersion().getCounter());
    const auto myState = _topCoord->getMemberState();

-    const bool hasValidConfig = _rsConfig.getConfig(lock).isInitialized() && !myState.removed();
+    const bool hasValidConfig = _rsConfig.unsafePeek().isInitialized() && !myState.removed();
    // Create a hello response for each horizon the server is knowledgeable about.
    for (auto iter = _horizonToTopologyChangePromiseMap.begin();
         iter != _horizonToTopologyChangePromiseMap.end();
@@ -4931,7 +4905,7 @@ void ReplicationCoordinatorImpl::_fulfillTopologyChangePromise(WithLock lock) {
        // requests that are waiting on a horizon that does not exist in the new config. Otherwise,
        // reply with an updated hello response.
        const auto& reverseHostMappings =
-            _rsConfig.getConfig(lock).getMemberAt(_selfIndex).getHorizonReverseHostMappings();
+            _rsConfig.unsafePeek().getMemberAt(_selfIndex).getHorizonReverseHostMappings();
        for (const auto& [sni, promise] : _sniToValidConfigPromiseMap) {
            const auto iter = reverseHostMappings.find(sni);
            if (!sni.empty() && iter == end(reverseHostMappings)) {
@@ -4974,7 +4948,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator(WithLock l
    // because in that case we have already said we cannot accept writes in the hello response
    // and explictly incremented the toplogy version.
    ON_BLOCK_EXIT([&] {
-        if (_rsConfig.getConfig(lk).isInitialized() && !_stepDownPending) {
+        if (_rsConfig.unsafePeek().isInitialized() && !_stepDownPending) {
            _fulfillTopologyChangePromise(lk);
        }
    });
@@ -5151,13 +5125,13 @@ void ReplicationCoordinatorImpl::CatchupState::start_inlock(WithLock lk) {
    _numCatchUpOps = 0;

    // No catchup in single node replica set.
-    if (_repl->_rsConfig.getConfig(lk).getNumMembers() == 1) {
+    if (_repl->_rsConfig.unsafePeek().getNumMembers() == 1) {
        LOGV2(6015304, "Skipping primary catchup since we are the only node in the replica set.");
        abort_inlock(PrimaryCatchUpConclusionReason::kSkipped);
        return;
    }

-    auto catchupTimeout = _repl->_rsConfig.getConfig(lk).getCatchUpTimeoutPeriod();
+    auto catchupTimeout = _repl->_rsConfig.unsafePeek().getCatchUpTimeoutPeriod();

    // When catchUpTimeoutMillis is 0, we skip doing catchup entirely.
    if (catchupTimeout == ReplSetConfig::kCatchUpDisabled) {
@@ -5366,9 +5340,9 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
    // updateConfig() can change terms, so update our term shadow to match.
    _termShadow.store(_topCoord->getTerm());

-    const ReplSetConfig oldConfig = _rsConfig.getConfig(lk);
-    _rsConfig.setConfig(std::make_shared<ReplSetConfig>(newConfig));
-    _protVersion.store(_rsConfig.getConfig(lk).getProtocolVersion());
+    const ReplSetConfig oldConfig = _rsConfig.unsafePeek();
+    _rsConfig.update(std::make_shared<ReplSetConfig>(newConfig));
+    _protVersion.store(_rsConfig.unsafePeek().getProtocolVersion());

    if (!oldConfig.isInitialized()) {
        // We allow the IDWC to be set only once after initial configuration is loaded.
@@ -5465,13 +5439,12 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
    // If the SplitHorizon has changed, reply to all waiting hellos with an error.
    _errorOnPromisesIfHorizonChanged(lk, opCtx, oldConfig, newConfig, _selfIndex, myIndex);

-    LOGV2(21392, "New replica set config in use", "config"_attr = _rsConfig.getConfig(lk).toBSON());
+    LOGV2(21392, "New replica set config in use", "config"_attr = _rsConfig.unsafePeek().toBSON());
    _selfIndex = myIndex;
    if (_selfIndex >= 0) {
        LOGV2(21393,
              "Found self in config",
-              "hostAndPort"_attr =
-                  _rsConfig.getConfig(lk).getMemberAt(_selfIndex).getHostAndPort());
+              "hostAndPort"_attr = _rsConfig.unsafePeek().getMemberAt(_selfIndex).getHostAndPort());
    } else {
        LOGV2(21394, "This node is not a member of the config");
    }
@@ -5585,7 +5558,7 @@ bool ReplicationCoordinatorImpl::buildsIndexes() {
    if (_selfIndex == -1) {
        return true;
    }
-    const MemberConfig& self = _rsConfig.getConfig(lk).getMemberAt(_selfIndex);
+    const MemberConfig& self = _rsConfig.unsafePeek().getMemberAt(_selfIndex);
    return self.shouldBuildIndexes();
}

@@ -5608,7 +5581,7 @@ Status ReplicationCoordinatorImpl::_checkIfWriteConcernCanBeSatisfied_inlock(
            "No replication enabled when checking if write concern can be satisfied");
    }

-    return _rsConfig.getConfig(lk).checkIfWriteConcernCanBeSatisfied(writeConcern);
+    return _rsConfig.unsafePeek().checkIfWriteConcernCanBeSatisfied(writeConcern);
}

Status ReplicationCoordinatorImpl::checkIfCommitQuorumCanBeSatisfied(
@@ -5630,7 +5603,7 @@ Status ReplicationCoordinatorImpl::_checkIfCommitQuorumCanBeSatisfied(
}

WriteConcernOptions ReplicationCoordinatorImpl::getGetLastErrorDefault() {
-    auto rsc = _rsConfig.getConfig();
+    auto rsc = _getReplSetConfig();
    if (rsc.isInitialized()) {
        return rsc.getDefaultWriteConcern();
    }
@@ -5672,7 +5645,7 @@ ReadPreference ReplicationCoordinatorImpl::_getSyncSourceReadPreference(WithLock
        } catch (const DBException& e) {
            fassertFailedWithStatus(3873100, e.toStatus());
        }
-    } else if (_rsConfig.getConfig(lk).getMemberAt(_selfIndex).getNumVotes() > 0) {
+    } else if (_rsConfig.unsafePeek().getMemberAt(_selfIndex).getNumVotes() > 0) {
        // Voting nodes prefer to sync from the primary. A voting node that is initial syncing
        // may have acknowledged writes which are part of the set's write majority; if it then
        // resyncs from a node which does not have those writes, and (before it replicates them
@@ -5683,7 +5656,7 @@ ReadPreference ReplicationCoordinatorImpl::_getSyncSourceReadPreference(WithLock
        }
    }
    if (!parsedSyncSourceFromInitialSync && !memberState.primary() &&
-        !_rsConfig.getConfig(lk).isChainingAllowed() &&
+        !_rsConfig.unsafePeek().isChainingAllowed() &&
        !enableOverrideClusterChainingSetting.load()) {
        // If we are not the primary and chaining is disabled in the config (without overrides), we
        // should only be syncing from the primary.
@@ -5710,7 +5683,7 @@ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOp
    // of other members's state, allowing us to make informed sync source decisions.
    if (newSyncSource.empty() && !oldSyncSource.empty() && _selfIndex >= 0 &&
        !_getMemberState_inlock().primary()) {
-        _restartScheduledHeartbeats_inlock(_rsConfig.getConfig(lk).getReplSetName().toString());
+        _restartScheduledHeartbeats_inlock(_rsConfig.unsafePeek().getReplSetName().toString());
    }

    return newSyncSource;
@@ -5768,7 +5741,7 @@ ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSource(
        // Drop the last batch of message following a change of replica set due to a shard split.
        LOGV2(6394902,
              "Choosing new sync source because we left the replica set due to a shard split.",
-              "currentReplicaSetId"_attr = _rsConfig.getConfig().getReplicaSetId(),
+              "currentReplicaSetId"_attr = _getReplSetConfig().getReplicaSetId(),
              "otherReplicaSetId"_attr = replMetadata.getReplicaSetId());
        return ChangeSyncSourceAction::kStopSyncingAndDropLastBatchIfPresent;
    }
@@ -6055,12 +6028,12 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
    }

    const int candidateIndex = args.getCandidateIndex();
-    if (candidateIndex < 0 || candidateIndex >= _rsConfig.getConfig(lk).getNumMembers()) {
+    if (candidateIndex < 0 || candidateIndex >= _rsConfig.unsafePeek().getNumMembers()) {
        return Status(ErrorCodes::BadValue,
                      str::stream()
                          << "Invalid candidateIndex: " << candidateIndex
                          << ". Must be between 0 and "
-                          << _rsConfig.getConfig(lk).getNumMembers() - 1 << " inclusive");
+                          << _rsConfig.unsafePeek().getNumMembers() - 1 << " inclusive");
    }

    if (_selfIndex == -1) {
@@ -6075,14 +6048,14 @@ Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
        const long long electionTerm = args.getTerm();
        const Date_t lastVoteDate = _replExecutor->now();
        const int electionCandidateMemberId =
-            _rsConfig.getConfig(lk).getMemberAt(candidateIndex).getId().getData();
+            _rsConfig.unsafePeek().getMemberAt(candidateIndex).getId().getData();
        const std::string voteReason = response->getReason();
        const OpTime lastWrittenOpTime = _topCoord->getMyLastWrittenOpTime();
        const OpTime maxWrittenOpTime = _topCoord->latestKnownWrittenOpTime();
        const OpTime lastAppliedOpTime = _topCoord->getMyLastAppliedOpTime();
        const OpTime maxAppliedOpTime = _topCoord->latestKnownAppliedOpTime();
        const double priorityAtElection =
-            _rsConfig.getConfig(lk).getMemberAt(_selfIndex).getPriority();
+            _rsConfig.unsafePeek().getMemberAt(_selfIndex).getPriority();
        ReplicationMetrics::get(getServiceContext())
            .setElectionParticipantMetrics(votedForCandidate,
                                           electionTerm,
@@ -6163,7 +6136,7 @@ bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal() {
}

bool ReplicationCoordinatorImpl::getWriteConcernMajorityShouldJournal_inlock(WithLock lk) const {
-    return _rsConfig.getConfig(lk).getWriteConcernMajorityShouldJournal();
+    return _rsConfig.unsafePeek().getWriteConcernMajorityShouldJournal();
}

namespace {
@@ -6199,7 +6172,7 @@ Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgs

    Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");

-    auto rsc = _rsConfig.getConfig(lk);
+    auto rsc = _rsConfig.unsafePeek();

    std::string replSetName = [&]() {
        if (_settings.shouldAutoInitiate()) {
@@ -6608,7 +6581,7 @@ int64_t ReplicationCoordinatorImpl::_nextRandomInt64_inlock(int64_t limit) {
}

bool ReplicationCoordinatorImpl::setContainsArbiter() const {
-    return _rsConfig.getConfig().containsArbiter();
+    return _getReplSetConfig().containsArbiter();
}

void ReplicationCoordinatorImpl::ReadWriteAbility::setCanAcceptNonLocalWrites(
@@ -6678,7 +6651,7 @@ void ReplicationCoordinatorImpl::_validateDefaultWriteConcernOnShardStartup(With
    // shard node for upgrading or any other reason, sharding initialization happens before
    // config initialization.
    if (_wasCWWCSetOnConfigServerOnStartup && !_wasCWWCSetOnConfigServerOnStartup.value() &&
-        !_rsConfig.getConfig(lk).isImplicitDefaultWriteConcernMajority()) {
+        !_rsConfig.unsafePeek().isImplicitDefaultWriteConcernMajority()) {
        auto msg =
            "Cannot start shard because the implicit default write concern on this shard is "
            "set to {w : 1}, since the number of writable voting members is not strictly more "
@@ -112,6 +112,7 @@
#include "mongo/util/string_map.h"
#include "mongo/util/time_support.h"
#include "mongo/util/uuid.h"
+#include "mongo/util/versioned_value.h"

namespace mongo {

@@ -682,29 +683,6 @@ public:

    boost::optional<UUID> getInitialSyncId(OperationContext* opCtx) override;

-    class SharedReplSetConfig {
-    public:
-        struct Lease {
-            uint64_t version = 0;
-            std::shared_ptr<ReplSetConfig> config;
-        };
-
-        SharedReplSetConfig();
-        Lease renew() const;
-        bool isStale(const Lease& lease) const;
-        ReplSetConfig& getConfig() const;
-        // This must be called while holding a lock on the
-        // ReplicationCoordinatorImpl _mutex. Unlike getConfig(), it does not
-        // provide any locking of its own.
-        ReplSetConfig& getConfig(WithLock lk) const;
-        void setConfig(std::shared_ptr<ReplSetConfig> newConfig);
-
-    private:
-        mutable WriteRarelyRWMutex _rwMutex;
-        Atomic<uint64_t> _version;
-        std::shared_ptr<ReplSetConfig> _current;
-    };
-
private:
    using CallbackFn = executor::TaskExecutor::CallbackFn;

@@ -1826,6 +1804,12 @@ private:
     */
    bool _isCollectionReplicated(OperationContext* opCtx, const NamespaceStringOrUUID& nsOrUUID);

+    /**
+     * Returns the latest configuration without acquiring `_mutex`. Internally, it reads the config
+     * from a thread-local cache. The config is refreshed to the latest if stale.
+     */
+    const ReplSetConfig& _getReplSetConfig() const;
+
    //
    // All member variables are labeled with one of the following codes indicating the
    // synchronization rules for accessing them.
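The `_getReplSetConfig()` declared above is the reader side of the versioned-value scheme this commit moves `_rsConfig` to: each thread keeps a cached snapshot of the config and only touches shared state once that snapshot has gone stale, so steady-state readers no longer need `_mutex`. The sketch below is a minimal, self-contained analogue of that pattern, written only for illustration; it is not MongoDB's `VersionedValue` (which lives in `mongo/util/versioned_value.h` and builds on `WriteRarelyRWMutex`), and every name in it is invented for the example.

// Illustrative analogue of the versioned-snapshot pattern adopted by this commit.
// Not MongoDB's VersionedValue; a deliberately small stand-in for explanation only.
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>

template <typename T>
class VersionedValueSketch {
public:
    // A reader-owned snapshot: a cached shared_ptr plus the version it was taken at.
    struct Snapshot {
        uint64_t version = 0;
        std::shared_ptr<const T> value;
        const T& operator*() const {
            return *value;
        }
        explicit operator bool() const {
            return static_cast<bool>(value);
        }
    };

    // Writers publish a complete new value; readers never observe a partial update.
    void update(std::shared_ptr<const T> newValue) {
        std::lock_guard<std::mutex> lk(_mutex);
        _current = std::move(newValue);
        _version.fetch_add(1, std::memory_order_release);
    }

    // Fast path for readers: only take the mutex when the cached snapshot is stale.
    void refreshSnapshot(Snapshot& snap) const {
        const auto v = _version.load(std::memory_order_acquire);
        if (snap.value && snap.version == v) {
            return;  // Cached copy is still current; no locking needed.
        }
        std::lock_guard<std::mutex> lk(_mutex);
        snap.value = _current;
        snap.version = _version.load(std::memory_order_acquire);
    }

private:
    mutable std::mutex _mutex;
    std::atomic<uint64_t> _version{1};
    std::shared_ptr<const T> _current = std::make_shared<T>();
};

// Usage mirroring the shape of the change: one shared instance, one thread_local snapshot.
VersionedValueSketch<std::string> gConfig;
thread_local VersionedValueSketch<std::string>::Snapshot tlConfig;

int main() {
    gConfig.update(std::make_shared<const std::string>("config v2"));
    gConfig.refreshSnapshot(tlConfig);  // Refreshes because the version changed.
    assert(*tlConfig == "config v2");
    return 0;
}

The trade-off this buys, and the one the diff relies on, is that a reconfig (the writer) pays for exclusive access rarely while ordinary readers normally pay only an atomic version check.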
@@ -1914,7 +1898,7 @@ private:
    // An instance for getting a lease on the current ReplicaSet
    // configuration object, including the information about tag groups that is
    // used to satisfy write concern requests with named gle modes.
-    SharedReplSetConfig _rsConfig;  // (S)
+    mutable VersionedValue<ReplSetConfig, WriteRarelyRWMutex> _rsConfig;  // (S)

    // This member's index position in the current config.
    int _selfIndex;  // (M)
@@ -143,7 +143,7 @@ ReplicationCoordinatorImpl::ElectionState::_startVoteRequester(WithLock lk,
                                                               int primaryIndex) {
    _voteRequester.reset(new VoteRequester);
    return _voteRequester->start(_replExecutor,
-                                _repl->_rsConfig.getConfig(lk),
+                                _repl->_rsConfig.unsafePeek(),
                                 _repl->_selfIndex,
                                 term,
                                 dryRun,
@@ -207,7 +207,7 @@ void ReplicationCoordinatorImpl::ElectionState::start(WithLock lk, StartElection
    }
    _electionDryRunFinishedEvent = dryRunFinishedEvent;

-    invariant(_repl->_rsConfig.getConfig(lk).getMemberAt(_repl->_selfIndex).isElectable());
+    invariant(_repl->_rsConfig.unsafePeek().getMemberAt(_repl->_selfIndex).isElectable());
    const auto lastWrittenOpTime = _repl->_getMyLastWrittenOpTime_inlock();
    const auto lastAppliedOpTime = _repl->_getMyLastAppliedOpTime_inlock();

@@ -300,7 +300,7 @@ void ReplicationCoordinatorImpl::ElectionState::_processDryRunResult(
void ReplicationCoordinatorImpl::ElectionState::_startRealElection(WithLock lk,
                                                                   long long newTerm,
                                                                   StartElectionReasonEnum reason) {
-    const auto& rsConfig = _repl->_rsConfig.getConfig(lk);
+    const auto& rsConfig = _repl->_rsConfig.unsafePeek();
    const auto selfIndex = _repl->_selfIndex;

    const Date_t now = _replExecutor->now();
@@ -128,7 +128,7 @@ using executor::RemoteCommandRequest;

long long ReplicationCoordinatorImpl::_getElectionOffsetUpperBound_inlock(WithLock lk) {
    long long electionTimeout =
-        durationCount<Milliseconds>(_rsConfig.getConfig(lk).getElectionTimeoutPeriod());
+        durationCount<Milliseconds>(_rsConfig.unsafePeek().getElectionTimeoutPeriod());
    return electionTimeout * _externalState->getElectionTimeoutOffsetLimitFraction();
}

@@ -208,7 +208,7 @@ void ReplicationCoordinatorImpl::handleHeartbeatResponse_forTest(BSONObj respons
    {
        stdx::unique_lock<Latch> lk(_mutex);

-        ReplSetConfig rsc = _rsConfig.getConfig(lk);
+        ReplSetConfig rsc = _rsConfig.unsafePeek();
        request.target = rsc.getMemberAt(targetIndex).getHostAndPort();

        StringData replSetName = rsc.getReplSetName();
@@ -279,12 +279,12 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
                "target"_attr = target,
                "response"_attr = resp);

-    if (responseStatus.isOK() && _rsConfig.getConfig(lk).isInitialized() &&
-        _rsConfig.getConfig(lk).getReplSetName() != hbResponse.getReplicaSetName()) {
+    if (responseStatus.isOK() && _rsConfig.unsafePeek().isInitialized() &&
+        _rsConfig.unsafePeek().getReplSetName() != hbResponse.getReplicaSetName()) {
        responseStatus =
            Status(ErrorCodes::InconsistentReplicaSetNames,
                   str::stream() << "replica set names do not match, ours: "
-                                << _rsConfig.getConfig(lk).getReplSetName()
+                                << _rsConfig.unsafePeek().getReplSetName()
                                << "; remote node's: " << hbResponse.getReplicaSetName());
        // Ignore metadata.
        replMetadata = responseStatus;
@@ -296,14 +296,13 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
    // configuration version. A heartbeat reconfiguration would not take place in that case.
    // Additionally, this is where we would stop further processing of the metadata from an
    // unknown replica set.
-    if (replMetadata.isOK() && _rsConfig.getConfig(lk).isInitialized() &&
-        _rsConfig.getConfig(lk).hasReplicaSetId() &&
+    if (replMetadata.isOK() && _rsConfig.unsafePeek().isInitialized() &&
+        _rsConfig.unsafePeek().hasReplicaSetId() &&
        replMetadata.getValue().getReplicaSetId().isSet() &&
-        _rsConfig.getConfig(lk).getReplicaSetId() !=
-            replMetadata.getValue().getReplicaSetId()) {
+        _rsConfig.unsafePeek().getReplicaSetId() != replMetadata.getValue().getReplicaSetId()) {
        responseStatus = Status(ErrorCodes::InvalidReplicaSetConfig,
                                str::stream() << "replica set IDs do not match, ours: "
-                                              << _rsConfig.getConfig(lk).getReplicaSetId()
+                                              << _rsConfig.unsafePeek().getReplicaSetId()
                                              << "; remote node's: "
                                              << replMetadata.getValue().getReplicaSetId());
        // Ignore metadata.
@@ -329,9 +328,9 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(

    // Arbiters are always expected to report null durable optimes (and wall times).
    // If that is not the case here, make sure to correct these times before ingesting them.
-    auto memberInConfig = _rsConfig.getConfig(lk).findMemberByHostAndPort(target);
+    auto memberInConfig = _rsConfig.unsafePeek().findMemberByHostAndPort(target);
    if ((hbResponse.hasState() && hbResponse.getState().arbiter()) ||
-        (_rsConfig.getConfig(lk).isInitialized() && memberInConfig &&
+        (_rsConfig.unsafePeek().isInitialized() && memberInConfig &&
         memberInConfig->isArbiter())) {
        if (hbResponse.hasDurableOpTime() &&
            (!hbResponse.getDurableOpTime().isNull() ||
@@ -403,10 +402,10 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
    auto remoteState = hbStatusResponse.getValue().getState();
    if (remoteState == MemberState::RS_SECONDARY || remoteState == MemberState::RS_RECOVERING ||
        remoteState == MemberState::RS_ROLLBACK) {
-        const auto mem = _rsConfig.getConfig(lk).findMemberByHostAndPort(target);
+        const auto mem = _rsConfig.unsafePeek().findMemberByHostAndPort(target);
        if (mem && mem->isNewlyAdded()) {
            const auto memId = mem->getId();
-            const auto configVersion = _rsConfig.getConfig(lk).getConfigVersionAndTerm();
+            const auto configVersion = _rsConfig.unsafePeek().getConfigVersionAndTerm();
            auto status = _replExecutor->scheduleWork(
                [=, this](const executor::TaskExecutor::CallbackArgs& cbData) {
                    _reconfigToRemoveNewlyAddedField(cbData, memId, configVersion);
@@ -454,7 +453,7 @@ stdx::unique_lock<Latch> ReplicationCoordinatorImpl::_handleHeartbeatResponseAct
    const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
    stdx::unique_lock<Latch> lock) {
    invariant(lock.owns_lock());
-    auto rsc = _rsConfig.getConfig(lock);
+    auto rsc = _rsConfig.unsafePeek();
    switch (action.getAction()) {
        case HeartbeatResponseAction::NoAction:
            // Update the cached member state if different than the current topology member state
@@ -698,7 +697,7 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
    }

    _setConfigState_inlock(kConfigHBReconfiguring);
-    auto rsc = _rsConfig.getConfig(lk);
+    auto rsc = _rsConfig.unsafePeek();
    invariant(!rsc.isInitialized() ||
              rsc.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() ||
              _selfIndex < 0);
@@ -730,7 +729,7 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk,
                       isSplitRecipientConfig](Status status) {
        if (!status.isOK()) {
            stdx::lock_guard<Latch> lg(_mutex);
-            _setConfigState_inlock(!_rsConfig.getConfig(lg).isInitialized()
+            _setConfigState_inlock(!_rsConfig.unsafePeek().isInitialized()
                                       ? kConfigUninitialized
                                       : kConfigSteady);
            return;
@@ -753,7 +752,7 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve
    }

    stdx::unique_lock<Latch> lk(_mutex);
-    auto rsc = _rsConfig.getConfig(lk);
+    auto rsc = _rsConfig.unsafePeek();
    if (!rsc.isInitialized()) {
        // Unlock the lock because isSelf performs network I/O.
        lk.unlock();
@@ -809,7 +808,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore(
                    "config"_attr = newConfig);
    }

-    auto rsc = _rsConfig.getConfig();
+    auto rsc = _getReplSetConfig();

    const auto myIndex = [&]() -> StatusWith<int> {
        // We always check the config when _selfIndex is not valid, in order to be able to
@@ -1012,7 +1011,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish(

    boost::optional<AutoGetRstlForStepUpStepDown> arsd;
    stdx::unique_lock<Latch> lk(_mutex);
-    auto rsc = _rsConfig.getConfig(lk);
+    auto rsc = _rsConfig.unsafePeek();
    if (_shouldStepDownOnReconfig(lk, newConfig, myIndex)) {
        _topCoord->prepareForUnconditionalStepDown();
        lk.unlock();
@@ -1154,7 +1153,7 @@ void ReplicationCoordinatorImpl::_cancelHeartbeats_inlock() {
void ReplicationCoordinatorImpl::restartScheduledHeartbeats_forTest() {
    stdx::unique_lock<Latch> lk(_mutex);
    invariant(getTestCommandsEnabled());
-    _restartScheduledHeartbeats_inlock(_rsConfig.getConfig(lk).getReplSetName().toString());
+    _restartScheduledHeartbeats_inlock(_rsConfig.unsafePeek().getReplSetName().toString());
};

void ReplicationCoordinatorImpl::_restartScheduledHeartbeats_inlock(
@@ -1188,7 +1187,7 @@ void ReplicationCoordinatorImpl::_startHeartbeats_inlock(WithLock lk) {
    const Date_t now = _replExecutor->now();
    _seedList.clear();

-    auto rsc = _rsConfig.getConfig(lk);
+    auto rsc = _rsConfig.unsafePeek();
    for (int i = 0; i < rsc.getNumMembers(); ++i) {
        if (i == _selfIndex) {
            continue;
@@ -1236,7 +1235,7 @@ void ReplicationCoordinatorImpl::_scheduleNextLivenessUpdate_inlock(WithLock lk,
        return;
    }

-    auto nextTimeout = earliestDate + _rsConfig.getConfig(lk).getElectionTimeoutPeriod();
+    auto nextTimeout = earliestDate + _rsConfig.unsafePeek().getElectionTimeoutPeriod();
    LOGV2_DEBUG(21483, 3, "Scheduling next check", "nextTimeout"_attr = nextTimeout);

    // It is possible we will schedule the next timeout in the past.
@@ -1293,7 +1292,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock(With
    const bool wasActive = oldWhen != Date_t();
    auto now = _replExecutor->now();
    const bool doNotReschedule = _inShutdown || !_memberState.secondary() || _selfIndex < 0 ||
-        !_rsConfig.getConfig(lk).getMemberAt(_selfIndex).isElectable();
+        !_rsConfig.unsafePeek().getMemberAt(_selfIndex).isElectable();

    if (doNotReschedule || !wasActive || (now - logThrottleTime) >= Seconds(1)) {
        cancelAndRescheduleLogLevel = 4;
@@ -1311,7 +1310,7 @@ void ReplicationCoordinatorImpl::_cancelAndRescheduleElectionTimeout_inlock(With
        return;

    Milliseconds upperBound = Milliseconds(_getElectionOffsetUpperBound_inlock(lk));
-    auto requestedWhen = now + _rsConfig.getConfig(lk).getElectionTimeoutPeriod();
+    auto requestedWhen = now + _rsConfig.unsafePeek().getElectionTimeoutPeriod();
    invariant(requestedWhen > now);
    Status delayStatus =
        _handleElectionTimeoutCallback.delayUntilWithJitter(requestedWhen, upperBound);
@@ -1411,7 +1410,7 @@ void ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1(WithLock lk,
                4615652,
                0,
                "Starting an election, since we've seen no PRIMARY in election timeout period",
-                "electionTimeoutPeriod"_attr = _rsConfig.getConfig(lk).getElectionTimeoutPeriod());
+                "electionTimeoutPeriod"_attr = _rsConfig.unsafePeek().getElectionTimeoutPeriod());
            break;
        case StartElectionReasonEnum::kPriorityTakeover:
            LOGV2_FOR_ELECTION(4615660, 0, "Starting an election for a priority takeover");
@@ -154,62 +154,6 @@ std::shared_ptr<const repl::HelloResponse> awaitHelloWithNewOpCtx(
    return replCoord->awaitHelloResponse(newOpCtx.get(), horizonParams, topologyVersion, deadline);
}

-TEST_F(ReplCoordTest, LeaseIsntStaleIfConfigHasntChanged) {
-    auto rsConfig = ReplicationCoordinatorImpl::SharedReplSetConfig();
-    auto lease = rsConfig.renew();
-    ASSERT_FALSE(rsConfig.isStale(lease));
-    lease = rsConfig.renew();
-    ASSERT_FALSE(rsConfig.isStale(lease));
-}
-
-TEST_F(ReplCoordTest, LeaseIsStaleIfConfigHasChanged) {
-    auto rsConfig = ReplicationCoordinatorImpl::SharedReplSetConfig();
-    auto lease = rsConfig.renew();
-    rsConfig.setConfig(std::make_shared<ReplSetConfig>(ReplSetConfig()));
-    ASSERT_TRUE(rsConfig.isStale(lease));
-}
-
-TEST_F(ReplCoordTest, LeaseRenewDoesntChangeVersionIfConfigHasntChanged) {
-    auto rsConfig = ReplicationCoordinatorImpl::SharedReplSetConfig();
-    auto lease = rsConfig.renew();
-    ASSERT_EQUALS(1, lease.version);
-    lease = rsConfig.renew();
-    ASSERT_EQUALS(1, lease.version);
-}
-
-TEST_F(ReplCoordTest, LeaseRenewChangesVersionIfConfigHasChanged) {
-    auto rsConfig = ReplicationCoordinatorImpl::SharedReplSetConfig();
-    auto lease = rsConfig.renew();
-    ASSERT_EQUALS(1, lease.version);
-    rsConfig.setConfig(std::make_shared<ReplSetConfig>(ReplSetConfig()));
-    ASSERT_EQUALS(1, lease.version);
-    lease = rsConfig.renew();
-    ASSERT_EQUALS(2, lease.version);
-}
-
-TEST_F(ReplCoordTest, LeaseRenewChangesConfigIfConfigHasChanged) {
-    auto oldConfigObj = BSON("_id"
-                             << "myOldSet"
-                             << "version" << 1 << "protocolVersion" << 1 << "members"
-                             << BSON_ARRAY(BSON("_id" << 0 << "host"
-                                                      << "h1:1")));
-    ReplSetConfig oldConfig = assertMakeRSConfig(oldConfigObj);
-    auto rsConfig = ReplicationCoordinatorImpl::SharedReplSetConfig();
-    rsConfig.setConfig(std::make_shared<ReplSetConfig>(oldConfig));
-    auto lease = rsConfig.renew();
-    ASSERT_EQUALS(oldConfig.getReplicaSetId(), lease.config->getReplicaSetId());
-    auto newConfigObj = BSON("_id"
-                             << "myNewSet"
-                             << "version" << 1 << "protocolVersion" << 1 << "members"
-                             << BSON_ARRAY(BSON("_id" << 0 << "host"
-                                                      << "h1:1")));
-    ReplSetConfig newConfig = assertMakeRSConfig(oldConfigObj);
-    rsConfig.setConfig(std::make_shared<ReplSetConfig>(newConfig));
-    ASSERT_EQUALS(oldConfig.getReplicaSetId(), lease.config->getReplicaSetId());
-    lease = rsConfig.renew();
-    ASSERT_EQUALS(newConfig.getReplicaSetId(), lease.config->getReplicaSetId());
-}
-
TEST_F(ReplCoordTest, IsWritablePrimaryFalseDuringStepdown) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
@@ -49,6 +49,7 @@
#include "mongo/util/assert_util_core.h"
#include "mongo/util/net/cidr.h"
#include "mongo/util/version/releases.h"
+#include "mongo/util/versioned_value.h"

#ifdef _WIN32
#include <winsock2.h>
@@ -119,7 +120,7 @@ struct ServerGlobalParams {
    std::string socket = "/tmp";  // UNIX domain socket directory

    size_t maxConns = DEFAULT_MAX_CONN;  // Maximum number of simultaneous open connections.
-    std::vector<std::variant<CIDR, std::string>> maxConnsOverride;
+    VersionedValue<std::vector<std::variant<CIDR, std::string>>> maxConnsOverride;
    int reservedAdminThreads = 0;

    int unixSocketPermissions = DEFAULT_UNIX_PERMS;  // permissions for the UNIX domain socket
@@ -363,15 +363,18 @@ Status storeServerOptions(const moe::Environment& params) {
    }

    if (params.count("net.maxIncomingConnectionsOverride")) {
+        std::vector<std::variant<CIDR, std::string>> maxConnsOverride;
        auto ranges = params["net.maxIncomingConnectionsOverride"].as<std::vector<std::string>>();
        for (const auto& range : ranges) {
            auto swr = CIDR::parse(range);
            if (!swr.isOK()) {
-                serverGlobalParams.maxConnsOverride.push_back(range);
+                maxConnsOverride.push_back(range);
            } else {
-                serverGlobalParams.maxConnsOverride.push_back(std::move(swr.getValue()));
+                maxConnsOverride.push_back(std::move(swr.getValue()));
            }
        }
+        serverGlobalParams.maxConnsOverride.update(
+            std::make_shared<decltype(maxConnsOverride)>(std::move(maxConnsOverride)));
    }

    if (params.count("net.reservedAdminThreads")) {
@@ -112,6 +112,7 @@ tlEnv.Library(
        'asio/asio_transport_layer.cpp',
        'asio/asio_utils.cpp',
        'proxy_protocol_header_parser.cpp',
        "transport_options.cpp",
        'transport_options.idl',
    ],
    LIBDEPS=[
@@ -125,6 +126,7 @@ tlEnv.Library(
        '$BUILD_DIR/mongo/db/commands/server_status_core',
        '$BUILD_DIR/mongo/db/server_base',
        '$BUILD_DIR/mongo/db/server_feature_flags',
        '$BUILD_DIR/mongo/db/server_options',
        '$BUILD_DIR/mongo/db/stats/counters',
        '$BUILD_DIR/mongo/s/common_s',
        '$BUILD_DIR/mongo/util/concurrency/spin_lock',
@@ -101,7 +101,8 @@ void AsioSessionManager::appendStats(BSONObjBuilder* bob) const {
    // some sessions would have used the non-threaded ServiceExecutorFixed.
    // Currently all sessions are threaded, so this number is redundant.
    appendInt("threaded", sessionCount);
    if (!serverGlobalParams.maxConnsOverride.empty()) {
    auto maxConnsOverride = serverGlobalParams.maxConnsOverride.makeSnapshot();
    if (maxConnsOverride && !maxConnsOverride->empty()) {
        appendInt("limitExempt", serviceExecutorStats.limitExempt.load());
    }
@@ -37,6 +37,7 @@
#include "mongo/db/auth/restriction_environment.h"
#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
@@ -52,6 +53,8 @@
namespace mongo::transport {
namespace {

thread_local decltype(ServerGlobalParams::maxConnsOverride)::Snapshot maxConnsOverride;

/** Some diagnostic data that we will want to log about a Client after its death. */
struct ClientSummary {
    explicit ClientSummary(const Client* c)
@@ -237,8 +240,9 @@ void SessionManagerCommon::startSession(std::shared_ptr<Session> session) {
    invariant(session);
    IngressHandshakeMetrics::get(*session).onSessionStarted(_svcCtx->getTickSource());

    serverGlobalParams.maxConnsOverride.refreshSnapshot(maxConnsOverride);
    const bool isPrivilegedSession =
        session->shouldOverrideMaxConns(serverGlobalParams.maxConnsOverride);
        maxConnsOverride && session->shouldOverrideMaxConns(*maxConnsOverride);
    const bool verbose = !quiet();

    auto service = _svcCtx->getService();
@@ -0,0 +1,103 @@
/**
 * Copyright (C) 2024-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#include <string>
#include <variant>
#include <vector>

#include "mongo/base/status.h"
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/json.h"
#include "mongo/db/server_options.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/transport/transport_options_gen.h"
#include "mongo/util/net/cidr.h"
#include "mongo/util/overloaded_visitor.h"

namespace mongo::transport {
namespace {
auto parseMaxIncomingConnectionsParameters(const BSONObj& obj) {
    IDLParserContext ctx("maxIncomingConnections");
    const auto params = MaxIncomingConnectionsParameters::parse(ctx, obj);
    std::vector<std::variant<CIDR, std::string>> output;
    for (const auto& range : params.getRanges()) {
        auto swr = CIDR::parse(range);
        if (!swr.isOK()) {
            output.push_back(range.toString());
        } else {
            output.push_back(std::move(swr.getValue()));
        }
    }
    return output;
}

void updateMaxIncomingConnectionsOverride(BSONObj obj) {
    auto maxConnsOverride = parseMaxIncomingConnectionsParameters(obj);
    serverGlobalParams.maxConnsOverride.update(
        std::make_shared<decltype(maxConnsOverride)>(std::move(maxConnsOverride)));
}
}  // namespace

void MaxIncomingConnectionsOverrideServerParameter::append(OperationContext*,
                                                           BSONObjBuilder* bob,
                                                           StringData name,
                                                           const boost::optional<TenantId>&) {
    BSONObjBuilder subBob(bob->subobjStart(name));
    BSONArrayBuilder subArray(subBob.subarrayStart("ranges"_sd));
    auto snapshot = serverGlobalParams.maxConnsOverride.makeSnapshot();
    if (!snapshot)
        return;

    for (const auto& range : *snapshot) {
        subArray.append(std::visit(OverloadedVisitor{
                                       [](const CIDR& arg) { return arg.toString(); },
                                       [](const std::string& arg) { return arg; },
                                   },
                                   range));
    }
}

Status MaxIncomingConnectionsOverrideServerParameter::set(const BSONElement& value,
                                                          const boost::optional<TenantId>&) try {
    updateMaxIncomingConnectionsOverride(value.Obj());
    return Status::OK();
} catch (const AssertionException& e) {
    return e.toStatus();
}

Status MaxIncomingConnectionsOverrideServerParameter::setFromString(
    StringData str, const boost::optional<TenantId>&) try {
    updateMaxIncomingConnectionsOverride(fromjson(str));
    return Status::OK();
} catch (const AssertionException& e) {
    return e.toStatus();
}

}  // namespace mongo::transport
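For illustration only, not part of this commit: a minimal C++ sketch of how the string-based runtime setter defined above could be driven, assuming the generated `MaxIncomingConnectionsOverrideServerParameter` class from transport_options_gen.h is at hand. The `exampleSetOverrideFromString` helper name is made up for this example.

// Illustrative sketch, not part of the diff; the helper below is hypothetical.
#include <boost/optional.hpp>

#include "mongo/base/status.h"
#include "mongo/transport/transport_options_gen.h"

namespace mongo::transport {

// Routes a JSON payload through setFromString(), which parses it as a
// MaxIncomingConnectionsParameters document and publishes the new exemption list.
// "10.0.0.0/8" parses as a CIDR entry; "/tmp/mongodb.sock" is kept as a plain string.
Status exampleSetOverrideFromString(MaxIncomingConnectionsOverrideServerParameter& param) {
    return param.setFromString(R"({ranges: ["10.0.0.0/8", "/tmp/mongodb.sock"]})", boost::none);
}

}  // namespace mongo::transport

At startup, the same exemption list can instead come from the `net.maxIncomingConnectionsOverride` option handled in `storeServerOptions` above.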
@@ -29,6 +29,18 @@
global:
    cpp_namespace: "mongo::transport"

imports:
    - "mongo/db/basic_types.idl"

structs:
    MaxIncomingConnectionsParameters:
        description: "Represents CIDR ranges that are exempt from the maxIncomingConnections limit"
        strict: true
        fields:
            ranges:
                description: 'An array of exempted CIDR ranges'
                type: array<string>

server_parameters:
    # Options to configure inbound TFO connections.
    tcpFastOpenServer:
@@ -57,3 +69,12 @@ server_parameters:
        cpp_vartype: bool
        default: true
        redact: false

    maxIncomingConnectionsOverride:
        description: 'CIDR ranges that are exempt from the maxIncomingConnections limit'
        set_at: runtime  # Use the configuration option for setting this at startup.
        cpp_class:
            name: "MaxIncomingConnectionsOverrideServerParameter"
            # Expects the payload to be an instance of `MaxIncomingConnectionsParameters`.
            override_set: true
        redact: false
@@ -337,6 +337,16 @@ env.CppUnitTest(
    ],
)

env.CppUnitTest(
    target="versioned_value_test",
    source=[
        "versioned_value_test.cpp",
    ],
    LIBDEPS=[
        "$BUILD_DIR/mongo/base",
    ],
)

tcmallocAttrs = None
for impl in [
    {
@@ -0,0 +1,180 @@
/**
 * Copyright (C) 2024-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#pragma once

#include <memory>
#include <shared_mutex>

#include "mongo/platform/atomic.h"
#include "mongo/platform/compiler.h"
#include "mongo/platform/rwmutex.h"
#include "mongo/stdx/mutex.h"

namespace mongo {
namespace versioned_value_detail {
/**
 * The default policy that works for the most commonly used mutex types that we have anticipated.
 */
struct DefaultLockPolicy {
    auto makeSharedLock(stdx::mutex& m) const {
        return stdx::lock_guard(m);
    }

    auto makeExclusiveLock(stdx::mutex& m) const {
        return stdx::lock_guard(m);
    }

    auto makeSharedLock(RWMutex& m) const {
        return std::shared_lock(m);  // NOLINT
    }

    auto makeExclusiveLock(RWMutex& m) const {
        return stdx::lock_guard(m);
    }

    auto makeSharedLock(WriteRarelyRWMutex& m) const {
        return m.readLock();
    }

    auto makeExclusiveLock(WriteRarelyRWMutex& m) const {
        return m.writeLock();
    }
};
}  // namespace versioned_value_detail

/**
 * The ideal synchronization primitive for values that are accessed frequently, but updated rarely.
 * So long as the value remains unchanged, readers can hold on to their snapshots and access the
 * value without acquiring any locks (see `Snapshot`). Readers must make a new snapshot once their
 * current snapshot gets stale -- i.e. once `isCurrent(mySnapshot)` returns `false`.
 *
 * When updates to the value are synchronized via an external mutex, users can call into
 * `unsafePeek` to read the latest configuration without holding onto a snapshot.
 *
 * You can also bring your own `MutexType` for synchronizing reads and writes. You may need to
 * define a new `LockPolicy` if your custom `MutexType` is not covered by the default policy above.
 */
template <typename ValueType,
          typename MutexType = stdx::mutex,
          typename LockPolicy = versioned_value_detail::DefaultLockPolicy>
class VersionedValue {
public:
    using VersionType = uint64_t;

    /**
     * Holds a consistent snapshot of the versioned value. Snapshots remain valid regardless of
     * future updates to the versioned value, and can be renewed to track the most recent version.
     */
    class Snapshot {
    public:
        /**
         * Makes an empty snapshot -- empty snapshots are both stale and invalid (i.e. hold a null
         * reference and cannot be dereferenced).
         */
        Snapshot() = default;

        Snapshot(VersionType version, std::shared_ptr<ValueType> value)
            : _version(version), _value(std::move(value)) {}

        VersionType version() const noexcept {
            return _version;
        }

        const ValueType& operator*() const noexcept {
            invariant(!!_value, "Dereferencing an uninitialized snapshot!");
            return *_value;
        }

        const ValueType* operator->() const noexcept {
            return _value.get();
        }

        explicit operator bool() const noexcept {
            return !!_value;
        }

    private:
        VersionType _version = 0;
        std::shared_ptr<ValueType> _value;
    };

    VersionedValue() = default;

    /**
     * Note that the initial version must always be greater than zero so that we can consider
     * default constructed snapshots as stale.
     */
    explicit VersionedValue(std::shared_ptr<ValueType> initialValue)
        : _version(1), _current(std::move(initialValue)) {}

    bool isCurrent(const Snapshot& snapshot) const noexcept {
        return _version.load() == snapshot.version();
    }

    MONGO_COMPILER_NOINLINE Snapshot makeSnapshot() const {
        auto lk = _lockPolicy.makeSharedLock(_mutex);
        return {_version.load(), _current};
    }

    void update(std::shared_ptr<ValueType> newValue) {
        auto lk = _lockPolicy.makeExclusiveLock(_mutex);
        _version.fetchAndAdd(1);
        _current = std::move(newValue);
    }

    void refreshSnapshot(Snapshot& snapshot) const {
        if (MONGO_unlikely(!isCurrent(snapshot))) {
            snapshot = makeSnapshot();
        }
    }

    /**
     * Peek at the current value without using any synchronization. This is only thread-safe if the
     * caller utilizes other means (e.g. a higher level lock) to block other threads from accessing
     * the underlying value (i.e. `_current`). Always prefer using snapshots to access the value,
     * unless there are clear performance gains from skipping synchronization.
     */
    const ValueType& unsafePeek() const {
        invariant(_current, "Attempted to peek at uninitialized value!");
        return *_current;
    }

    const std::shared_ptr<ValueType>& getValue_forTest() const {
        return _current;
    }

private:
    MONGO_COMPILER_NO_UNIQUE_ADDRESS LockPolicy _lockPolicy{};
    mutable MutexType _mutex;  // NOLINT
    Atomic<VersionType> _version{0};
    std::shared_ptr<ValueType> _current;
};

}  // namespace mongo
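For illustration only, not part of this commit: a minimal sketch of the reader/writer pattern `VersionedValue` is designed for, mirroring the `thread_local` snapshot usage in the session manager hunk above. The `gConfig`, `cachedConfig`, `publish`, and `readPath` names are made up for this example.

// Illustrative sketch, not part of the diff; all names here are hypothetical.
#include <memory>
#include <string>

#include "mongo/util/versioned_value.h"

namespace {
// A rarely-updated value shared across threads, plus a per-thread cached snapshot.
mongo::VersionedValue<std::string> gConfig;
thread_local decltype(gConfig)::Snapshot cachedConfig;
}  // namespace

void publish(std::string next) {
    // Bumps the version and swaps in the new value under the exclusive lock.
    gConfig.update(std::make_shared<std::string>(std::move(next)));
}

void readPath() {
    // Cheap version check; only takes a lock when the cached snapshot has gone stale.
    gConfig.refreshSnapshot(cachedConfig);
    if (cachedConfig) {
        // Lock-free access; the snapshot stays valid even if publish() runs concurrently.
        const std::string& current = *cachedConfig;
        (void)current;
    }
}

Writers pay for an exclusive lock and a version bump on every update(), while steady-state readers only perform the atomic version comparison inside refreshSnapshot().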
@@ -0,0 +1,101 @@
/**
 * Copyright (C) 2024-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#include "mongo/util/versioned_value.h"

#include <memory>

#include "mongo/platform/atomic.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"

namespace mongo {
namespace {

class VersionedValueTest : public unittest::Test {
public:
    VersionedValueTest() {
        update();
    }

    const auto& value() const {
        return _value;
    }

    void update() {
        _value.update(std::make_shared<int>(_counter++));
    }

private:
    int _counter = 0;
    VersionedValue<int> _value;
};

TEST_F(VersionedValueTest, SnapshotIsNotStaleIfValueHasNotChanged) {
    auto snapshot = value().makeSnapshot();
    ASSERT_TRUE(value().isCurrent(snapshot));
}

TEST_F(VersionedValueTest, SnapshotIsStaleIfValueHasChanged) {
    auto snapshot = value().makeSnapshot();
    update();
    ASSERT_FALSE(value().isCurrent(snapshot));
}

TEST_F(VersionedValueTest, MakeSnapshotDoesNotChangeVersionIfValueHasNotChanged) {
    auto snapshot1 = value().makeSnapshot();
    auto snapshot2 = value().makeSnapshot();

    ASSERT_EQ(snapshot1.version(), snapshot2.version());
    ASSERT_EQ(*snapshot1, *snapshot2);
}

TEST_F(VersionedValueTest, MakeSnapshotRetrievesNewVersionAfterUpdate) {
    auto snapshot1 = value().makeSnapshot();
    update();
    auto snapshot2 = value().makeSnapshot();
    ASSERT_EQ(snapshot2.version(), snapshot1.version() + 1);
    ASSERT_EQ(*snapshot2, *snapshot1 + 1);
}

TEST_F(VersionedValueTest, SnapshotRemainsValidAfterUpdate) {
    auto snapshot = value().makeSnapshot();
    const auto& valuePtr = value().getValue_forTest();
    ASSERT_EQ(valuePtr.use_count(), 2);
    update();
    ASSERT_EQ(valuePtr.use_count(), 1);
}

DEATH_TEST_F(VersionedValueTest, CannotDereferenceUninitializedValue, "invariant") {
    VersionedValue<bool> value;
    *value.makeSnapshot();
}

}  // namespace
}  // namespace mongo