mirror of https://github.com/mongodb/mongo
SERVER-106810: Move oplog visibility manager to local oplog info (#39260)
GitOrigin-RevId: 8bc8d39ae33c8dea1eaa26ac9eb8c8f324dab213
This commit is contained in:
parent
5632018cd5
commit
454126a36c
|
|
@ -103,6 +103,7 @@ mongo_cc_library(
|
|||
"//src/mongo/db:server_base", # TODO(SERVER-93876): Remove.
|
||||
"//src/mongo/db:vector_clock_mutable",
|
||||
"//src/mongo/db/admission:flow_control",
|
||||
"//src/mongo/db/repl:oplog_visibility_manager",
|
||||
"//src/mongo/db/repl:optime",
|
||||
"//src/mongo/db/repl:repl_coordinator_interface", # TODO(SERVER-93876): Remove.
|
||||
"//src/mongo/db/storage:oplog_truncate_markers",
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@
|
|||
#include "mongo/util/decorable.h"
|
||||
#include "mongo/util/scopeguard.h"
|
||||
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <utility>
|
||||
|
||||
|
|
@ -85,6 +86,12 @@ RecordStore* LocalOplogInfo::getRecordStore() const {
|
|||
}
|
||||
|
||||
void LocalOplogInfo::setRecordStore(OperationContext* opCtx, RecordStore* rs) {
|
||||
Timestamp lastAppliedOpTime;
|
||||
if (repl::feature_flags::gFeatureFlagOplogVisibility.isEnabled()) {
|
||||
lastAppliedOpTime =
|
||||
repl::ReplicationCoordinator::get(opCtx)->getMyLastAppliedOpTime().getTimestamp();
|
||||
}
|
||||
|
||||
stdx::lock_guard<stdx::mutex> lk(_rsMutex);
|
||||
_rs = rs;
|
||||
// If the server was started in read-only mode or if we are restoring the node, skip
|
||||
|
|
@ -95,11 +102,21 @@ void LocalOplogInfo::setRecordStore(OperationContext* opCtx, RecordStore* rs) {
|
|||
if (needsTruncateMarkers) {
|
||||
_truncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
|
||||
}
|
||||
|
||||
if (repl::feature_flags::gFeatureFlagOplogVisibility.isEnabled()) {
|
||||
_oplogVisibilityManager.reInit(_rs, lastAppliedOpTime);
|
||||
}
|
||||
}
|
||||
|
||||
void LocalOplogInfo::resetRecordStore() {
|
||||
stdx::lock_guard<stdx::mutex> lk(_rsMutex);
|
||||
_rs = nullptr;
|
||||
|
||||
if (repl::feature_flags::gFeatureFlagOplogVisibility.isEnabled()) {
|
||||
// It's possible for the oplog visibility manager to be uninitialized because
|
||||
// resetRecordStore may be called before setRecordStore in repair/standalone mode.
|
||||
_oplogVisibilityManager.clear();
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<OplogTruncateMarkers> LocalOplogInfo::getTruncateMarkers() const {
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@
|
|||
#include "mongo/bson/timestamp.h"
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/repl/oplog.h"
|
||||
#include "mongo/db/repl/oplog_visibility_manager.h"
|
||||
#include "mongo/db/service_context.h"
|
||||
#include "mongo/db/storage/oplog_truncate_markers.h"
|
||||
#include "mongo/stdx/mutex.h"
|
||||
|
|
@ -154,6 +155,12 @@ private:
|
|||
// Synchronizes the section where a new Timestamp is generated and when it is registered in the
|
||||
// storage engine.
|
||||
mutable stdx::mutex _newOpMutex;
|
||||
|
||||
// Tracks timestamp reservations and controls oplog visibility.
|
||||
// This will be default-constructed and will not be properly re-initialized if
|
||||
// gFeatureFlagOplogVisibility is disabled.
|
||||
// TODO SERVER-85788: Update/remove this comment once the feature flag is removed.
|
||||
repl::OplogVisibilityManager _oplogVisibilityManager;
|
||||
};
|
||||
|
||||
} // namespace mongo
|
||||
|
|
|
|||
|
|
@ -40,8 +40,6 @@ namespace repl {
|
|||
|
||||
namespace {
|
||||
|
||||
const auto oplogVisibilityManager = ServiceContext::declareDecoration<OplogVisibilityManager>();
|
||||
|
||||
/**
|
||||
* Returns a lambda expression that notifies capped waiters if visibility is changed.
|
||||
*/
|
||||
|
|
@ -56,38 +54,23 @@ auto notifyCappedWaitersIfVisibilityChanged(const bool& visibilityChanged, Recor
|
|||
|
||||
} // namespace
|
||||
|
||||
// static
|
||||
OplogVisibilityManager* OplogVisibilityManager::get(ServiceContext& service) {
|
||||
return get(&service);
|
||||
}
|
||||
|
||||
// static
|
||||
OplogVisibilityManager* OplogVisibilityManager::get(ServiceContext* service) {
|
||||
return &oplogVisibilityManager(service);
|
||||
}
|
||||
|
||||
// static
|
||||
OplogVisibilityManager* OplogVisibilityManager::get(OperationContext* opCtx) {
|
||||
return get(opCtx->getServiceContext());
|
||||
}
|
||||
|
||||
RecordStore* OplogVisibilityManager::getRecordStore() const {
|
||||
return _rs;
|
||||
}
|
||||
|
||||
void OplogVisibilityManager::setRecordStore(RecordStore* rs) {
|
||||
void OplogVisibilityManager::reInit(RecordStore* rs, const Timestamp& initialTs) {
|
||||
stdx::lock_guard<stdx::mutex> lock(_mutex);
|
||||
invariant(_oplogTimestampList.empty());
|
||||
_oplogVisibilityTimestamp.store(initialTs);
|
||||
_rs = rs;
|
||||
}
|
||||
|
||||
void OplogVisibilityManager::resetRecordStore() {
|
||||
void OplogVisibilityManager::clear() {
|
||||
stdx::lock_guard<stdx::mutex> lock(_mutex);
|
||||
invariant(_oplogTimestampList.empty());
|
||||
_rs = nullptr;
|
||||
}
|
||||
|
||||
void OplogVisibilityManager::init(const Timestamp& initialTs) {
|
||||
_oplogVisibilityTimestamp.store(initialTs);
|
||||
_latestTimeSeen = initialTs;
|
||||
}
|
||||
|
||||
OplogVisibilityManager::const_iterator OplogVisibilityManager::trackTimestamps(
|
||||
const Timestamp& first, const Timestamp& last) {
|
||||
bool visibilityChanged = false;
|
||||
|
|
|
|||
|
|
@ -27,6 +27,8 @@
|
|||
* it in the license file.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mongo/db/operation_context.h"
|
||||
#include "mongo/db/repl/slotted_timestamp_list.h"
|
||||
#include "mongo/stdx/mutex.h"
|
||||
|
|
@ -43,13 +45,7 @@ public:
|
|||
using iterator = SlottedTimestampList::iterator;
|
||||
using const_iterator = SlottedTimestampList::const_iterator;
|
||||
|
||||
static OplogVisibilityManager* get(ServiceContext& service);
|
||||
static OplogVisibilityManager* get(ServiceContext* service);
|
||||
static OplogVisibilityManager* get(OperationContext* opCtx);
|
||||
|
||||
RecordStore* getRecordStore() const;
|
||||
void setRecordStore(RecordStore* rs);
|
||||
void resetRecordStore();
|
||||
|
||||
OplogVisibilityManager() = default;
|
||||
OplogVisibilityManager(const OplogVisibilityManager& rhs) = delete;
|
||||
|
|
@ -58,9 +54,15 @@ public:
|
|||
OplogVisibilityManager& operator=(const OplogVisibilityManager&& rhs) = delete;
|
||||
|
||||
/**
|
||||
* Initializes oplog visibility using the given initialTs.
|
||||
* Re-initializes the oplog visibility manager with the given record store and initial
|
||||
* timestamp. Called by LocalOplogInfo when the oplog record store pointer is set.
|
||||
*/
|
||||
void init(const Timestamp& initialTs);
|
||||
void reInit(RecordStore* rs, const Timestamp& initialTs);
|
||||
|
||||
/**
|
||||
* Clears the oplog visibility manager.
|
||||
*/
|
||||
void clear();
|
||||
|
||||
/**
|
||||
* Start tracking the timestamps given the first and last timestamp.
|
||||
|
|
|
|||
|
|
@ -73,8 +73,7 @@ void OplogVisibilityManagerTest::setUp() {
|
|||
boost::none /* uuid */);
|
||||
|
||||
_notifier = _rs->capped()->getInsertNotifier();
|
||||
_manager.setRecordStore(_rs.get());
|
||||
_manager.init(Timestamp(1) /* initialTs */);
|
||||
_manager.reInit(_rs.get(), Timestamp(1) /* initialTs */);
|
||||
ASSERT_EQ(_manager.getOplogVisibilityTimestamp(), Timestamp(1));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -703,7 +703,7 @@ TEST_F(StorageInterfaceImplTest, CreateCollectionThatAlreadyExistsFails) {
|
|||
TEST_F(StorageInterfaceImplTest, CreateOplogCreateCappedCollection) {
|
||||
auto opCtx = getOperationContext();
|
||||
StorageInterfaceImpl storage;
|
||||
NamespaceString nss = NamespaceString::createNamespaceString_forTest("local.oplog.X");
|
||||
NamespaceString nss = NamespaceString::createNamespaceString_forTest("local.oplog.rs");
|
||||
{
|
||||
const auto coll = getCollectionForRead(opCtx, nss);
|
||||
ASSERT_FALSE(coll.exists());
|
||||
|
|
@ -722,7 +722,7 @@ TEST_F(StorageInterfaceImplTest,
|
|||
CreateCollectionReturnsUserExceptionAsStatusIfCollectionCreationThrows) {
|
||||
auto opCtx = getOperationContext();
|
||||
StorageInterfaceImpl storage;
|
||||
NamespaceString nss = NamespaceString::createNamespaceString_forTest("local.oplog.Y");
|
||||
NamespaceString nss = NamespaceString::createNamespaceString_forTest("local.oplog.rs");
|
||||
{
|
||||
const auto coll = getCollectionForRead(opCtx, nss);
|
||||
ASSERT_FALSE(coll.exists());
|
||||
|
|
|
|||
Loading…
Reference in New Issue