mirror of https://github.com/mongodb/mongo
SERVER-114107 Revert SERVER-113363 (#44152)
GitOrigin-RevId: e17d35464e74e1066aad3d81e34b48ab7d7d2743
This commit is contained in:
parent: 80b8487086
commit: 80d797cf8e
@@ -1127,7 +1127,6 @@ replication.replication_coordinator:
   - src/mongo/db/command_can_run_here*
   - src/mongo/db/repl/*always_allow_non_local_writes*
   - src/mongo/db/repl/*isself*
-  - src/mongo/db/repl/data_with_lock_free_reads*
   - src/mongo/db/repl/replication_waiter_list_bm.cpp
   - src/mongo/db/modules/atlas/src/disagg_storage/replication_coordinator*
@@ -1280,13 +1280,6 @@ mongo_cc_library(
     ],
 )
 
-mongo_cc_library(
-    name = "data_with_lock_free_reads",
-    hdrs = [
-        "data_with_lock_free_reads.h",
-    ],
-)
-
 mongo_cc_library(
     name = "repl_coordinator_impl",
     srcs = [
@@ -1304,7 +1297,6 @@ mongo_cc_library(
     deps = [
         "auto_get_rstl_for_stepup_stepdown",
         "data_replicator_external_state_impl",
-        "data_with_lock_free_reads",
         "delayable_timeout_callback",
         "intent_registry",
         "oplog_visibility_manager",
@@ -1330,7 +1322,6 @@ mongo_cc_library(
         "//src/mongo/db/local_catalog:collection_catalog",
         "//src/mongo/db/local_catalog:local_oplog_info",
         "//src/mongo/db/local_catalog/lock_manager",
-        "//src/mongo/db/repl/clang_checked",
         "//src/mongo/db/repl/initial_sync:initial_syncer",
         "//src/mongo/db/repl/split_horizon",
         "//src/mongo/db/session:kill_sessions_local",
@@ -1811,17 +1802,6 @@ mongo_cc_unit_test(
     ],
 )
 
-mongo_cc_unit_test(
-    name = "data_with_lock_free_reads_test",
-    srcs = [
-        "data_with_lock_free_reads_test.cpp",
-    ],
-    tags = ["mongo_unittest_seventh_group"],
-    deps = [
-        ":data_with_lock_free_reads",
-    ],
-)
-
 mongo_cc_unit_test(
     name = "oplog_application_test",
     srcs = [
src/mongo/db/repl/data_with_lock_free_reads.h (deleted):

@@ -1,94 +0,0 @@
-/**
- * Copyright (C) 2025-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/platform/compiler.h"
-#include "mongo/stdx/new.h"
-#include "mongo/util/concurrency/tsan_ignore.h"
-#include "mongo/util/concurrency/with_lock.h"
-
-namespace mongo {
-
-namespace repl {
-
-/**
- * Provides lock-free reads for arbitrarily sized values, at the expense of
- * keeping two copies at all times. Writes are not lock-free, and writers
- * need to synchronize before updating the values.
- *
- * Under the hood, this type maintains two copies of the data. Readers will
- * always read from the active copy, where read-after-write data races are
- * avoided through acquire/release semantics. A generation number ensures
- * that readers can detect such races in the presence of many writers.
- *
- * Prefer using other synchronization primitives over this one when possible.
- */
-template <typename DatumT>
-class DataWithLockFreeReads {
-public:
-    using datum_type = DatumT;
-
-    // Ignore data races in certain functions when running with TSAN. In
-    // particular, ignore concurrent access to _buffers[x] because it is
-    // protected by WithLock on write and by the generation counter on read.
-
-    // store() requires the caller to hold the lock that protects the
-    // uncached value to avoid races.
-    MONGO_TSAN_IGNORE
-    void store(WithLock lk, const datum_type& datum) {
-        auto curGen = _generation.load();
-        auto nextGen = curGen + 1;
-        _buffers[nextGen & 1] = datum;
-        invariant(_generation.compareAndSwap(&curGen, nextGen));
-    }
-
-    MONGO_TSAN_IGNORE
-    datum_type load() const {
-        while (true) {
-            // Get the counter, use it to index buffers and perform a read, and
-            // then read the counter again. If the counter hasn't moved by more
-            // than 1, then the value that you just read was not torn.
-            auto initialGen = _generation.loadAcquire();
-            datum_type result = _buffers[initialGen & 1];
-            auto finalGen = _generation.loadAcquire();
-            if (MONGO_likely(finalGen == initialGen)) {
-                return result;
-            }
-        }
-    }
-
-private:
-    alignas(stdx::hardware_destructive_interference_size) datum_type _buffers[2];
-
-    alignas(stdx::hardware_destructive_interference_size) mutable AtomicWord<uint64_t> _generation{
-        0};
-};
-} // namespace repl
-} // namespace mongo
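The deleted header implements a seqlock-style primitive. For context, a minimal standalone sketch of the same double-buffer/generation protocol, written against plain std::atomic rather than MongoDB's AtomicWord and WithLock (SeqLockBox and every name below are illustrative, not from the tree):

    #include <atomic>
    #include <cstdint>

    // Double buffer + generation counter, as in DataWithLockFreeReads above.
    // Writers must serialize externally (the analogue of taking WithLock);
    // readers never block.
    template <typename T>
    class SeqLockBox {
    public:
        // Caller holds the writer-side mutex, mirroring store(WithLock, ...).
        void store(const T& value) {
            uint64_t gen = _generation.load(std::memory_order_relaxed);
            _buffers[(gen + 1) & 1] = value;  // write the inactive copy
            // Publish: a reader that observes gen + 1 also observes the copy.
            _generation.store(gen + 1, std::memory_order_release);
        }

        T load() const {
            while (true) {
                uint64_t before = _generation.load(std::memory_order_acquire);
                T result = _buffers[before & 1];
                uint64_t after = _generation.load(std::memory_order_acquire);
                if (before == after) {
                    return result;  // no store was published mid-read; not torn
                }
                // A writer advanced the generation while we read; retry.
            }
        }

    private:
        T _buffers[2];
        mutable std::atomic<uint64_t> _generation{0};
    };

The read of _buffers[before & 1] can still race with a writer that has lapped a slow reader, which is exactly why the original annotates store() and load() with MONGO_TSAN_IGNORE and only returns a result once the generation is seen unchanged.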
src/mongo/db/repl/data_with_lock_free_reads_test.cpp (deleted):

@@ -1,126 +0,0 @@
-/**
- * Copyright (C) 2025-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/repl/data_with_lock_free_reads.h"
-
-#include "mongo/unittest/unittest.h"
-#include "mongo/util/concurrency/with_lock.h"
-
-#include <random>
-
-#include <boost/functional/hash.hpp>
-
-namespace mongo {
-namespace repl {
-
-TEST(DataWithLockFreeReadsTest, ShouldStoreAndLoad) {
-    DataWithLockFreeReads<int> dc;
-    dc.store(WithLock::withoutLock(), 1);
-    ASSERT_EQ(dc.load(), 1);
-    dc.store(WithLock::withoutLock(), 2);
-    ASSERT_EQ(dc.load(), 2);
-    dc.store(WithLock::withoutLock(), 3);
-    dc.store(WithLock::withoutLock(), 4);
-    dc.store(WithLock::withoutLock(), 5);
-    ASSERT_EQ(dc.load(), 5);
-}
-
-struct alignas(128) Payload {
-public:
-    explicit Payload() {
-        std::random_device rd;
-        std::mt19937_64 gen(rd());
-        std::uniform_int_distribution<uint64_t> dist;
-        for (size_t i = 0; i < 16; ++i) {
-            _data[i] = dist(gen);
-        }
-    }
-
-    bool operator==(const Payload& other) const {
-        return std::memcmp(_data, other._data, sizeof(_data)) == 0;
-    }
-
-    struct Hash {
-        std::size_t operator()(const Payload& payload) const {
-            std::size_t seed = 0;
-            for (int i = 0; i < 16; ++i) {
-                boost::hash_combine(seed, payload._data[i]);
-            }
-            return seed;
-        }
-    };
-
-private:
-    uint64_t _data[16];
-};
-static_assert(sizeof(Payload) == 128);
-
-TEST(DataWithLockFreeReadsTest, ShouldNotSeeTornReads) {
-    const int numPayloads = 64;
-    const int numReaders = 8;
-    const int totalReads = 2 << 16;
-    AtomicWord<int> loaded{0};
-    AtomicWord<bool> stopWrites{false};
-    stdx::mutex mu;
-    auto payloads = std::array<Payload, numPayloads>();
-    stdx::unordered_set<Payload, Payload::Hash> payloadsLookup;
-    for (size_t i = 0; i < payloads.size(); ++i) {
-        payloads[i] = Payload();
-        payloadsLookup.insert(payloads[i]);
-    }
-    DataWithLockFreeReads<Payload*> dc;
-    {
-        stdx::lock_guard lk(mu);
-        dc.store(lk, &payloads[0]);
-    }
-    std::vector<stdx::thread> readers;
-    for (int i = 0; i < numReaders; ++i) {
-        readers.emplace_back([&]() {
-            while (loaded.fetchAndAdd(1) < totalReads) {
-                auto p = dc.load();
-                ASSERT_TRUE(payloadsLookup.contains(*p));
-            }
-        });
-    }
-    stdx::thread writer([&]() {
-        while (!stopWrites.load()) {
-            for (size_t i = 0; i < payloads.size(); ++i) {
-                stdx::lock_guard lk(mu);
-                dc.store(lk, &payloads[i]);
-            }
-        }
-    });
-    for (auto& thread : readers) {
-        thread.join();
-    }
-    stopWrites.store(true);
-    writer.join();
-}
-} // namespace repl
-} // namespace mongo
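A condensed standalone analogue of the ShouldNotSeeTornReads test above, reusing the SeqLockBox sketch shown after the deleted header; it swaps the unordered_set membership check for a self-checking payload (all sixteen words equal), so a torn read is directly detectable. All names are illustrative:

    #include <atomic>
    #include <cstdint>
    #include <cstdlib>
    #include <mutex>
    #include <thread>
    #include <vector>

    template <typename T>
    class SeqLockBox {  // as sketched after the deleted header above
    public:
        void store(const T& value) {  // caller holds writerMutex
            uint64_t gen = _generation.load(std::memory_order_relaxed);
            _buffers[(gen + 1) & 1] = value;
            _generation.store(gen + 1, std::memory_order_release);
        }
        T load() const {
            while (true) {
                uint64_t before = _generation.load(std::memory_order_acquire);
                T result = _buffers[before & 1];
                if (_generation.load(std::memory_order_acquire) == before)
                    return result;
            }
        }

    private:
        T _buffers[2];
        mutable std::atomic<uint64_t> _generation{0};
    };

    // 128-byte payload whose words must all match; a torn read is then
    // detectable without the hash-set lookup used by the original test.
    struct Payload {
        uint64_t words[16] = {};
        explicit Payload(uint64_t v = 0) {
            for (auto& w : words)
                w = v;
        }
        bool intact() const {
            for (auto w : words)
                if (w != words[0])
                    return false;
            return true;
        }
    };

    int main() {
        SeqLockBox<Payload> box;
        std::mutex writerMutex;
        box.store(Payload(0));

        std::atomic<bool> stopWrites{false};
        std::vector<std::thread> readers;
        for (int i = 0; i < 8; ++i) {
            readers.emplace_back([&] {
                for (int n = 0; n < (1 << 16); ++n) {
                    if (!box.load().intact())
                        std::abort();  // a torn read escaped the generation check
                }
            });
        }
        std::thread writer([&] {
            for (uint64_t v = 1; !stopWrites.load(); ++v) {
                std::lock_guard<std::mutex> lk(writerMutex);
                box.store(Payload(v));
            }
        });
        // Same shutdown order as the original test: drain the readers first,
        // then stop the writer.
        for (auto& t : readers)
            t.join();
        stopWrites.store(true);
        writer.join();
    }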
src/mongo/db/repl/replication_coordinator_impl.cpp:

@@ -67,7 +67,6 @@
 #include "mongo/db/read_write_concern_defaults.h"
 #include "mongo/db/repl/always_allow_non_local_writes.h"
 #include "mongo/db/repl/check_quorum_for_config_change.h"
-#include "mongo/db/repl/clang_checked/mutex.h"
 #include "mongo/db/repl/collection_utils.h"
 #include "mongo/db/repl/data_replicator_external_state.h"
 #include "mongo/db/repl/data_replicator_external_state_initial_sync.h"
@@ -453,6 +452,7 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
       _rsConfigState(kConfigPreStart),
       _rsConfig(std::make_shared<ReplSetConfig>()),  // Initialize with empty configuration.
       _selfIndex(-1),
+      _sleptLastElection(false),
       _readWriteAbility(std::make_unique<ReadWriteAbility>(!settings.isReplSet())),
       _replicationProcess(replicationProcess),
       _storage(storage),
@@ -477,21 +477,6 @@ ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
 
     invariant(_service);
 
-    {
-        clang_checked::lock_guard lk(_mutex);
-        _memberStateCached.store(lk, _memberState);
-        _oplogSyncStateCached.store(lk, _oplogSyncState);
-
-        auto o0 = OpTimeAndWallTime(OpTime(), Date_t::min());
-        _myLastAppliedOpTimeAndWallTimeCached.store(lk, o0);
-        auto o1 = OpTimeAndWallTime(OpTime(), Date_t::min());
-        _myLastCommittedOpTimeAndWallTimeCached.store(lk, o1);
-        auto o2 = OpTimeAndWallTime(OpTime(), Date_t::min());
-        _myLastDurableOpTimeAndWallTimeCached.store(lk, o2);
-        auto o3 = OpTimeAndWallTime(OpTime(), Date_t::min());
-        _myLastWrittenOpTimeAndWallTimeCached.store(lk, o3);
-    }
-
     if (!_settings.isReplSet()) {
         return;
     }
@@ -1273,7 +1258,8 @@ const ReplSettings& ReplicationCoordinatorImpl::getSettings() const {
 }
 
 MemberState ReplicationCoordinatorImpl::getMemberState() const {
-    return _memberStateCached.load();
+    stdx::lock_guard lk(_mutex);
+    return _getMemberState(lk);
 }
 
 std::vector<MemberData> ReplicationCoordinatorImpl::getMemberData() const {
@@ -1371,7 +1357,8 @@ Status ReplicationCoordinatorImpl::_setFollowerMode(OperationContext* opCtx,
 }
 
 ReplicationCoordinator::OplogSyncState ReplicationCoordinatorImpl::getOplogSyncState() {
-    return _oplogSyncStateCached.load();
+    stdx::lock_guard lk(_mutex);
+    return _oplogSyncState;
 }
 
 void ReplicationCoordinatorImpl::signalWriterDrainComplete(OperationContext* opCtx,
@@ -1390,7 +1377,6 @@ void ReplicationCoordinatorImpl::signalWriterDrainComplete(OperationContext* opC
 
     // Update state and signal the applier's buffer to enter drain mode.
     _oplogSyncState = OplogSyncState::ApplierDraining;
-    _oplogSyncStateCached.store(lk, _oplogSyncState);
     _externalState->onWriterDrainComplete(opCtx);
 }
 
@@ -1462,7 +1448,6 @@ void ReplicationCoordinatorImpl::signalApplierDrainComplete(OperationContext* op
         return;
     }
     _oplogSyncState = OplogSyncState::Stopped;
-    _oplogSyncStateCached.store(lk, _oplogSyncState);
     _externalState->onApplierDrainComplete(opCtx);
 
     invariant(_getMemberState(lk).primary());
@@ -1679,7 +1664,6 @@ void ReplicationCoordinatorImpl::_setMyLastWrittenOpTimeAndWallTime(
 
     _topCoord->setMyLastWrittenOpTimeAndWallTime(
         opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed);
-    _myLastWrittenOpTimeAndWallTimeCached.store(lk, opTimeAndWallTime);
 
     // Signal anyone waiting on optime changes.
     _lastWrittenOpTimeWaiterList.setValueIf(
@@ -1703,8 +1687,6 @@ void ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTime(
     // transaction, which may be delayed, but this should be fine.
     _topCoord->setMyLastDurableOpTimeAndWallTime(
         opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed);
-    _myLastDurableOpTimeAndWallTimeCached.store(lk, opTimeAndWallTime);
-
     // If we are using durable times to calculate the commit level, update it now.
     if (_rsConfig.unsafePeek().getWriteConcernMajorityShouldJournal()) {
         _updateLastCommittedOpTimeAndWallTime(lk);
@@ -1754,18 +1736,12 @@ bool ReplicationCoordinatorImpl::_setMyLastDurableOpTimeAndWallTimeForward(
 }
 
 OpTime ReplicationCoordinatorImpl::getMyLastWrittenOpTime() const {
-    return _myLastWrittenOpTimeAndWallTimeCached.load().opTime;
+    stdx::lock_guard lock(_mutex);
+    return _getMyLastWrittenOpTime(lock);
 }
 
 OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastWrittenOpTimeAndWallTime(
     bool rollbackSafe) const {
-    // If !rollbackSafe, then we access only 1 member, so we don't need the
-    // lock.
-    if (!rollbackSafe) {
-        return _myLastWrittenOpTimeAndWallTimeCached.load();
-    }
-    // Otherwise, we must take the lock since we might touch both _memberState
-    // and _lastWritten.
     stdx::lock_guard lock(_mutex);
     if (rollbackSafe && _getMemberState(lock).rollback()) {
         return {};
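The comments deleted in this hunk record the one genuine limitation of the cached reads: each DataWithLockFreeReads value is individually torn-free, but two caches are not mutually consistent with each other. A hypothetical interleaving (condensed, illustrative member names) showing why the rollbackSafe path always took _mutex:

    writer (under _mutex)                    lock-free reader
    ---------------------                    ----------------
                                             state = stateCached.load();        // SECONDARY
    stateCached.store(lk, ROLLBACK);
    lastWrittenCached.store(lk, {});
                                             opTime = lastWrittenCached.load();  // already cleared

The reader pairs a pre-rollback state with a post-rollback opTime, a combination no reader holding _mutex could ever observe; hence only the single-member (!rollbackSafe) read could use the cache.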
@@ -1774,19 +1750,23 @@ OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastWrittenOpTimeAndWallTime(
 }
 
 OpTime ReplicationCoordinatorImpl::getMyLastAppliedOpTime() const {
-    return getMyLastAppliedOpTimeAndWallTime().opTime;
+    stdx::lock_guard lock(_mutex);
+    return _getMyLastAppliedOpTime(lock);
 }
 
 OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastAppliedOpTimeAndWallTime() const {
-    return _myLastAppliedOpTimeAndWallTimeCached.load();
+    stdx::lock_guard lock(_mutex);
+    return _getMyLastAppliedOpTimeAndWallTime(lock);
 }
 
 OpTimeAndWallTime ReplicationCoordinatorImpl::getMyLastDurableOpTimeAndWallTime() const {
-    return _myLastDurableOpTimeAndWallTimeCached.load();
+    stdx::lock_guard lock(_mutex);
+    return _getMyLastDurableOpTimeAndWallTime(lock);
 }
 
 OpTime ReplicationCoordinatorImpl::getMyLastDurableOpTime() const {
-    return getMyLastDurableOpTimeAndWallTime().opTime;
+    stdx::lock_guard lock(_mutex);
+    return _getMyLastDurableOpTime(lock);
 }
 
 Status ReplicationCoordinatorImpl::_validateReadConcern(OperationContext* opCtx,
@@ -4551,11 +4531,10 @@ boost::optional<Timestamp> ReplicationCoordinatorImpl::getRecoveryTimestamp() {
     return _storage->getRecoveryTimestamp(getServiceContext());
 }
 
-void ReplicationCoordinatorImpl::_enterDrainMode(WithLock lk) {
+void ReplicationCoordinatorImpl::_enterDrainMode(WithLock) {
     _oplogSyncState = feature_flags::gReduceMajorityWriteLatency.isEnabled()
         ? OplogSyncState::WriterDraining
         : OplogSyncState::ApplierDraining;
-    _oplogSyncStateCached.store(lk, _oplogSyncState);
     _externalState->stopProducer();
 }
 
@@ -5028,8 +5007,6 @@ ChangeSyncSourceAction ReplicationCoordinatorImpl::shouldChangeSyncSourceOnError
 void ReplicationCoordinatorImpl::_updateLastCommittedOpTimeAndWallTime(WithLock lk) {
     if (_topCoord->updateLastCommittedOpTimeAndWallTime()) {
         _setStableTimestampForStorage(lk);
-        auto lastCommittedOpTimeAndWallTime = _topCoord->getLastCommittedOpTimeAndWallTime();
-        _myLastCommittedOpTimeAndWallTimeCached.store(lk, lastCommittedOpTimeAndWallTime);
     }
 }
 
@@ -5242,8 +5219,6 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint(
     bool forInitiate) {
     if (_topCoord->advanceLastCommittedOpTimeAndWallTime(
            committedOpTimeAndWallTime, fromSyncSource, forInitiate)) {
-        auto lastCommittedOpTimeAndWallTime = _topCoord->getLastCommittedOpTimeAndWallTime();
-        _myLastCommittedOpTimeAndWallTimeCached.store(lk, lastCommittedOpTimeAndWallTime);
         if (_getMemberState(lk).arbiter()) {
             // Arbiters do not store replicated data, so we consider their data trivially
             // consistent.
@@ -5258,11 +5233,13 @@ void ReplicationCoordinatorImpl::_advanceCommitPoint(
 }
 
 OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const {
-    return getLastCommittedOpTimeAndWallTime().opTime;
+    stdx::unique_lock lk(_mutex);
+    return _topCoord->getLastCommittedOpTime();
 }
 
 OpTimeAndWallTime ReplicationCoordinatorImpl::getLastCommittedOpTimeAndWallTime() const {
-    return _myLastCommittedOpTimeAndWallTimeCached.load();
+    stdx::unique_lock lk(_mutex);
+    return _topCoord->getLastCommittedOpTimeAndWallTime();
 }
 
 Status ReplicationCoordinatorImpl::processReplSetRequestVotes(
src/mongo/db/repl/replication_coordinator_impl.h:

@@ -43,7 +43,6 @@
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/repl/auto_get_rstl_for_stepup_stepdown.h"
-#include "mongo/db/repl/data_with_lock_free_reads.h"
 #include "mongo/db/repl/delayable_timeout_callback.h"
 #include "mongo/db/repl/hello/hello_response.h"
 #include "mongo/db/repl/initial_sync/initial_syncer.h"
@@ -214,9 +213,6 @@ public:
 
     const ReplSettings& getSettings() const override;
 
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, use _getMemberState(WithLock).
     MemberState getMemberState() const override;
 
     std::vector<MemberData> getMemberData() const override;
@@ -292,34 +288,13 @@ public:
 
     void setMyHeartbeatMessage(const std::string& msg) override;
 
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, use _getMyLastWrittenOpTime(WithLock).
     OpTime getMyLastWrittenOpTime() const override;
-
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, use _getMyLastWrittenOpTimeAndWallTime(WithLock).
     OpTimeAndWallTime getMyLastWrittenOpTimeAndWallTime(bool rollbackSafe = false) const override;
 
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, use _getMyLastAppliedOpTime(WithLock).
     OpTime getMyLastAppliedOpTime() const override;
-
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, use _getMyLastAppliedOpTimeAndWallTime(WithLock).
     OpTimeAndWallTime getMyLastAppliedOpTimeAndWallTime() const override;
 
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, use _getMyLastDurableOpTime(WithLock).
     OpTime getMyLastDurableOpTime() const override;
-
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, use _getMyLastDurableOpTimeAndWallTime(WithLock).
     OpTimeAndWallTime getMyLastDurableOpTimeAndWallTime() const override;
 
     Status waitUntilMajorityOpTime(OperationContext* opCtx,
@@ -348,9 +323,6 @@ public:
 
     Status setFollowerModeRollback(OperationContext* opCtx) override;
 
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, add a WithLock version of this method and use _oplogSyncState.
     OplogSyncState getOplogSyncState() override;
 
     void signalWriterDrainComplete(OperationContext* opCtx,
@@ -449,14 +421,7 @@ public:
     ChangeSyncSourceAction shouldChangeSyncSourceOnError(
         const HostAndPort& currentSource, const OpTime& lastOpTimeFetched) const override;
 
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, use _topCoord->getLastCommittedOpTime().
     OpTime getLastCommittedOpTime() const override;
-
-    // This method makes no threading guarantees since it fetches a single
-    // value. If you are an internal caller working with multiple protected
-    // members, use _topCoord->getLastCommittedOpTimeAndWallTime().
     OpTimeAndWallTime getLastCommittedOpTimeAndWallTime() const override;
 
     Status processReplSetRequestVotes(OperationContext* opCtx,
@@ -1831,11 +1796,6 @@ private:
     // Pointer to the TopologyCoordinator owned by this ReplicationCoordinator.
     std::unique_ptr<TopologyCoordinator> _topCoord;  // (M)
 
-    DataWithLockFreeReads<OpTimeAndWallTime> _myLastAppliedOpTimeAndWallTimeCached;    // (S)
-    DataWithLockFreeReads<OpTimeAndWallTime> _myLastCommittedOpTimeAndWallTimeCached;  // (S)
-    DataWithLockFreeReads<OpTimeAndWallTime> _myLastDurableOpTimeAndWallTimeCached;    // (S)
-    DataWithLockFreeReads<OpTimeAndWallTime> _myLastWrittenOpTimeAndWallTimeCached;    // (S)
-
     // Executor that drives the topology coordinator.
     std::shared_ptr<executor::TaskExecutor> _replExecutor;  // (S)
 
@@ -1879,12 +1839,8 @@ private:
     // Current ReplicaSet state.
     MemberState _memberState;  // (M)
 
-    DataWithLockFreeReads<MemberState> _memberStateCached;  // (S)
-
     ReplicationCoordinator::OplogSyncState _oplogSyncState = OplogSyncState::Running;  // (M)
 
-    DataWithLockFreeReads<ReplicationCoordinator::OplogSyncState> _oplogSyncStateCached;  // (S)
-
     // Used to signal threads waiting for changes to _rsConfigState.
     stdx::condition_variable _rsConfigStateChange;  // (M)
 
@@ -1901,6 +1857,9 @@ private:
     // This member's index position in the current config.
     int _selfIndex;  // (M)
 
+    // Whether we slept last time we attempted an election but possibly tied with other nodes.
+    bool _sleptLastElection;  // (M)
+
     // Used to manage the concurrency around _canAcceptNonLocalWrites and _canServeNonLocalReads.
     std::unique_ptr<ReadWriteAbility> _readWriteAbility;  // (S)
 
@@ -34,7 +34,6 @@
 #include "mongo/db/vector_clock/vector_clock_mutable.h"
 #include "mongo/executor/thread_pool_task_executor.h"
 #include "mongo/logv2/log.h"
 #include "mongo/util/scopeguard.h"
 #include "mongo/util/time_support.h"
 
-
@@ -301,7 +300,6 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator(WithLock l
         }
     }
     _oplogSyncState = OplogSyncState::Running;
-    _oplogSyncStateCached.store(lk, _oplogSyncState);
     _externalState->startProducerIfStopped();
 }
 
@@ -360,7 +358,6 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator(WithLock l
     }
 
     _memberState = newState;
-    _memberStateCached.store(lk, _memberState);
 
     _cancelAndRescheduleElectionTimeout(lk);
 
@@ -414,7 +411,6 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTimeAndWallTime(
 
     _topCoord->setMyLastAppliedOpTimeAndWallTime(
         opTimeAndWallTime, _replExecutor->now(), isRollbackAllowed);
-    _myLastAppliedOpTimeAndWallTimeCached.store(lk, opTimeAndWallTime);
 
     // No need to wake up replication waiters because there should not be any replication waiters
     // waiting on our own lastApplied.
src/mongo/platform/atomic_word.h:

@@ -98,13 +98,6 @@ public:
         return _value.load(std::memory_order_relaxed);
     }
 
-    /**
-     * Gets the current value of this Atomic using acquire memory order.
-     */
-    MONGO_MOD_PUB WordType loadAcquire() const {
-        return _value.load(std::memory_order_acquire);
-    }
-
     /**
      * Sets the value of this Atomic to "newValue".
      */
@@ -119,13 +112,6 @@ public:
         _value.store(newValue, std::memory_order_relaxed);
     }
 
-    /**
-     * Sets the value of this Atomic to "newValue" using release memory order.
-     */
-    MONGO_MOD_PUB void storeRelease(WordType newValue) {
-        _value.store(newValue, std::memory_order_release);
-    }
-
     /**
      * Atomically swaps the current value of this with "newValue".
      *
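The removed pair simply exposed std::memory_order_acquire and std::memory_order_release through AtomicWord; these are the orders the deleted DataWithLockFreeReads relied on to publish a buffer before bumping the generation. A minimal standalone sketch of the classic release/acquire publication idiom they enable (illustrative names, plain std::atomic):

    #include <atomic>
    #include <cassert>
    #include <thread>

    int payload = 0;                 // plain, non-atomic data
    std::atomic<bool> ready{false};  // publication flag

    void producer() {
        payload = 42;                                  // 1. write the data
        ready.store(true, std::memory_order_release);  // 2. publish (storeRelease)
    }

    void consumer() {
        while (!ready.load(std::memory_order_acquire)) {  // 3. observe (loadAcquire)
        }
        // The acquire load synchronizes-with the release store, so the plain
        // write to payload in step 1 is guaranteed visible here.
        assert(payload == 42);
    }

    int main() {
        std::thread t1(producer);
        std::thread t2(consumer);
        t1.join();
        t2.join();
        return 0;
    }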