diff --git a/jstests/replsets/replicated_truncate.js b/jstests/replsets/replicated_truncate.js
index ee4bb33645a..9d385ebebdf 100644
--- a/jstests/replsets/replicated_truncate.js
+++ b/jstests/replsets/replicated_truncate.js
@@ -57,7 +57,7 @@ function doTest(startIndex, endIndex, originalDocs, docsBeforeTruncate) {
         op: "c",
         ns: `${dbName}.$cmd`,
         o: {
-            "truncateRange": collName,
+            "truncateRange": coll.getFullName(),
            "minRecordId": originalDocs[startIndex].$recordId,
            "maxRecordId": originalDocs[endIndex].$recordId,
            "bytesDeleted": recordsDeleted,  // just a placeholder
diff --git a/src/mongo/db/collection_crud/collection_write_path.cpp b/src/mongo/db/collection_crud/collection_write_path.cpp
index f185554c526..aac0ede2dbd 100644
--- a/src/mongo/db/collection_crud/collection_write_path.cpp
+++ b/src/mongo/db/collection_crud/collection_write_path.cpp
@@ -954,7 +954,7 @@ repl::OpTime truncateRange(OperationContext* opCtx,
     uassert(ErrorCodes::IllegalOperation,
             "Cannot perform ranged truncate in collections other than clustered collections as "
             "RecordIds must be equal across nodes",
-            collection->isClustered());
+            collection->isClustered() || nss.isOplog());
     uassert(ErrorCodes::IllegalOperation,
             "Cannot perform ranged truncate in collections with preimages enabled",
             !collection->isChangeStreamPreAndPostImagesEnabled());
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp
index 8177cb112ce..66ee4434251 100644
--- a/src/mongo/db/op_observer/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl.cpp
@@ -42,6 +42,7 @@
 #include "mongo/db/index_builds/index_builds_common.h"
 #include "mongo/db/logical_time_validator.h"
 #include "mongo/db/namespace_string.h"
+#include "mongo/db/namespace_string_reserved.h"
 #include "mongo/db/op_observer/batched_write_context.h"
 #include "mongo/db/op_observer/op_observer_util.h"
 #include "mongo/db/operation_context.h"
@@ -2342,12 +2343,14 @@ void OpObserverImpl::onTruncateRange(OperationContext* opCtx,
                       rss::ReplicatedStorageService::get(opCtx).getPersistenceProvider(),
                       VersionContext::getDecoration(opCtx)));
 
-    TruncateRangeOplogEntry objectEntry(
-        std::string(coll->ns().coll()), minRecordId, maxRecordId, bytesDeleted, docsDeleted);
+    NamespaceString nss = coll->ns();
+    TruncateRangeOplogEntry objectEntry(nss, minRecordId, maxRecordId, bytesDeleted, docsDeleted);
 
     MutableOplogEntry oplogEntry;
     oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
-    oplogEntry.setNss(coll->ns().getCommandNS());
+    // For oplog truncation, use admin.$cmd as the namespace to get around the isOplogDisabledFor()
+    // checks that usually prevent generating oplog entries for operations on the oplog.
+    oplogEntry.setNss(nss.isOplog() ? NamespaceString::kAdminCommandNamespace : nss.getCommandNS());
     oplogEntry.setUuid(coll->uuid());
     oplogEntry.setObject(objectEntry.toBSON());
     opTime = logOperation(opCtx, &oplogEntry, true /*assignCommonFields*/, _operationLogger.get());
diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp
index 8978b4be368..b89a1ac66c6 100644
--- a/src/mongo/db/op_observer/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp
@@ -1651,8 +1651,8 @@ TEST_F(OpObserverTest, TruncateRangeIsReplicated) {
     ASSERT_EQ(oplogEntry.getUuid(), autoColl->uuid());
     const auto& o = oplogEntry.getObject();
 
-    TruncateRangeOplogEntry objectEntry(
-        std::string(autoColl->ns().coll()), RecordId("a"), RecordId("b"), 1, 1);
+    // Truncate range oplog entries include the db name in the namespace string.
+    TruncateRangeOplogEntry objectEntry(autoColl->ns(), RecordId("a"), RecordId("b"), 1, 1);
     ASSERT_BSONOBJ_EQ(o, objectEntry.toBSON());
 }
 
diff --git a/src/mongo/db/repl/BUILD.bazel b/src/mongo/db/repl/BUILD.bazel
index 1aa0ba597da..b6ae7f3436b 100644
--- a/src/mongo/db/repl/BUILD.bazel
+++ b/src/mongo/db/repl/BUILD.bazel
@@ -739,6 +739,7 @@ mongo_cc_library(
     ],
     deps = [
         "//src/mongo:base",
+        "//src/mongo/db:server_base",
        "//src/mongo/idl:idl_parser",
     ],
 )
diff --git a/src/mongo/db/repl/local_oplog_info.cpp b/src/mongo/db/repl/local_oplog_info.cpp
index d310f8a0757..d0c9eb80d43 100644
--- a/src/mongo/db/repl/local_oplog_info.cpp
+++ b/src/mongo/db/repl/local_oplog_info.cpp
@@ -96,11 +96,13 @@ void LocalOplogInfo::setRecordStore(OperationContext* opCtx, RecordStore* rs) {
     stdx::lock_guard lk(_rsMutex);
     _rs = rs;
 
-    // If the server was started in read-only mode, if we are restoring the node, or if async
-    // sampling is enabled, skip calculating the oplog truncate markers here.
+    // If the server was started in read-only mode, or if we are restoring the node, don't
+    // create oplog truncate markers. If async sampling is enabled, skip calculating them here.
+    // Don't create markers if sampling isn't supported (they will need to be created elsewhere).
+    auto& provider = rss::ReplicatedStorageService::get(opCtx).getPersistenceProvider();
     bool needsTruncateMarkers = opCtx->getServiceContext()->userWritesAllowed() &&
         !storageGlobalParams.repair && !repl::ReplSettings::shouldSkipOplogSampling() &&
-        !gOplogSamplingAsyncEnabled;
+        !gOplogSamplingAsyncEnabled && provider.supportsOplogSampling();
     if (needsTruncateMarkers) {
         _truncateMarkers = OplogTruncateMarkers::createOplogTruncateMarkers(opCtx, *rs);
     }
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 2a66e0bbc79..f5411d8b908 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1237,7 +1237,11 @@ const StringMap<ApplyOpMetadata> kOpsMap = {
          -> Status {
          const auto& entry = *op;
          const auto& cmd = entry.getObject();
-         const auto& ns = OplogApplication::extractNsFromCmd(entry.getNss().dbName(), cmd);
+         // For truncateRange, the first command element holds the full namespace, including
+         // the database name, rather than the usual collection-only ns value.
+         const auto& ns = NamespaceStringUtil::deserialize(boost::none,
+                                                           cmd.firstElement().valueStringData(),
+                                                           SerializationContext::stateDefault());
          const auto truncateRangeEntry = TruncateRangeOplogEntry::parse(cmd);
          writeConflictRetryWithLimit(opCtx, "applyOps_truncateRange", ns, [&] {
diff --git a/src/mongo/db/repl/oplog_applier_batcher.cpp b/src/mongo/db/repl/oplog_applier_batcher.cpp
index 70e1d371fe9..936eb727141 100644
--- a/src/mongo/db/repl/oplog_applier_batcher.cpp
+++ b/src/mongo/db/repl/oplog_applier_batcher.cpp
@@ -321,7 +321,9 @@ OplogApplierBatcher::BatchAction OplogApplierBatcher::_getBatchActionForEntry(
     if (entry.getCommandType() == OplogEntry::CommandType::kTruncateRange) {
         const auto& cmd = entry.getObject();
-        const auto& ns = OplogApplication::extractNsFromCmd(entry.getNss().dbName(), cmd);
+        const auto& ns = NamespaceStringUtil::deserialize(boost::none,
+                                                          cmd.firstElement().valueStringData(),
+                                                          SerializationContext::stateDefault());
         if (ns.isChangeStreamPreImagesCollection()) {
             auto truncateRangeEntry = TruncateRangeOplogEntry::parse(cmd);
             const auto& maxRecordId = truncateRangeEntry.getMaxRecordId();
diff --git a/src/mongo/db/repl/oplog_applier_batcher_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_batcher_test_fixture.cpp
index 0ca5909d227..da85814629e 100644
--- a/src/mongo/db/repl/oplog_applier_batcher_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_applier_batcher_test_fixture.cpp
@@ -519,7 +519,7 @@ OplogEntry makeLargeRetryableWriteOplogEntries(int t,
 /**
  * Generates a truncateRange oplog entry
  */
 OplogEntry makeTruncateRangeEntry(int t, const NamespaceString& nss, const RecordId& maxRecordId) {
-    TruncateRangeOplogEntry oField(std::string(nss.coll()), RecordId(), maxRecordId, 0, 0);
+    TruncateRangeOplogEntry oField(nss, RecordId(), maxRecordId, 0, 0);
     return {DurableOplogEntry(OpTime(Timestamp(t, 1), 1),  // optime
                               OpTypeEnum::kCommand,        // op type
                               nss.getCommandNS(),          // namespace
diff --git a/src/mongo/db/repl/truncate_range_oplog_entry.idl b/src/mongo/db/repl/truncate_range_oplog_entry.idl
index dca509a39f5..4e9374533f5 100644
--- a/src/mongo/db/repl/truncate_range_oplog_entry.idl
+++ b/src/mongo/db/repl/truncate_range_oplog_entry.idl
@@ -40,8 +40,8 @@ structs:
         strict: false
         fields:
             truncateRange:
-                description: "Name of the collection being truncated"
-                type: string
+                description: "Full namespace for the collection being truncated"
+                type: namespacestring
             minRecordId:
                 description: "Inclusive lower bound of the range to truncate"
                 type: RecordId
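Note: with the field type changed from string to namespacestring, the generated TruncateRangeOplogEntry now round-trips the full namespace. A minimal sketch of the effect (not part of the patch; the namespace, RecordId, and count values are invented for illustration):

    // Hypothetical round-trip through the regenerated IDL type.
    NamespaceString nss = NamespaceString::createNamespaceString_forTest("test", "coll");
    TruncateRangeOplogEntry entry(nss, RecordId(1), RecordId(10), 1024 /* bytes */, 2 /* docs */);
    BSONObj o = entry.toBSON();
    // The first element now carries "test.coll" rather than just "coll"; this is the value
    // the applier reads back via cmd.firstElement().valueStringData().
    invariant(o.firstElement().valueStringData() == "test.coll");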
diff --git a/src/mongo/db/storage/collection_truncate_markers.cpp b/src/mongo/db/storage/collection_truncate_markers.cpp
index 61d20f48f25..727b98c7ba0 100644
--- a/src/mongo/db/storage/collection_truncate_markers.cpp
+++ b/src/mongo/db/storage/collection_truncate_markers.cpp
@@ -87,6 +87,45 @@ CollectionTruncateMarkers::peekOldestMarkerIfNeeded(OperationContext* opCtx) con
     return _markers.front();
 }
 
+boost::optional<CollectionTruncateMarkers::Marker> CollectionTruncateMarkers::newestExpiredRecord(
+    OperationContext* opCtx,
+    RecordStore& recordStore,
+    const RecordId& mayTruncateUpTo,
+    Date_t expiryTime) {
+    // Reverse cursor so that non-exact matches will give the next older entry, not the next newer.
+    auto recoveryUnit = shard_role_details::getRecoveryUnit(opCtx);
+    auto cursor = recordStore.getCursor(opCtx, *recoveryUnit, /* forward */ false);
+    auto newest = cursor->next();
+    RecordId newestUnpinnedId;
+    if (!mayTruncateUpTo.isNull()) {
+        auto newestUnpinned =
+            cursor->seek(mayTruncateUpTo, SeekableRecordCursor::BoundInclusion::kInclude);
+        if (newestUnpinned) {
+            newestUnpinnedId = newestUnpinned->id;
+        }
+    }
+    // It's OK to round the expiry time off to the nearest second.
+    RecordId seekTo(durationCount<Seconds>(expiryTime.toDurationSinceEpoch()), /* low */ 0);
+    if (!mayTruncateUpTo.isNull() && seekTo > mayTruncateUpTo) {
+        seekTo = mayTruncateUpTo;
+    }
+    auto record = cursor->seek(seekTo, SeekableRecordCursor::BoundInclusion::kInclude);
+    if (!record) {
+        return {};
+    }
+    // For behavioral compatibility with ASC, don't entirely empty the oplog
+    // or remove the last unpinned entry (which ASC defines to *include* the mayTruncateUpTo
+    // point).
+    if (record->id == newest->id || record->id == newestUnpinnedId) {
+        // Reverse cursor, so one older.
+        record = cursor->next();
+        if (!record) {
+            return {};
+        }
+    }
+    // Byte and record increments are only advisory, even when there's a size storer to consult.
+    return Marker(/*.records =*/0, /*.bytes =*/0, record->id, expiryTime);
+}
+
 void CollectionTruncateMarkers::popOldestMarker() {
     stdx::lock_guard lk(_markersMutex);
     _markers.pop_front();
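The seek above works because oplog RecordIds are Timestamps: the high 32 bits are wall-clock seconds, so an expiry time rounded to seconds maps directly onto record-id order. A standalone sketch of that correspondence (not from the patch; values invented for illustration):

    // Oplog RecordId <-> Timestamp correspondence assumed by the seek above.
    Timestamp ts(1700000000, 5);                    // arbitrary example: seconds, increment
    RecordId fromParts(ts.getSecs(), ts.getInc());  // high 32 bits = seconds, low = increment
    RecordId fromRaw(ts.asULL());                   // same value built from the raw 64-bit form
    invariant(fromParts == fromRaw);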
diff --git a/src/mongo/db/storage/collection_truncate_markers.h b/src/mongo/db/storage/collection_truncate_markers.h
index 6ad4d611b46..da06ae461f9 100644
--- a/src/mongo/db/storage/collection_truncate_markers.h
+++ b/src/mongo/db/storage/collection_truncate_markers.h
@@ -120,6 +120,20 @@ public:
 
     boost::optional<Marker> peekOldestMarkerIfNeeded(OperationContext* opCtx) const;
 
+    /**
+     * Creates a new marker object representing the newest oplog record that may be truncated,
+     * if any. A record may be truncated if it is older than the expiry time and if truncating
+     * it would not remove all unpinned records.
+     * Returns a marker holding the record to truncate up to, but without byte or record
+     * truncation counts.
+     * The marker queue is not used, and popOldestMarker() should not be called after using
+     * this function.
+     */
+    static boost::optional<Marker> newestExpiredRecord(OperationContext* opCtx,
+                                                       RecordStore& recordStore,
+                                                       const RecordId& mayTruncateUpTo,
+                                                       Date_t expiryTime);
+
     void popOldestMarker();
 
     void createNewMarkerIfNeeded(const RecordId& lastRecord,
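A sketch of the intended call pattern for the new API, mirroring the wait predicate added to OplogTruncateMarkers::awaitHasExpiredOplogOrDead() later in this patch; truncateUpTo() is a hypothetical stand-in for the caller's truncation step:

    // Find the newest expired record, honoring the pinned oplog point.
    RecordId pin(opCtx->getServiceContext()->getStorageEngine()->getPinnedOplog().asULL());
    Date_t expiry = OplogTruncateMarkers::newestExpiredWallTime(opCtx);
    if (auto marker = CollectionTruncateMarkers::newestExpiredRecord(opCtx, rs, pin, expiry)) {
        // Everything at or before marker->lastRecord is eligible for time-based truncation.
        truncateUpTo(opCtx, rs, marker->lastRecord);  // hypothetical helper
    }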
diff --git a/src/mongo/db/storage/collection_truncate_markers_test.cpp b/src/mongo/db/storage/collection_truncate_markers_test.cpp
index b8e1909df1d..93ec7249f44 100644
--- a/src/mongo/db/storage/collection_truncate_markers_test.cpp
+++ b/src/mongo/db/storage/collection_truncate_markers_test.cpp
@@ -784,4 +784,112 @@ TEST_F(CollectionMarkersTest, ScanningWorksWithTruncate) {
     yieldNotifier.join();
 }
 
+void checkMarker(const RecordId& expected,
+                 OperationContext* opCtx,
+                 RecordStore& rs,
+                 RecordId pin,
+                 Date_t expiryTime) {
+    auto marker = CollectionTruncateMarkers::newestExpiredRecord(opCtx, rs, pin, expiryTime);
+    // The first three of these are always the same regardless of the record store contents:
+    ASSERT_EQ(0, marker->records);
+    ASSERT_EQ(0, marker->bytes);
+    ASSERT_EQ(expiryTime, marker->wallTime);
+    // This is the only assertion that can actually fail, so add some info about the other inputs.
+    ASSERT_EQ(expected, marker->lastRecord) << " with expiry=" << expiryTime << " and pin=" << pin;
+}
+
+TEST_F(CollectionMarkersTest, TimeBasedMarkerConstruction) {
+    auto collNs = NamespaceString::createNamespaceString_forTest("test", "coll");
+    auto opCtx = getClient()->makeOperationContext();
+    createCollection(opCtx.get(), collNs);
+    std::vector<RecordIdAndWall> elements;
+    std::vector<RecordId> pins;  // Representing one second after each element.
+    Date_t wallTime = Date_t::now();
+    // No truncatable record if the oplog is empty.
+    {
+        AutoGetCollection coll(opCtx.get(), collNs, MODE_IS);
+        ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+            opCtx.get(), *coll->getRecordStore(), RecordId(), wallTime));
+        Timestamp ts(durationCount<Seconds>(wallTime.toDurationSinceEpoch()), 0);
+        ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+            opCtx.get(), *coll->getRecordStore(), RecordId(ts.getSecs(), 0), wallTime));
+    }
+    // For this test we need real sequence numbers.
+    {
+        AutoGetCollection coll(opCtx.get(), collNs, MODE_IX);
+        RecordStore& rs = *coll->getRecordStore();
+        const auto insertedData = std::string(50, 'a');
+        WriteUnitOfWork wuow(opCtx.get());
+        for (size_t i = 0; i < 20; ++i) {
+            Timestamp ts(durationCount<Seconds>(wallTime.toDurationSinceEpoch()), 0);
+            RecordId recordId = RecordId(ts.getSecs(), 0);
+            auto recordIdStatus = rs.insertRecord(opCtx.get(),
+                                                  *shard_role_details::getRecoveryUnit(opCtx.get()),
+                                                  recordId,
+                                                  insertedData.data(),
+                                                  insertedData.length(),
+                                                  ts);
+            ASSERT_OK(recordIdStatus);
+            ASSERT_EQ(recordId, recordIdStatus.getValue());
+            elements.emplace_back(recordId, wallTime);
+            pins.emplace_back(RecordId(ts.getSecs() + 1, 0));
+            wallTime += Seconds(2);
+        }
+        wuow.commit();
+    }
+    AutoGetCollection coll(opCtx.get(), collNs, MODE_IS);
+    RecordStore& rs = *coll->getRecordStore();
+    // First, check when the expiry time is the limiting factor, with and without pins.
+    for (size_t i = 1; i < elements.size(); ++i) {
+        const RecordIdAndWall& element = elements.at(i);
+        RecordId expected = element.recordId;
+        // Don't completely empty out the oplog! Instead, test that looking up the newest wall
+        // time returns the second-newest recordId.
+        if (expected == elements.back().recordId) {
+            expected = elements[elements.size() - 2].recordId;
+        }
+        // Exact match with no pin.
+        checkMarker(expected, opCtx.get(), rs, RecordId(), element.wallTime);
+        // An expiry time between records with no pin should match the older record,
+        // and an expiry time newer than the newest record should match the newest record.
+        checkMarker(expected, opCtx.get(), rs, RecordId(), element.wallTime + Seconds(1));
+        // Check various pins when the expiry time is the limiting factor.
+        // A pin equal to the expiry is covered below with pin-limited truncation.
+        // Note that pins require one unpinned entry to be retained, so don't start j equal to i.
+        for (size_t j = i + 1; j < elements.size(); ++j) {
+            checkMarker(expected, opCtx.get(), rs, elements.at(j).recordId, element.wallTime);
+            checkMarker(expected, opCtx.get(), rs, pins.at(j), element.wallTime);
+            checkMarker(
+                expected, opCtx.get(), rs, elements.at(j).recordId, element.wallTime + Seconds(1));
+            checkMarker(expected, opCtx.get(), rs, pins.at(j), element.wallTime + Seconds(1));
+        }
+    }
+    // Check various expiry times when the pin is the limiting factor (or pin and expiry retain
+    // equally).
+    for (size_t i = 1; i < elements.size(); ++i) {
+        for (int j = 0; j < 4; ++j) {
+            // Don't truncate the mayTruncateUpTo point if the pin is an exact match for an entry.
+            checkMarker(elements.at(i - 1).recordId,
+                        opCtx.get(),
+                        rs,
+                        elements.at(i).recordId,
+                        elements.at(i).wallTime + Seconds(j));
+            // Leave an entry before the mayTruncateUpTo point if the pin has no exact match.
+            checkMarker(elements.at(i - 1).recordId,
+                        opCtx.get(),
+                        rs,
+                        pins.at(i),
+                        elements.at(i).wallTime + Seconds(j));
+        }
+    }
+    // Corner cases not covered in the above for loops:
+    // No truncatable record if the expiry time is older than the oldest record.
+    ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+        opCtx.get(), rs, RecordId(), elements.at(0).wallTime - Seconds(1)));
+    // No truncatable record if the oplog is all pinned.
+    ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+        opCtx.get(), rs, elements.at(0).recordId, wallTime + Seconds(25)));
+    // No truncatable record if all but one oplog entry is pinned, but not as an exact match.
+    ASSERT_FALSE(CollectionTruncateMarkers::newestExpiredRecord(
+        opCtx.get(), rs, pins.at(0), wallTime + Seconds(25)));
+}
 }  // namespace mongo
diff --git a/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp b/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp
index 5f208eba0ab..d29fa9f392a 100644
--- a/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp
+++ b/src/mongo/db/storage/oplog_cap_maintainer_thread.cpp
@@ -60,48 +60,6 @@
 namespace mongo {
 namespace {
 
-/**
- * Responsible for deleting oplog truncate markers once their max capacity has been reached.
- */
-class OplogCapMaintainerThread final : public BackgroundJob {
-public:
-    OplogCapMaintainerThread() : BackgroundJob(false /* deleteSelf */) {}
-
-    static OplogCapMaintainerThread* get(ServiceContext* serviceCtx);
-    static OplogCapMaintainerThread* get(OperationContext* opCtx);
-    static void set(ServiceContext* serviceCtx,
-                    std::unique_ptr<OplogCapMaintainerThread> oplogCapMaintainerThread);
-
-    std::string name() const override {
-        return _name;
-    }
-
-    void run() override;
-
-    /**
-     * Waits until the maintainer thread finishes. Must not be called concurrently with start().
-     */
-    void shutdown(const Status& reason);
-
-private:
-    /**
-     * Returns true iff there was an oplog to delete from.
-     */
-    bool _deleteExcessDocuments(OperationContext* opCtx);
-
-    // Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed.
-    mutable stdx::mutex _opCtxMutex;
-
-    // Saves a reference to the cap maintainer thread's operation context.
-    boost::optional<ServiceContext::UniqueOperationContext> _uniqueCtx;
-
-    mutable stdx::mutex _stateMutex;
-    bool _shuttingDown = false;
-    Status _shutdownReason = Status::OK();
-
-    std::string _name = std::string("OplogCapMaintainerThread-") +
-        toStringForLogging(NamespaceString::kRsOplogNamespace);
-};
 
 const auto getMaintainerThread =
     ServiceContext::declareDecoration<std::unique_ptr<OplogCapMaintainerThread>>();
@@ -247,6 +205,13 @@ void OplogCapMaintainerThread::set(
     maintainerThread = std::move(oplogCapMaintainerThread);
 }
 
+Lock::GlobalLockOptions OplogCapMaintainerThread::_getOplogTruncationLockOptions() {
+    return {.skipFlowControlTicket = true,
+            .skipRSTLLock = true,
+            .skipDirectConnectionChecks = false,
+            .explicitIntent = rss::consensus::IntentRegistry::Intent::LocalWrite};
+}
+
 bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
     // A Global IX lock should be good enough to protect the oplog truncation from
     // interruptions such as replication rollback. Database lock or collection lock is not
@@ -282,10 +247,7 @@ bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
                           MODE_IX,
                           Date_t::max(),
                           Lock::InterruptBehavior::kThrow,
-                          {false,
-                           true /* skipRstl */,
-                           false,
-                           rss::consensus::IntentRegistry::Intent::LocalWrite});
+                          _getOplogTruncationLockOptions());
     auto rs = LocalOplogInfo::get(opCtx)->getRecordStore();
     if (!rs) {
         LOGV2_DEBUG(9064300, 2, "oplog collection does not exist");
@@ -295,7 +257,7 @@ bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
     auto mayTruncateUpTo = opCtx->getServiceContext()->getStorageEngine()->getPinnedOplog();
 
     Timer timer;
-    oplog_truncation::reclaimOplog(opCtx, *rs, RecordId(mayTruncateUpTo.asULL()));
+    _reclaimOplog(opCtx, *rs, RecordId(mayTruncateUpTo.asULL()));
     auto elapsedMicros = timer.micros();
     totalTimeTruncating.fetchAndAdd(elapsedMicros);
@@ -313,9 +275,15 @@ bool OplogCapMaintainerThread::_deleteExcessDocuments(OperationContext* opCtx) {
     return true;
 }
 
+void OplogCapMaintainerThread::_reclaimOplog(OperationContext* opCtx,
+                                             RecordStore& rs,
+                                             RecordId mayTruncateUpTo) {
+    oplog_truncation::reclaimOplog(opCtx, rs, mayTruncateUpTo);
+}
+
 void OplogCapMaintainerThread::run() {
-    LOGV2(5295000, "Oplog cap maintainer thread started", "threadName"_attr = _name);
-    ThreadClient tc(_name,
+    LOGV2(5295000, "Oplog cap maintainer thread started", "threadName"_attr = name());
+    ThreadClient tc(name(),
                     getGlobalServiceContext()->getService(ClusterRole::ShardServer),
                     Client::noSession(),
                     ClientOperationKillableByStepdown{false});
@@ -353,7 +321,9 @@ void OplogCapMaintainerThread::run() {
         }
     });
 
-    if (gOplogSamplingAsyncEnabled) {
+    // Asynchronously regenerate truncation markers, if they are still needed when we get here.
+    auto& provider = rss::ReplicatedStorageService::get(_uniqueCtx->get()).getPersistenceProvider();
+    if (gOplogSamplingAsyncEnabled && provider.supportsOplogSampling()) {
         try {
             {
                 stdx::unique_lock lk(_stateMutex);
diff --git a/src/mongo/db/storage/oplog_cap_maintainer_thread.h b/src/mongo/db/storage/oplog_cap_maintainer_thread.h
index d20ac2762c2..c91703d3196 100644
--- a/src/mongo/db/storage/oplog_cap_maintainer_thread.h
+++ b/src/mongo/db/storage/oplog_cap_maintainer_thread.h
@@ -30,6 +30,8 @@
 #pragma once
 
 #include "mongo/db/service_context.h"
+#include "mongo/db/shard_role/lock_manager/d_concurrency.h"
+#include "mongo/util/background.h"
 #include "mongo/util/modules.h"
 
 namespace MONGO_MOD_PUBLIC mongo {
@@ -38,4 +40,58 @@ void startOplogCapMaintainerThread(ServiceContext* serviceContext,
                                    bool shouldSkipOplogSampling);
 
 void stopOplogCapMaintainerThread(ServiceContext* serviceContext, const Status& reason);
+
+/**
+ * Responsible for deleting oplog truncate markers once their max capacity has been reached.
+ */
+class MONGO_MOD_OPEN OplogCapMaintainerThread : public BackgroundJob {
+public:
+    OplogCapMaintainerThread() : BackgroundJob(false /* deleteSelf */) {}
+
+    static OplogCapMaintainerThread* get(ServiceContext* serviceCtx);
+    static OplogCapMaintainerThread* get(OperationContext* opCtx);
+    static void set(ServiceContext* serviceCtx,
+                    std::unique_ptr<OplogCapMaintainerThread> oplogCapMaintainerThread);
+
+    std::string name() const override {
+        return _name;
+    }
+
+    void run() override;
+
+    /**
+     * Waits until the maintainer thread finishes. Must not be called concurrently with start().
+     */
+    void shutdown(const Status& reason);
+
+private:
+    /**
+     * Options for lock acquisition, with an intent appropriate to the oplog being truncated.
+     */
+    virtual Lock::GlobalLockOptions _getOplogTruncationLockOptions();
+
+    /**
+     * Returns true iff there was an oplog to delete from.
+     */
+    bool _deleteExcessDocuments(OperationContext* opCtx);
+
+    /**
+     * Pass-through to oplog_truncation::reclaimOplog(); virtual so that an alternative
+     * reclaim implementation can be substituted for the oplog being truncated.
+     */
+    virtual void _reclaimOplog(OperationContext* opCtx, RecordStore& rs, RecordId mayTruncateUpTo);
+
+    // Serializes setting/resetting _uniqueCtx and marking _uniqueCtx killed.
+    mutable stdx::mutex _opCtxMutex;
+
+    // Saves a reference to the cap maintainer thread's operation context.
+    boost::optional<ServiceContext::UniqueOperationContext> _uniqueCtx;
+
+    mutable stdx::mutex _stateMutex;
+    bool _shuttingDown = false;
+    Status _shutdownReason = Status::OK();
+
+    std::string _name = std::string("OplogCapMaintainerThread-") +
+        toStringForLogging(NamespaceString::kRsOplogNamespace);
+};
+
 }  // namespace MONGO_MOD_PUBLIC mongo
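Dropping final, marking the class MONGO_MOD_OPEN, and making the two hooks virtual suggests provider-specific subclasses. A hypothetical example (the subclass name and behavior are invented, not part of the patch):

    // Sketch of a persistence provider substituting its own reclaim step.
    class ReplicatedOplogCapMaintainerThread final : public OplogCapMaintainerThread {
    private:
        void _reclaimOplog(OperationContext* opCtx,
                           RecordStore& rs,
                           RecordId mayTruncateUpTo) override {
            // E.g. issue a replicated truncateRange (see collection_write_path.cpp above)
            // instead of calling oplog_truncation::reclaimOplog() directly.
        }
    };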
diff --git a/src/mongo/db/storage/oplog_truncate_markers.cpp b/src/mongo/db/storage/oplog_truncate_markers.cpp
index 226cc61c2c8..e2bb5bf87c7 100644
--- a/src/mongo/db/storage/oplog_truncate_markers.cpp
+++ b/src/mongo/db/storage/oplog_truncate_markers.cpp
@@ -46,7 +46,6 @@
 namespace mongo {
 namespace {
 
-const double kNumMSInHour = 1000 * 60 * 60;
 MONGO_FAIL_POINT_DEFINE(hangDuringOplogSampling);
 }  // namespace
 
@@ -174,6 +173,20 @@ void OplogTruncateMarkers::kill() {
     _reclaimCv.notify_one();
 }
 
+Date_t OplogTruncateMarkers::newestExpiredWallTime(OperationContext* opCtx) {
+    // Retention times can be fractional, so use floating-point arithmetic for this calculation.
+    static const double kNumMSInHour = durationCount<Milliseconds>(Hours(1));
+    long minRetentionMS = storageGlobalParams.oplogMinRetentionHours.load() * kNumMSInHour;
+    // If retention by time is disabled, consumers with space-based retention will consider all
+    // records expired, and consumers without it will consider no records expired.
+    // The former case is handled separately in _hasExcessMarkers() in this file.
+    // The latter case is not handled elsewhere, so handle it here.
+    if (minRetentionMS == 0) {
+        return Date_t();
+    }
+    return opCtx->fastClockSource().now() - Milliseconds(minRetentionMS);
+}
+
 void OplogTruncateMarkers::clearMarkersOnCommit(OperationContext* opCtx) {
     shard_role_details::getRecoveryUnit(opCtx)->onCommit(
         [this](OperationContext*, boost::optional<Timestamp>) {
@@ -253,6 +266,32 @@ bool OplogTruncateMarkers::awaitHasExcessMarkersOrDead(OperationContext* opCtx)
     return !(_isDead || !isWaitConditionSatisfied);
 }
 
+bool OplogTruncateMarkers::awaitHasExpiredOplogOrDead(OperationContext* opCtx, RecordStore& rs) {
+    // The same mechanism (and the same private data) is used for canceling this wait as for
+    // awaitHasExcessMarkersOrDead(), but since this waits for time-based truncation, factor
+    // the retention time into the wait period, not just the configured thread wake interval.
+    // Note that oplogMinRetentionHours can be fractional, so don't use an integer duration
+    // type to calculate this.
+    double minRetentionSeconds = storageGlobalParams.oplogMinRetentionHours.load() * 60 * 60;
+    int checkPeriodSeconds = (minRetentionSeconds != 0.0)
+        ? std::min<double>(gOplogTruncationCheckPeriodSeconds, minRetentionSeconds)
+        : gOplogTruncationCheckPeriodSeconds;
+    // Wait until kill() is called or the oplog can be truncated.
+    stdx::unique_lock lock(_reclaimMutex);
+    MONGO_IDLE_THREAD_BLOCK;
+    auto isWaitConditionSatisfied =
+        opCtx->waitForConditionOrInterruptFor(_reclaimCv, lock, Seconds(checkPeriodSeconds), [&] {
+            if (_isDead) {
+                return true;
+            }
+            RecordId pin(opCtx->getServiceContext()->getStorageEngine()->getPinnedOplog().asULL());
+            return newestExpiredRecord(opCtx, rs, pin, newestExpiredWallTime(opCtx)).has_value();
+        });
+
+    // Return true only when we have detected expired oplog, not because the record store
+    // is being destroyed (_isDead) or we timed out waiting on the condition variable.
+    return !(_isDead || !isWaitConditionSatisfied);
+}
+
 bool OplogTruncateMarkers::_hasExcessMarkers(OperationContext* opCtx) const {
     int64_t totalBytes = 0;
     for (const auto& marker : getMarkers()) {
@@ -272,20 +311,12 @@ bool OplogTruncateMarkers::_hasExcessMarkers(OperationContext* opCtx) const {
         return false;
     }
 
-    double minRetentionHours = storageGlobalParams.oplogMinRetentionHours.load();
-
     // If we are not checking for time, then yes, there is a truncate marker to be reaped
     // because oplog is at capacity.
-    if (minRetentionHours == 0.0) {
+    if (storageGlobalParams.oplogMinRetentionHours.load() == 0.0) {
         return true;
     }
-
-    auto nowWall = Date_t::now();
-    auto lastTruncateMarkerWallTime = truncateMarker.wallTime;
-
-    auto currRetentionMS = durationCount<Milliseconds>(nowWall - lastTruncateMarkerWallTime);
-    double currRetentionHours = currRetentionMS / kNumMSInHour;
-    return currRetentionHours >= minRetentionHours;
+    return truncateMarker.wallTime <= newestExpiredWallTime(opCtx);
 }
 
 void OplogTruncateMarkers::adjust(int64_t maxSize) {
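A standalone sketch of the retention arithmetic above, with mongo types replaced by builtins (all names and values here are illustrative): with oplogMinRetentionHours = 0.5, the expiry cutoff is now minus 30 minutes, and the wait period shrinks from the configured check period to 1800 seconds.

    #include <algorithm>
    #include <cstdint>

    // Mirrors newestExpiredWallTime(): 0 stands in for the null Date_t ("disabled").
    int64_t expiryCutoffMS(int64_t nowMS, double minRetentionHours) {
        const double kNumMSInHour = 1000.0 * 60 * 60;
        auto minRetentionMS = static_cast<int64_t>(minRetentionHours * kNumMSInHour);
        return minRetentionMS == 0 ? 0 : nowMS - minRetentionMS;
    }

    // Mirrors the wait-period clamp in awaitHasExpiredOplogOrDead().
    int checkPeriodSeconds(int configuredSeconds, double minRetentionHours) {
        double minRetentionSeconds = minRetentionHours * 60 * 60;
        return minRetentionSeconds != 0.0
            ? static_cast<int>(std::min<double>(configuredSeconds, minRetentionSeconds))
            : configuredSeconds;
    }
    // expiryCutoffMS(nowMS, 0.5) == nowMS - 1'800'000; checkPeriodSeconds(3600, 0.5) == 1800.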
diff --git a/src/mongo/db/storage/oplog_truncate_markers.h b/src/mongo/db/storage/oplog_truncate_markers.h
index 75dfd908dc1..9e2482c5e19 100644
--- a/src/mongo/db/storage/oplog_truncate_markers.h
+++ b/src/mongo/db/storage/oplog_truncate_markers.h
@@ -45,7 +45,7 @@ namespace MONGO_MOD_PUBLIC mongo {
 
 // Keep "milestones" against the oplog to efficiently remove the old records when the collection
 // grows beyond its desired maximum size.
-class OplogTruncateMarkers final : public CollectionTruncateMarkers {
+class MONGO_MOD_OPEN OplogTruncateMarkers : public CollectionTruncateMarkers {
 public:
     OplogTruncateMarkers(std::deque<CollectionTruncateMarkers::Marker> markers,
                          int64_t partialMarkerRecords,
@@ -67,6 +67,11 @@ public:
      */
    void kill();
 
+    /**
+     * The newest wall time at which oplog records are considered expired by the retention policy.
+     */
+    static Date_t newestExpiredWallTime(OperationContext* opCtx);
+
     /**
      * Waits for excess oplog space to be available for reclamation.
      * Returns true if we can proceed to reclaim space in the oplog.
@@ -77,6 +82,16 @@ public:
      */
     bool awaitHasExcessMarkersOrDead(OperationContext* opCtx) override;
 
+    /**
+     * Waits for expired oplog entries to be eligible for reclamation under time-based
+     * retention only. Returns true if we can proceed to reclaim space in the oplog by time.
+     * Otherwise, returns false if the containing record store instance is being destroyed
+     * or if we reached the deadline for waiting.
+     * Throws an exception if interrupted.
+     * See the 'oplogTruncationCheckPeriodSeconds' server parameter.
+     */
+    bool awaitHasExpiredOplogOrDead(OperationContext* opCtx, RecordStore& rs);
+
     // Clears all the markers of the instance whenever the current WUOW commits.
     void clearMarkersOnCommit(OperationContext* opCtx);
diff --git a/src/mongo/db/storage/oplog_truncation_test.cpp b/src/mongo/db/storage/oplog_truncation_test.cpp
index 3fb45a086b1..3de762e6b31 100644
--- a/src/mongo/db/storage/oplog_truncation_test.cpp
+++ b/src/mongo/db/storage/oplog_truncation_test.cpp
@@ -47,6 +47,7 @@
 #include "mongo/db/shard_role/shard_catalog/index_catalog_entry.h"
 #include "mongo/db/shard_role/shard_catalog/index_descriptor.h"
 #include "mongo/db/storage/record_store.h"
+#include "mongo/db/storage/storage_options.h"
 #include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
 #include "mongo/db/storage/write_unit_of_work.h"
 #include "mongo/unittest/unittest.h"
@@ -924,5 +925,14 @@ TEST_F(OplogTruncationTest, OplogTruncateMarkers_Duplicates) {
     }
 }
 
+TEST_F(OplogTruncationTest, OplogTruncateMarkers_NewestExpiredWallTime) {
+    storageGlobalParams.oplogMinRetentionHours.store(24.0);
+    Date_t expected = Date_t::now() - Hours(24);
+    Date_t actual = OplogTruncateMarkers::newestExpiredWallTime(getOperationContext());
+    ASSERT_APPROX_EQUAL(expected.toMillisSinceEpoch(), actual.toMillisSinceEpoch(), 1000);
+    storageGlobalParams.oplogMinRetentionHours.store(0.0);
+    ASSERT_EQ(Date_t(), OplogTruncateMarkers::newestExpiredWallTime(getOperationContext()));
+}
+
 }  // namespace repl
 }  // namespace mongo