SERVER-110493 Time-based oplog truncation (#43414)

Co-authored-by: Wei Hu <wei.hu@mongodb.com>
GitOrigin-RevId: e43142e6b3c98a110e381e6d29c285322174e798
Nicholas Shectman 2025-12-09 17:27:19 -04:00 committed by MongoDB Bot
parent 0ff8470fb7
commit 7a9c2fd7b0
18 changed files with 332 additions and 77 deletions


@@ -57,7 +57,7 @@ function doTest(startIndex, endIndex, originalDocs, docsBeforeTruncate) {
             op: "c",
             ns: `${dbName}.$cmd`,
             o: {
-                "truncateRange": collName,
+                "truncateRange": coll.getFullName(),
                 "minRecordId": originalDocs[startIndex].$recordId,
                 "maxRecordId": originalDocs[endIndex].$recordId,
                 "bytesDeleted": recordsDeleted,  // just a placeholder


@@ -954,7 +954,7 @@ repl::OpTime truncateRange(OperationContext* opCtx,
     uassert(ErrorCodes::IllegalOperation,
             "Cannot perform ranged truncate in collections other than clustered collections as "
             "RecordIds must be equal across nodes",
-            collection->isClustered());
+            collection->isClustered() || nss.isOplog());
     uassert(ErrorCodes::IllegalOperation,
             "Cannot perform ranged truncate in collections with preimages enabled",
             !collection->isChangeStreamPreAndPostImagesEnabled());


@@ -42,6 +42,7 @@
 #include "mongo/db/index_builds/index_builds_common.h"
 #include "mongo/db/logical_time_validator.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/namespace_string_reserved.h"
 #include "mongo/db/op_observer/batched_write_context.h"
 #include "mongo/db/op_observer/op_observer_util.h"
 #include "mongo/db/operation_context.h"
@@ -2342,12 +2343,14 @@ void OpObserverImpl::onTruncateRange(OperationContext* opCtx,
         rss::ReplicatedStorageService::get(opCtx).getPersistenceProvider(),
         VersionContext::getDecoration(opCtx)));
-    TruncateRangeOplogEntry objectEntry(
-        std::string(coll->ns().coll()), minRecordId, maxRecordId, bytesDeleted, docsDeleted);
+    NamespaceString nss = coll->ns();
+    TruncateRangeOplogEntry objectEntry(nss, minRecordId, maxRecordId, bytesDeleted, docsDeleted);
     MutableOplogEntry oplogEntry;
     oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
-    oplogEntry.setNss(coll->ns().getCommandNS());
+    // For oplog truncation, use admin.$cmd as the namespace to get around the isOplogDisabledFor()
+    // checks that usually prevent generating oplog entries for operations on the oplog.
+    oplogEntry.setNss(nss.isOplog() ? NamespaceString::kAdminCommandNamespace : nss.getCommandNS());
     oplogEntry.setUuid(coll->uuid());
     oplogEntry.setObject(objectEntry.toBSON());
     opTime = logOperation(opCtx, &oplogEntry, true /*assignCommonFields*/, _operationLogger.get());


@@ -1651,8 +1651,8 @@ TEST_F(OpObserverTest, TruncateRangeIsReplicated) {
     ASSERT_EQ(oplogEntry.getUuid(), autoColl->uuid());
     const auto& o = oplogEntry.getObject();
-    TruncateRangeOplogEntry objectEntry(
-        std::string(autoColl->ns().coll()), RecordId("a"), RecordId("b"), 1, 1);
+    // truncate range oplog entries include the db name in the namespace string
+    TruncateRangeOplogEntry objectEntry(autoColl->ns(), RecordId("a"), RecordId("b"), 1, 1);
     ASSERT_BSONOBJ_EQ(o, objectEntry.toBSON());
 }


@@ -739,6 +739,7 @@ mongo_cc_library(
     ],
     deps = [
         "//src/mongo:base",
+        "//src/mongo/db:server_base",
         "//src/mongo/idl:idl_parser",
     ],
 )


@@ -96,11 +96,13 @@ void LocalOplogInfo::setRecordStore(OperationContext* opCtx, RecordStore* rs) {
     stdx::lock_guard<stdx::mutex> lk(_rsMutex);
     _rs = rs;
-    // If the server was started in read-only mode, if we are restoring the node, or if async
-    // sampling is enabled, skip calculating the oplog truncate markers here.
+    // If the server was started in read-only mode, or we are restoring the node, don't truncate.
+    // If async sampling is enabled, skip calculating the oplog truncate markers here.
+    // Don't sample markers if sampling isn't supported (markers will need to be created elsewhere).
+    auto& provider = rss::ReplicatedStorageService::get(opCtx).getPersistenceProvider();
     bool needsTruncateMarkers = opCtx->getServiceContext()->userWritesAllowed() &&
         !storageGlobalParams.repair && !repl::ReplSettings::shouldSkipOplogSampling() &&
-        !gOplogSamplingAsyncEnabled;
+        !gOplogSamplingAsyncEnabled && provider.supportsOplogSampling();
     if (needsTruncateMarkers) {
         _truncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
     }


@@ -1237,7 +1237,11 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
          -> Status {
          const auto& entry = *op;
          const auto& cmd = entry.getObject();
-         const auto& ns = OplogApplication::extractNsFromCmd(entry.getNss().dbName(), cmd);
+         // for truncateRange, the full namespace including database name is in the
+         // first command element, rather than just the usual ns value without database name.
+         const auto& ns = NamespaceStringUtil::deserialize(boost::none,
+                                                           cmd.firstElement().valueStringData(),
+                                                           SerializationContext::stateDefault());
          const auto truncateRangeEntry = TruncateRangeOplogEntry::parse(cmd);
          writeConflictRetryWithLimit(opCtx, "applyOps_truncateRange", ns, [&] {
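For illustration, a minimal sketch of the new namespace extraction (assumed BSON shape; the same pattern appears in the oplog applier batcher change below). The first element of the command object now carries the full "db.coll" string, so no database name needs to be joined on from the entry's ns field:

// Hypothetical command object; only its first element matters here.
BSONObj cmd = BSON("truncateRange" << "test.coll");
NamespaceString ns = NamespaceStringUtil::deserialize(
    boost::none, cmd.firstElement().valueStringData(), SerializationContext::stateDefault());
// ns now compares equal to a NamespaceString for database "test", collection "coll".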


@@ -321,7 +321,9 @@ OplogApplierBatcher::BatchAction OplogApplierBatcher::_getBatchActionForEntry(
     if (entry.getCommandType() == OplogEntry::CommandType::kTruncateRange) {
         const auto& cmd = entry.getObject();
-        const auto& ns = OplogApplication::extractNsFromCmd(entry.getNss().dbName(), cmd);
+        const auto& ns = NamespaceStringUtil::deserialize(boost::none,
+                                                          cmd.firstElement().valueStringData(),
+                                                          SerializationContext::stateDefault());
         if (ns.isChangeStreamPreImagesCollection()) {
             auto truncateRangeEntry = TruncateRangeOplogEntry::parse(cmd);
             const auto& maxRecordId = truncateRangeEntry.getMaxRecordId();


@@ -519,7 +519,7 @@ OplogEntry makeLargeRetryableWriteOplogEntries(int t,
  * Generates a truncateRange oplog entry
  */
 OplogEntry makeTruncateRangeEntry(int t, const NamespaceString& nss, const RecordId& maxRecordId) {
-    TruncateRangeOplogEntry oField(std::string(nss.coll()), RecordId(), maxRecordId, 0, 0);
+    TruncateRangeOplogEntry oField(nss, RecordId(), maxRecordId, 0, 0);
     return {DurableOplogEntry(OpTime(Timestamp(t, 1), 1),  // optime
                               OpTypeEnum::kCommand,        // op type
                               nss.getCommandNS(),          // namespace


@@ -40,8 +40,8 @@ structs:
         strict: false
         fields:
             truncateRange:
-                description: "Name of the collection being truncated"
-                type: string
+                description: "Full namespace for the collection being truncated"
+                type: namespacestring
             minRecordId:
                 description: "Inclusive lower bound of the range to truncate"
                 type: RecordId
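Given this schema change, the entry body now carries the full namespace; a hypothetical example built in C++ (the RecordId encodings and counter values below are placeholders, not the real serialized form; compare the applyOps document in the jstest at the top of this commit):

// Illustrative only: truncateRange holds "db.coll" (here the oplog itself)
// rather than a bare collection name.
BSONObj o = BSON("truncateRange" << "local.oplog.rs"
                                 << "minRecordId" << 1LL
                                 << "maxRecordId" << 42LL
                                 << "bytesDeleted" << 0
                                 << "docsDeleted" << 0);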


@@ -87,6 +87,45 @@ CollectionTruncateMarkers::peekOldestMarkerIfNeeded(OperationContext* opCtx) con
     return _markers.front();
 }
 
+boost::optional<CollectionTruncateMarkers::Marker> CollectionTruncateMarkers::newestExpiredRecord(
+    OperationContext* opCtx,
+    RecordStore& recordStore,
+    const RecordId& mayTruncateUpTo,
+    Date_t expiryTime) {
+    // reverse cursor so that non-exact matches will give the next older entry, not the next newer
+    auto recoveryUnit = shard_role_details::getRecoveryUnit(opCtx);
+    auto cursor = recordStore.getCursor(opCtx, *recoveryUnit, /* forward */ false);
+    auto newest = cursor->next();
+    RecordId newestUnpinnedId;
+    if (!mayTruncateUpTo.isNull()) {
+        auto newestUnpinned =
+            cursor->seek(mayTruncateUpTo, SeekableRecordCursor::BoundInclusion::kInclude);
+        if (newestUnpinned) {
+            newestUnpinnedId = newestUnpinned->id;
+        }
+    }
+    // it's OK to round off expiry time to the nearest second.
+    RecordId seekTo(durationCount<Seconds>(expiryTime.toDurationSinceEpoch()), /* low */ 0);
+    if (!mayTruncateUpTo.isNull() && seekTo > mayTruncateUpTo) {
+        seekTo = mayTruncateUpTo;
+    }
+    auto record = cursor->seek(seekTo, SeekableRecordCursor::BoundInclusion::kInclude);
+    if (!record) {
+        return {};
+    }
+    // for behavioral compatibility with ASC, don't entirely empty the oplog
+    // or remove the last unpinned entry (defined there to *include* the mayTruncateUpTo point)
+    if (record->id == newest->id || record->id == newestUnpinnedId) {
+        // reverse cursor, so one older
+        record = cursor->next();
+        if (!record) {
+            return {};
+        }
+    }
+    // byte and record increments are only advisory even when there's a size storer to look at them.
+    return Marker(/*.records =*/0, /*.bytes =*/0, record->id, expiryTime);
+}
+
 void CollectionTruncateMarkers::popOldestMarker() {
     stdx::lock_guard<stdx::mutex> lk(_markersMutex);
     _markers.pop_front();
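As a usage sketch (assumptions: 'rs' is the oplog RecordStore, the surrounding collection acquisition and locking are elided, and newestExpiredWallTime() is the helper introduced later in this commit):

// Find the newest record that time-based retention allows us to drop, if any.
RecordId pin(opCtx->getServiceContext()->getStorageEngine()->getPinnedOplog().asULL());
Date_t expiry = OplogTruncateMarkers::newestExpiredWallTime(opCtx);
if (auto marker = CollectionTruncateMarkers::newestExpiredRecord(opCtx, rs, pin, expiry)) {
    // Everything up to and including marker->lastRecord is eligible; the actual
    // reclaim path in this commit goes through oplog_truncation::reclaimOplog().
}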


@@ -120,6 +120,20 @@ public:
     boost::optional<Marker> peekOldestMarkerIfNeeded(OperationContext* opCtx) const;
 
+    /**
+     * Creates a new marker object representing the newest oplog record that may be truncated,
+     * if any. A record may be truncated if it is older than the expiry time, and if truncating
+     * would not remove all unpinned records.
+     * Returns a marker with the record to be truncated but without byte or record
+     * truncation counts.
+     * The marker queue is not used and popOldestMarker() should not be called after the use of
+     * this function.
+     */
+    static boost::optional<Marker> newestExpiredRecord(OperationContext* opCtx,
+                                                       RecordStore& recordStore,
+                                                       const RecordId& mayTruncateUpTo,
+                                                       Date_t expiryTime);
+
     void popOldestMarker();
 
     void createNewMarkerIfNeeded(const RecordId& lastRecord,


@@ -784,4 +784,112 @@ TEST_F(CollectionMarkersTest, ScanningWorksWithTruncate) {
     yieldNotifier.join();
 }
 
+void checkMarker(const RecordId& expected,
+                 OperationContext* opCtx,
+                 RecordStore& rs,
+                 RecordId pin,
+                 Date_t expiryTime) {
+    auto marker = CollectionTruncateMarkers::newestExpiredRecord(opCtx, rs, pin, expiryTime);
+    // the first three of these are always the same regardless of the record store contents:
+    ASSERT_EQ(0, marker->records);
+    ASSERT_EQ(0, marker->bytes);
+    ASSERT_EQ(expiryTime, marker->wallTime);
+    // this is the assertion that might ever fail, so add some info about the other inputs
+    ASSERT_EQ(expected, marker->lastRecord) << " with expiry=" << expiryTime << " and pin=" << pin;
+}
+
+TEST_F(CollectionMarkersTest, TimeBasedMarkerConstruction) {
+    auto collNs = NamespaceString::createNamespaceString_forTest("test", "coll");
+    auto opCtx = getClient()->makeOperationContext();
+    createCollection(opCtx.get(), collNs);
+
+    std::vector<RecordIdAndWall> elements;
+    std::vector<RecordId> pins;  // representing one second after each element
+    Date_t wallTime = Date_t::now();
+
+    // no truncatable record if oplog is empty
+    {
+        AutoGetCollection coll(opCtx.get(), collNs, MODE_IS);
+        ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+            opCtx.get(), *coll->getRecordStore(), RecordId(), wallTime));
+        Timestamp ts(durationCount<Seconds>(wallTime.toDurationSinceEpoch()), 0);
+        ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+            opCtx.get(), *coll->getRecordStore(), RecordId(ts.getSecs(), 0), wallTime));
+    }
+
+    // for this test we need real sequence numbers
+    {
+        AutoGetCollection coll(opCtx.get(), collNs, MODE_IX);
+        RecordStore& rs = *coll->getRecordStore();
+        const auto insertedData = std::string(50, 'a');
+        WriteUnitOfWork wuow(opCtx.get());
+        for (size_t i = 0; i < 20; ++i) {
+            Timestamp ts(durationCount<Seconds>(wallTime.toDurationSinceEpoch()), 0);
+            RecordId recordId = RecordId(ts.getSecs(), 0);
+            auto recordIdStatus = rs.insertRecord(opCtx.get(),
+                                                  *shard_role_details::getRecoveryUnit(opCtx.get()),
+                                                  recordId,
+                                                  insertedData.data(),
+                                                  insertedData.length(),
+                                                  ts);
+            ASSERT_OK(recordIdStatus);
+            ASSERT_EQ(recordId, recordIdStatus.getValue());
+            elements.emplace_back(recordId, wallTime);
+            pins.emplace_back(RecordId(ts.getSecs() + 1, 0));
+            wallTime += Seconds(2);
+        }
+        wuow.commit();
+    }
+
+    AutoGetCollection coll(opCtx.get(), collNs, MODE_IS);
+    RecordStore& rs = *coll->getRecordStore();
+
+    // first, check when expiry time is the limiting factor, with and without pins
+    for (size_t i = 1; i < elements.size(); ++i) {
+        const RecordIdAndWall& element = elements.at(i);
+        RecordId expected = element.recordId;
+        // don't completely empty out the oplog! instead, test that looking up the newest wall time
+        // returns the second-newest recordId.
+        if (expected == elements.back().recordId) {
+            expected = elements[elements.size() - 2].recordId;
+        }
+        // exact match with no pin
+        checkMarker(expected, opCtx.get(), rs, RecordId(), element.wallTime);
+        // in between records with no pin should match older record,
+        // and expiry time newer than newest record should match newest record
+        checkMarker(expected, opCtx.get(), rs, RecordId(), element.wallTime + Seconds(1));
+        // check various pins when the expiry time is the limiting factor
+        // pin equal to expiry is covered below with pin-limited truncation
+        // note that pins require one unpinned entry to be retained, so don't start j equal to i
+        for (size_t j = i + 1; j < elements.size(); ++j) {
+            checkMarker(expected, opCtx.get(), rs, elements.at(j).recordId, element.wallTime);
+            checkMarker(expected, opCtx.get(), rs, pins.at(j), element.wallTime);
+            checkMarker(
+                expected, opCtx.get(), rs, elements.at(j).recordId, element.wallTime + Seconds(1));
+            checkMarker(expected, opCtx.get(), rs, pins.at(j), element.wallTime + Seconds(1));
+        }
+    }
+
+    // check various expiry times when pin is the limiting factor (or pin and expiry retain equally)
+    for (size_t i = 1; i < elements.size(); ++i) {
+        for (int j = 0; j < 4; ++j) {
+            // don't truncate the mayTruncateUpTo point if the pin is an exact match for an entry
+            checkMarker(elements.at(i - 1).recordId,
+                        opCtx.get(),
+                        rs,
+                        elements.at(i).recordId,
+                        elements.at(i).wallTime + Seconds(j));
+            // leave an entry before the mayTruncateUpTo point if the pin has no exact match
+            checkMarker(elements.at(i - 1).recordId,
+                        opCtx.get(),
+                        rs,
+                        pins.at(i),
+                        elements.at(i).wallTime + Seconds(j));
+        }
+    }
+
+    // corner cases not covered in the above for loops:
+    // no truncatable record if expiry time is older than oldest record
+    ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+        opCtx.get(), rs, RecordId(), elements.at(0).wallTime - Seconds(1)));
+    // no truncatable record if oplog is all pinned
+    ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+        opCtx.get(), rs, elements.at(0).recordId, wallTime + Seconds(25)));
+    // no truncatable record if all but one oplog entry is pinned, but not as an exact match
+    ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+        opCtx.get(), rs, pins.at(0), wallTime + Seconds(25)));
+}
+
 }  // namespace mongo


@@ -60,48 +60,6 @@
 namespace mongo {
 namespace {
 
-/**
- * Responsible for deleting oplog truncate markers once their max capacity has been reached.
- */
-class OplogCapMaintainerThread final : public BackgroundJob {
-public:
-    OplogCapMaintainerThread() : BackgroundJob(false /* deleteSelf */) {}
-
-    static OplogCapMaintainerThread* get(ServiceContext* serviceCtx);
-    static OplogCapMaintainerThread* get(OperationContext* opCtx);
-    static void set(ServiceContext* serviceCtx,
-                    std::unique_ptr<OplogCapMaintainerThread> oplogCapMaintainerThread);
-
-    std::string name() const override {
-        return _name;
-    }
-
-    void run() override;
-
-    /**
-     * Waits until the maintainer thread finishes. Must not be called concurrently with start().
-     */
-    void shutdown(const Status& reason);
-
-private:
-    /**
-     * Returns true iff there was an oplog to delete from.
-     */
-    bool _deleteExcessDocuments(OperationContext* opCtx);
-
-    // Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed.
-    mutable stdx::mutex _opCtxMutex;
-
-    // Saves a reference to the cap maintainer thread's operation context.
-    boost::optional<ServiceContext::UniqueOperationContext> _uniqueCtx;
-
-    mutable stdx::mutex _stateMutex;
-    bool _shuttingDown = false;
-    Status _shutdownReason = Status::OK();
-
-    std::string _name = std::string("OplogCapMaintainerThread-") +
-        toStringForLogging(NamespaceString::kRsOplogNamespace);
-};
-
 const auto getMaintainerThread =
     ServiceContext::declareDecoration<std::unique_ptr<OplogCapMaintainerThread>>();
@@ -247,6 +205,13 @@ void OplogCapMaintainerThread::set(
     maintainerThread = std::move(oplogCapMaintainerThread);
 }
 
+Lock::GlobalLockOptions OplogCapMaintainerThread::_getOplogTruncationLockOptions() {
+    return {.skipFlowControlTicket = true,
+            .skipRSTLLock = true,
+            .skipDirectConnectionChecks = false,
+            .explicitIntent = rss::consensus::IntentRegistry::Intent::LocalWrite};
+}
+
 bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
     // A Global IX lock should be good enough to protect the oplog truncation from
     // interruptions such as replication rollback. Database lock or collection lock is not
@@ -282,10 +247,7 @@ bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
                                   MODE_IX,
                                   Date_t::max(),
                                   Lock::InterruptBehavior::kThrow,
-                                  {false,
-                                   true /* skipRstl */,
-                                   false,
-                                   rss::consensus::IntentRegistry::Intent::LocalWrite});
+                                  _getOplogTruncationLockOptions());
 
         auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
         if (!rs) {
             LOGV2_DEBUG(9064300, 2, "oplog collection does not exist");
@@ -295,7 +257,7 @@ bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
         auto mayTruncateUpTo = opCtx->getServiceContext()->getStorageEngine()->getPinnedOplog();
 
         Timer timer;
-        oplog_truncation::reclaimOplog(opCtx, *rs, RecordId(mayTruncateUpTo.asULL()));
+        _reclaimOplog(opCtx, *rs, RecordId(mayTruncateUpTo.asULL()));
         auto elapsedMicros = timer.micros();
         totalTimeTruncating.fetchAndAdd(elapsedMicros);
@@ -313,9 +275,15 @@ bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
     return true;
 }
 
+void OplogCapMaintainerThread::_reclaimOplog(OperationContext* opCtx,
+                                             RecordStore& rs,
+                                             RecordId mayTruncateUpTo) {
+    oplog_truncation::reclaimOplog(opCtx, rs, mayTruncateUpTo);
+}
+
 void OplogCapMaintainerThread::run() {
-    LOGV2(5295000, "Oplog cap maintainer thread started", "threadName"_attr = _name);
-    ThreadClient tc(_name,
+    LOGV2(5295000, "Oplog cap maintainer thread started", "threadName"_attr = name());
+    ThreadClient tc(name(),
                     getGlobalServiceContext()->getService(ClusterRole::ShardServer),
                     Client::noSession(),
                     ClientOperationKillableByStepdown{false});
@@ -353,7 +321,9 @@ void OplogCapMaintainerThread::run() {
         }
     });
 
-    if (gOplogSamplingAsyncEnabled) {
+    // asynchronously regenerate truncation markers, if they are still needed when we get here.
+    auto& provider =
+        rss::ReplicatedStorageService::get(_uniqueCtx->get()).getPersistenceProvider();
+    if (gOplogSamplingAsyncEnabled && provider.supportsOplogSampling()) {
         try {
             {
                 stdx::unique_lock<stdx::mutex> lk(_stateMutex);


@@ -30,6 +30,8 @@
 #pragma once
 
 #include "mongo/db/service_context.h"
+#include "mongo/db/shard_role/lock_manager/d_concurrency.h"
+#include "mongo/util/background.h"
 #include "mongo/util/modules.h"
 
 namespace MONGO_MOD_PUBLIC mongo {
@@ -38,4 +40,58 @@ void startOplogCapMaintainerThread(ServiceContext* serviceContext,
                                    bool shouldSkipOplogSampling);
 
 void stopOplogCapMaintainerThread(ServiceContext* serviceContext, const Status& reason);
 
+/**
+ * Responsible for deleting oplog truncate markers once their max capacity has been reached.
+ */
+class MONGO_MOD_OPEN OplogCapMaintainerThread : public BackgroundJob {
+public:
+    OplogCapMaintainerThread() : BackgroundJob(false /* deleteSelf */) {}
+
+    static OplogCapMaintainerThread* get(ServiceContext* serviceCtx);
+    static OplogCapMaintainerThread* get(OperationContext* opCtx);
+    static void set(ServiceContext* serviceCtx,
+                    std::unique_ptr<OplogCapMaintainerThread> oplogCapMaintainerThread);
+
+    std::string name() const override {
+        return _name;
+    }
+
+    void run() override;
+
+    /**
+     * Waits until the maintainer thread finishes. Must not be called concurrently with start().
+     */
+    void shutdown(const Status& reason);
+
+private:
+    /**
+     * Options for lock acquisition with an intent appropriate to the oplog being truncated.
+     */
+    virtual Lock::GlobalLockOptions _getOplogTruncationLockOptions();
+
+    /**
+     * Returns true iff there was an oplog to delete from.
+     */
+    bool _deleteExcessDocuments(OperationContext* opCtx);
+
+    /**
+     * Pass-through method for reclaiming oplog appropriately to the oplog being truncated.
+     */
+    virtual void _reclaimOplog(OperationContext* opCtx, RecordStore& rs, RecordId mayTruncateUpTo);
+
+    // Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed.
+    mutable stdx::mutex _opCtxMutex;
+
+    // Saves a reference to the cap maintainer thread's operation context.
+    boost::optional<ServiceContext::UniqueOperationContext> _uniqueCtx;
+
+    mutable stdx::mutex _stateMutex;
+    bool _shuttingDown = false;
+    Status _shutdownReason = Status::OK();
+
+    std::string _name = std::string("OplogCapMaintainerThread-") +
+        toStringForLogging(NamespaceString::kRsOplogNamespace);
+};
+
 }  // namespace MONGO_MOD_PUBLIC mongo
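Because the class is now MONGO_MOD_OPEN with virtual hooks, an alternative oplog implementation can subclass it. A hypothetical sketch (the subclass name and its policies are illustrative, not part of this commit):

// Hypothetical subclass overriding the two virtual hooks.
class CustomOplogCapMaintainerThread : public OplogCapMaintainerThread {
private:
    Lock::GlobalLockOptions _getOplogTruncationLockOptions() override {
        // Assumed policy: keep defaults but still declare a local-write intent.
        return {.explicitIntent = rss::consensus::IntentRegistry::Intent::LocalWrite};
    }
    void _reclaimOplog(OperationContext* opCtx, RecordStore& rs, RecordId mayTruncateUpTo) override {
        // Route to a custom reclaim routine instead of oplog_truncation::reclaimOplog().
    }
};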


@@ -46,7 +46,6 @@
 namespace mongo {
 namespace {
 
-const double kNumMSInHour = 1000 * 60 * 60;
 MONGO_FAIL_POINT_DEFINE(hangDuringOplogSampling);
 
 }  // namespace
@@ -174,6 +173,20 @@ void OplogTruncateMarkers::kill() {
     _reclaimCv.notify_one();
 }
 
+Date_t OplogTruncateMarkers::newestExpiredWallTime(OperationContext* opCtx) {
+    // retention times can be fractional, so use floating point arithmetic for this calculation.
+    static const double kNumMSInHour = durationCount<Milliseconds>(Hours(1));
+    long minRetentionMS = storageGlobalParams.oplogMinRetentionHours.load() * kNumMSInHour;
+    // If retention by time is disabled, consumers with space-based retention will consider all
+    // records expired and consumers without will consider no records expired.
+    // The former case is handled separately in _hasExcessMarkers() in this file.
+    // The latter case is not handled elsewhere, so handle it here.
+    if (minRetentionMS == 0) {
+        return Date_t();
+    }
+    return opCtx->fastClockSource().now() - Milliseconds(minRetentionMS);
+}
+
 void OplogTruncateMarkers::clearMarkersOnCommit(OperationContext* opCtx) {
     shard_role_details::getRecoveryUnit(opCtx)->onCommit(
         [this](OperationContext*, boost::optional<Timestamp>) {
@@ -253,6 +266,32 @@ bool OplogTruncateMarkers::awaitHasExcessMarkersOrDead(OperationContext* opCtx)
     return !(_isDead || !isWaitConditionSatisfied);
 }
 
+bool OplogTruncateMarkers::awaitHasExpiredOplogOrDead(OperationContext* opCtx, RecordStore& rs) {
+    // The same mechanism (and the same private data) is used for canceling this wait as for
+    // awaitHasExcessMarkersOrDead, but since this is waiting for time-based truncation, factor
+    // the retention time into the wait period and not just the configured thread wake interval.
+    // Note that oplogMinRetentionHours can be fractional, so don't use Interval to calculate this.
+    double minRetentionSeconds = storageGlobalParams.oplogMinRetentionHours.load() * 60 * 60;
+    int checkPeriodSeconds = (minRetentionSeconds != 0.0)
+        ? std::min<int>(gOplogTruncationCheckPeriodSeconds, minRetentionSeconds)
+        : gOplogTruncationCheckPeriodSeconds;
+
+    // Wait until kill() is called or oplog can be truncated.
+    stdx::unique_lock<stdx::mutex> lock(_reclaimMutex);
+    MONGO_IDLE_THREAD_BLOCK;
+    auto isWaitConditionSatisfied =
+        opCtx->waitForConditionOrInterruptFor(_reclaimCv, lock, Seconds(checkPeriodSeconds), [&] {
+            if (_isDead) {
+                return true;
+            }
+            RecordId pin(opCtx->getServiceContext()->getStorageEngine()->getPinnedOplog().asULL());
+            return newestExpiredRecord(opCtx, rs, pin, newestExpiredWallTime(opCtx)).has_value();
+        });
+
+    // Return true only when we have detected excess oplog, not because the record store
+    // is being destroyed (_isDead) or we timed out waiting on the condition variable.
+    return !(_isDead || !isWaitConditionSatisfied);
+}
+
 bool OplogTruncateMarkers::_hasExcessMarkers(OperationContext* opCtx) const {
     int64_t totalBytes = 0;
     for (const auto& marker : getMarkers()) {
@@ -272,20 +311,12 @@ bool OplogTruncateMarkers::_hasExcessMarkers(OperationContext* opCtx) const {
         return false;
     }
 
-    double minRetentionHours = storageGlobalParams.oplogMinRetentionHours.load();
-
     // If we are not checking for time, then yes, there is a truncate marker to be reaped
     // because oplog is at capacity.
-    if (minRetentionHours == 0.0) {
+    if (storageGlobalParams.oplogMinRetentionHours.load() == 0.0) {
         return true;
     }
 
-    auto nowWall = Date_t::now();
-    auto lastTruncateMarkerWallTime = truncateMarker.wallTime;
-
-    auto currRetentionMS = durationCount<Milliseconds>(nowWall - lastTruncateMarkerWallTime);
-    double currRetentionHours = currRetentionMS / kNumMSInHour;
-    return currRetentionHours >= minRetentionHours;
+    return truncateMarker.wallTime <= newestExpiredWallTime(opCtx);
 }
 
 void OplogTruncateMarkers::adjust(int64_t maxSize) {
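A quick worked example of the retention arithmetic (values illustrative): with oplogMinRetentionHours = 1.5, the cutoff is 1.5 * 3,600,000 ms = 5,400,000 ms before now, and a marker becomes reapable once its wallTime is at or before that instant:

// Sketch of the computation in newestExpiredWallTime() for a fractional retention value.
const double kNumMSInHour = durationCount<Milliseconds>(Hours(1));  // 3600000
long minRetentionMS = 1.5 * kNumMSInHour;                           // 5400000
Date_t cutoff = Date_t::now() - Milliseconds(minRetentionMS);       // now minus 90 minutes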


@@ -45,7 +45,7 @@ namespace MONGO_MOD_PUBLIC mongo {
 
 // Keep "milestones" against the oplog to efficiently remove the old records when the collection
 // grows beyond its desired maximum size.
-class OplogTruncateMarkers final : public CollectionTruncateMarkers {
+class MONGO_MOD_OPEN OplogTruncateMarkers : public CollectionTruncateMarkers {
 public:
     OplogTruncateMarkers(std::deque<CollectionTruncateMarkers::Marker> markers,
                         int64_t partialMarkerRecords,
@@ -67,6 +67,11 @@ public:
      */
     void kill();
 
+    /**
+     * The time after which oplog is retained by policy.
+     */
+    static Date_t newestExpiredWallTime(OperationContext* opCtx);
+
     /**
      * Waits for excess oplog space to be available for reclamation.
      * Returns true if we can proceed to reclaim space in the oplog.
@@ -77,6 +82,16 @@ public:
      */
     bool awaitHasExcessMarkersOrDead(OperationContext* opCtx) override;
 
+    /**
+     * Waits for expired oplog entries to be eligible for reclamation by time-based retention only.
+     * Returns true if we can proceed to reclaim space in the oplog by time.
+     * Otherwise, returns false if the containing record store instance is being destroyed
+     * or if we reached the deadline for waiting.
+     * Throws exception if interrupted.
+     * See 'oplogTruncationCheckPeriodSeconds' server parameter.
+     */
+    bool awaitHasExpiredOplogOrDead(OperationContext* opCtx, RecordStore& rs);
+
     // Clears all the markers of the instance whenever the current WUOW commits.
     void clearMarkersOnCommit(OperationContext* opCtx);
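A plausible consumer loop, sketched under assumptions ('markers' and 'rs' are the oplog's OplogTruncateMarkers and RecordStore; this loop is not code from this commit):

// Block until time-based retention expires something (or the store dies), then reclaim.
while (markers->awaitHasExpiredOplogOrDead(opCtx, rs)) {
    auto mayTruncateUpTo = opCtx->getServiceContext()->getStorageEngine()->getPinnedOplog();
    oplog_truncation::reclaimOplog(opCtx, rs, RecordId(mayTruncateUpTo.asULL()));
}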


@@ -47,6 +47,7 @@
 #include "mongo/db/shard_role/shard_catalog/index_catalog_entry.h"
 #include "mongo/db/shard_role/shard_catalog/index_descriptor.h"
 #include "mongo/db/storage/record_store.h"
+#include "mongo/db/storage/storage_options.h"
 #include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
 #include "mongo/db/storage/write_unit_of_work.h"
 #include "mongo/unittest/unittest.h"
@@ -924,5 +925,14 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_Duplicates) {
     }
 }
 
+TEST_F(OplogTruncationTest, OplogTruncateMarkers_NewestExpiredWallTime) {
+    storageGlobalParams.oplogMinRetentionHours.store(24.0);
+    Date_t expected = Date_t::now() - Hours(24);
+    Date_t actual = OplogTruncateMarkers::newestExpiredWallTime(getOperationContext());
+    ASSERT_APPROX_EQUAL(expected.toMillisSinceEpoch(), actual.toMillisSinceEpoch(), 1000);
+    storageGlobalParams.oplogMinRetentionHours.store(0.0);
+    ASSERT_EQ(Date_t(), OplogTruncateMarkers::newestExpiredWallTime(getOperationContext()));
+}
+
 }  // namespace repl
 }  // namespace mongo