diff --git a/docs/rwmutex.md b/docs/rwmutex.md
index 3757958f805..e10e7f46769 100644
--- a/docs/rwmutex.md
+++ b/docs/rwmutex.md
@@ -2,7 +2,7 @@
 The following are specialized in-house shared mutex types that allow exploiting use-case specific
 concurrency semantics to provide low overhead synchronization. Make sure to adopt these primitives
-only if your use-case exactly matches the requirements listed below, or consult with the 
+only if your use-case exactly matches the requirements listed below, or consult with the
 Server Programmability team.
 
 ## WriteRarelyRWMutex
 
@@ -20,3 +20,14 @@ with the number of cores. However, the cost of acquiring a write lock increases with the number of
 threads and can be hundreds of microseconds. Therefore, opt for using `WriteRarelyRWMutex` only
 when almost all accesses are reads (e.g. replication configuration), and avoid using this mutex
 type when writes are not an exception and could happen regularly.
+
+## RWMutex
+
+A reader-writer mutex type tailored for frequent reads and occasional writes. A writer blocks any
+new readers and waits for the active readers to finish. Compared to `WriteRarelyRWMutex`, reads
+are more expensive and less scalable, which in turn reduces the overhead of occasional writes.
+Under the hood, `RWMutex` maintains a counter of active readers: a writer sets a write intent,
+which blocks new readers, and then waits until all existing readers retire. This type can
+outperform `std::shared_mutex` and `std::mutex` only for specific use cases, so prefer the
+standard library alternatives unless there is a concrete performance budget to meet and strong
+evidence that `RWMutex` helps meet it.
diff --git a/src/mongo/db/catalog/collection_catalog.cpp b/src/mongo/db/catalog/collection_catalog.cpp
index be7670dbae7..0c92952b988 100644
--- a/src/mongo/db/catalog/collection_catalog.cpp
+++ b/src/mongo/db/catalog/collection_catalog.cpp
@@ -46,6 +46,7 @@
 #include
 #include
 #include
+#include
 
 #include "collection_catalog.h"
 
@@ -77,6 +78,7 @@
 #include "mongo/logv2/redaction.h"
 #include "mongo/platform/atomic_word.h"
 #include "mongo/platform/mutex.h"
+#include "mongo/platform/rwmutex.h"
 #include "mongo/stdx/condition_variable.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/database_name_util.h"
@@ -107,7 +109,7 @@ constexpr auto kNumDurableCatalogScansDueToMissingMapping = "numScansDueToMissin
 class LatestCollectionCatalog {
 public:
     std::shared_ptr<CollectionCatalog> load() const {
-        std::lock_guard lk(_mutex);
+        std::shared_lock lk(_mutex);  // NOLINT
         return _catalog;
     }
 
@@ -126,7 +128,7 @@
     }
 
 private:
-    mutable Mutex _mutex = MONGO_MAKE_LATCH("LatestCollectionCatalog::_mutex");
+    mutable RWMutex _mutex;
     // TODO SERVER-56428: Replace with std::atomic when supported in our toolchain
     std::shared_ptr<CollectionCatalog> _catalog = std::make_shared<CollectionCatalog>();
 };
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 7f6041c16c0..ffe36daffab 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -43,6 +43,7 @@
 #include "mongo/db/server_options.h"
 #include "mongo/db/transaction_resources.h"
 #include "mongo/platform/mutex.h"
+#include "mongo/platform/rwmutex.h"
 #include "mongo/util/assert_util_core.h"
 #include "mongo/util/decorable.h"
 #include "mongo/util/namespace_string_util.h"
@@ -116,9 +117,8 @@
 private:
     // Adding entries to `_collections` is expected to be very infrequent and far apart (collection
     // creation), so the majority of accesses to this map are read-only and benefit from using a
-    // shared mutex type for synchronization. The selected `std::shared_mutex` primitive prefers
-    // writers over readers so it is the appropriate choice for this use-case.
-    mutable std::shared_mutex _mutex;  // NOLINT
+    // shared mutex type for synchronization.
+    mutable RWMutex _mutex;
 
     // Entries of the _collections map must never be deleted or replaced. This is to guarantee that
     // a 'nss' is always associated to the same 'ResourceMutex'.
diff --git a/src/mongo/platform/rwmutex.h b/src/mongo/platform/rwmutex.h
index a74b2469192..f4c39e38863 100644
--- a/src/mongo/platform/rwmutex.h
+++ b/src/mongo/platform/rwmutex.h
@@ -29,11 +29,108 @@
 
 #pragma once
 
+#include "mongo/platform/compiler.h"
 #include "mongo/platform/mutex.h"
 #include "mongo/platform/waitable_atomic.h"
+#include "mongo/util/assert_util.h"
 
 namespace mongo {
 
+/**
+ * A reader-writer mutex type that is optimized for frequent, short reads and infrequent writes.
+ * This type is not fair towards readers, as back-to-back writes may starve reads. Therefore, this
+ * type is not suitable for use-cases where the mutex is acquired in exclusive mode in a tight loop.
+ *
+ * Note that `RWMutex` is not interruptible and provides similar semantics to `std::shared_mutex`.
+ * Make sure to closely examine your code before using `RWMutex` over `Mutex` and verify that the
+ * synchronization pattern is a good match for `RWMutex`.
+ */
+class RWMutex {
+public:
+    using StateType = uint32_t;
+    static constexpr StateType kWriteIntentMask = 1 << 31;
+    static constexpr StateType kReadersCountMask = ~kWriteIntentMask;
+    static constexpr StateType kReadersOverflowMask = 1 << 30;
+
+    void lock() noexcept {
+        _writeMutex.lock();
+        auto state = _state.fetchAndBitOr(kWriteIntentMask) | kWriteIntentMask;
+        while (state & kReadersCountMask) {
+            // Keep waiting here until there are no readers. Any new reader will notice the write
+            // intent and withdraw.
+            state = _state.wait(state);
+        }
+    }
+
+    void unlock() noexcept {
+        _state.fetchAndBitXor(kWriteIntentMask);
+        _state.notifyAll();
+        _writeMutex.unlock();
+    }
+
+    void lock_shared() noexcept {
+        if (auto state = _state.addAndFetch(1);
+            MONGO_unlikely(_hasPendingWriterOrTooManyReaders(state))) {
+            // A write is in progress. Clear the read intent and wait until we can lock for
+            // reading.
+            _waitAndThenLock(state);
+        }
+    }
+
+    void unlock_shared() noexcept {
+        if (MONGO_unlikely(_state.subtractAndFetch(1) == kWriteIntentMask)) {
+            // A writer is waiting and this is the last reader, so we need to notify the waiters.
+            _state.notifyAll();
+        }
+    }
+
+private:
+    friend void setWriteIntent_forTest(RWMutex& mutex) {
+        mutex._state.fetchAndBitOr(kWriteIntentMask);
+    }
+
+    friend bool isWriteIntentSet_forTest(const RWMutex& mutex) {
+        return mutex._state.load() & kWriteIntentMask;
+    }
+
+    friend void addReaders_forTest(RWMutex& mutex, uint32_t readers) {
+        mutex._state.fetchAndAdd(readers);
+    }
+
+    friend bool hasWaiters_forTest(const RWMutex& mutex) {
+        return hasWaiters_forTest(mutex._state);
+    }
+
+    friend size_t getReadersCount_forTest(const RWMutex& mutex) {
+        return mutex._state.load() & kReadersCountMask;
+    }
+
+    inline bool _hasPendingWriterOrTooManyReaders(StateType state) const {
+        return state & (kWriteIntentMask | kReadersOverflowMask);
+    }
+
+    MONGO_COMPILER_NOINLINE MONGO_COMPILER_COLD_FUNCTION void _waitAndThenLock(StateType state) {
+        do {
+            invariant(!(state & kReadersOverflowMask), "Too many readers have acquired the lock!");
+            unlock_shared();
+            while (state & kWriteIntentMask) {
+                // Wait here until the write intent is cleared.
+                state = _state.wait(state);
+            }
+            state = _state.addAndFetch(1);
+        } while (MONGO_unlikely(_hasPendingWriterOrTooManyReaders(state)));
+    }
+
+    // Synchronizes writers, only allowing a single writer to acquire the mutex at any time.
+    Mutex _writeMutex;
+
+    /**
+     * Bits [0 .. 29] represent the number of readers, allowing up to 2 ^ 30 - 1 concurrent reads.
+     * Bit 30 must remain zero; it is only set when too many readers attempt to hold the lock.
+     * Bit 31 tracks the write intent.
+     */
+    WaitableAtomic<StateType> _state{0};
+};
+
 /**
  * A shared mutex type optimized for readers, with the assumption of infrequent writes. Under the
  * hood, it is very similar to a hazard pointer, where each thread maintains a list for its shared
diff --git a/src/mongo/platform/rwmutex_bm.cpp b/src/mongo/platform/rwmutex_bm.cpp
index 282c7b7be73..88109bface2 100644
--- a/src/mongo/platform/rwmutex_bm.cpp
+++ b/src/mongo/platform/rwmutex_bm.cpp
@@ -107,6 +107,24 @@ private:
     DataType _data;
 };
 
+template <typename DataType>
+class RWMutexController {
+public:
+    explicit RWMutexController(DataType value) {
+        stdx::unique_lock lk(_mutex);
+        _data = value;
+    }
+
+    auto read() const {
+        std::shared_lock lk(_mutex);  // NOLINT
+        return _data;
+    }
+
+private:
+    mutable RWMutex _mutex;
+    DataType _data;
+};
+
 template <typename DataType>
 class ResourceMutexController {
 public:
@@ -160,6 +178,9 @@ BENCHMARK_TEMPLATE_DEFINE_F(RWMutexBm, SharedMutex, SharedMutexController)(bench
 BENCHMARK_TEMPLATE_DEFINE_F(RWMutexBm, Mutex, MutexController)(benchmark::State& s) {
     run(s);
 }
+BENCHMARK_TEMPLATE_DEFINE_F(RWMutexBm, RWMutex, RWMutexController)(benchmark::State& s) {
+    run(s);
+}
 BENCHMARK_TEMPLATE_DEFINE_F(RWMutexBm, ResourceMutex, ResourceMutexController)
 (benchmark::State& s) {
     run(s);
@@ -169,6 +190,7 @@ const auto kMaxThreads = ProcessInfo::getNumLogicalCores() * 2;
 BENCHMARK_REGISTER_F(RWMutexBm, WriteRarelyRWMutex)->ThreadRange(1, kMaxThreads);
 BENCHMARK_REGISTER_F(RWMutexBm, SharedMutex)->ThreadRange(1, kMaxThreads);
 BENCHMARK_REGISTER_F(RWMutexBm, Mutex)->ThreadRange(1, kMaxThreads);
+BENCHMARK_REGISTER_F(RWMutexBm, RWMutex)->ThreadRange(1, kMaxThreads);
 #if REGISTER_RESOURCE_MUTEX_BENCHMARKS
 BENCHMARK_REGISTER_F(RWMutexBm, ResourceMutex)->ThreadRange(1, kMaxThreads);
 #endif
diff --git a/src/mongo/platform/rwmutex_test.cpp b/src/mongo/platform/rwmutex_test.cpp
index 022b2426189..ec3ec2ce686 100644
--- a/src/mongo/platform/rwmutex_test.cpp
+++ b/src/mongo/platform/rwmutex_test.cpp
@@ -27,11 +27,15 @@
  * it in the license file.
  */
 
+#include
 #include
 
 #include "mongo/platform/rwmutex.h"
 #include "mongo/platform/waitable_atomic.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
 #include "mongo/unittest/barrier.h"
+#include "mongo/unittest/death_test.h"
 #include "mongo/unittest/join_thread.h"
 #include "mongo/unittest/thread_assertion_monitor.h"
 #include "mongo/unittest/unittest.h"
@@ -221,5 +225,138 @@ TEST_F(WriteRarelyRWMutexTest, MultiWriter) {
     ASSERT_EQ(counter, kTargetValue);
 }
 
+TEST(RWMutex, OneWriterAtAnyTime) {
+    RWMutex mutex;
+    stdx::unique_lock lk(mutex);
+    ASSERT_FALSE(hasWaiters_forTest(mutex));
+    ASSERT_TRUE(isWriteIntentSet_forTest(mutex));
+
+    unittest::ThreadAssertionMonitor monitor;
+    auto writer = monitor.spawn([&] {
+        std::lock_guard anotherLk(mutex);
+        ASSERT_FALSE(lk.owns_lock());
+    });
+
+    // Best effort to allow `writer` to start and try to exclusively acquire `mutex`.
+    sleepFor(Microseconds(5));
+
+    lk.unlock();
+    // Allow `writer` to proceed with acquiring the lock.
+    monitor.notifyDone();
+    writer.join();
+}
+
+TEST(RWMutex, WriterWaitsForReader) {
+    RWMutex mutex;
+    std::shared_lock lk(mutex);  // NOLINT
+    ASSERT_FALSE(hasWaiters_forTest(mutex));
+    ASSERT_FALSE(isWriteIntentSet_forTest(mutex));
+
+    unittest::ThreadAssertionMonitor monitor;
+    auto writer = monitor.spawn([&] {
+        std::lock_guard lk(mutex);
+        ASSERT_EQ(getReadersCount_forTest(mutex), 0);
+    });
+
+    while (!hasWaiters_forTest(mutex)) {
+        // Wait until the writer notices the reader and proceeds to wait for it to retire.
+    }
+    ASSERT_TRUE(isWriteIntentSet_forTest(mutex));
+
+    lk.unlock();
+    // Let the writer proceed with acquiring the lock.
+    monitor.notifyDone();
+    writer.join();
+}
+
+TEST(RWMutex, NewReaderWaitsForWriter) {
+    RWMutex mutex;
+    stdx::unique_lock lk(mutex);
+    ASSERT_FALSE(hasWaiters_forTest(mutex));
+
+    unittest::ThreadAssertionMonitor monitor;
+    auto reader = monitor.spawn([&] {
+        std::shared_lock lk(mutex);  // NOLINT
+        ASSERT_FALSE(isWriteIntentSet_forTest(mutex));
+    });
+
+    while (!hasWaiters_forTest(mutex)) {
+        // The reader should start waiting on the mutex shortly, so keep checking.
+    }
+
+    lk.unlock();
+    // The reader may now acquire the lock and make progress.
+    monitor.notifyDone();
+    reader.join();
+}
+
+DEATH_TEST(RWMutex, TooManyReaders, "invariant") {
+    RWMutex mutex;
+    addReaders_forTest(mutex, RWMutex::kReadersOverflowMask - 1);
+    // The following must hit an invariant since it exceeds the maximum number of reader locks.
+    mutex.lock_shared();
+}
+
+TEST(RWMutex, MultipleReaders) {
+    const auto kNumReaders = 16;
+    unittest::Barrier barrier(kNumReaders);
+
+    RWMutex mutex;
+    std::vector<unittest::JoinThread> readers;
+    for (auto i = 0; i < kNumReaders; ++i) {
+        readers.emplace_back([&] {
+            std::shared_lock lk(mutex);  // NOLINT
+            barrier.countDownAndWait();
+        });
+    }
+}
+
+TEST(RWMutex, MultipleReadersAndWriters) {
+    // Starts `kNumWorkers` worker threads and has them loop for a total of `kNumIterations`.
+    // Each worker assigns a global order to its local iteration, and decides between acquiring a
+    // read or a write lock based on this order: if the order is divisible by one thousand, the
+    // thread acquires a write lock, otherwise it acquires a read lock. Workers verify that no
+    // readers are active while holding the write lock, and that no writers are active while
+    // holding a read lock.
+    const size_t kNumWorkers = 8;
+    const size_t kNumIterations = 5'000'000;
+
+    RWMutex mutex;
+    Atomic<size_t> counter{0};
+    Atomic<int> readers, writers;
+
+    unittest::Barrier barrier(kNumWorkers);
+    std::vector<stdx::thread> workers(kNumWorkers);
+
+    unittest::ThreadAssertionMonitor monitor;
+    for (auto& worker : workers) {
+        worker = monitor.spawn([&] {
+            barrier.countDownAndWait();
+            while (true) {
+                const auto iteration = counter.fetchAndAdd(1);
+                if (iteration >= kNumIterations)
+                    return;
+
+                if (iteration % 1'000 == 0) {
+                    stdx::lock_guard writeLk(mutex);
+                    ASSERT_EQ(readers.loadRelaxed(), 0);
+                    writers.fetchAndAddRelaxed(1);
+                    ON_BLOCK_EXIT([&] { writers.fetchAndSubtractRelaxed(1); });
+                } else {
+                    std::shared_lock readLk(mutex);  // NOLINT
+                    ASSERT_EQ(writers.loadRelaxed(), 0);
+                    readers.fetchAndAddRelaxed(1);
+                    ON_BLOCK_EXIT([&] { readers.fetchAndSubtractRelaxed(1); });
+                }
+            }
+        });
+    }
+
+    monitor.notifyDone();
+    for (auto& worker : workers) {
+        worker.join();
+    }
+}
+
 }  // namespace
 }  // namespace mongo
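As an illustration of the guidance in the `docs/rwmutex.md` section above (a `std::shared_lock` on the frequent read path and a `std::lock_guard` on the rare write path, as in the `LatestCollectionCatalog` change), here is a minimal usage sketch. The `SettingsRegistry` class and its members are hypothetical and not part of this patch.

```cpp
#include <map>
#include <shared_mutex>
#include <string>

#include "mongo/platform/rwmutex.h"

namespace mongo {

// Hypothetical read-mostly registry guarded by RWMutex.
class SettingsRegistry {
public:
    std::string get(const std::string& key) const {
        // Hot path: many concurrent readers, each only bumping the reader count.
        std::shared_lock lk(_mutex);  // NOLINT
        auto it = _settings.find(key);
        return it == _settings.end() ? std::string{} : it->second;
    }

    void set(const std::string& key, std::string value) {
        // Rare path: the writer sets the write intent, which blocks new readers, and waits for
        // active readers to retire before mutating the map.
        std::lock_guard lk(_mutex);
        _settings[key] = std::move(value);
    }

private:
    mutable RWMutex _mutex;
    std::map<std::string, std::string> _settings;
};

}  // namespace mongo
```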
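The state-word layout documented in the `rwmutex.h` hunk (bits [0..29] hold the reader count, bit 30 guards against overflow, and bit 31 tracks the write intent) can be sanity-checked at compile time using only the public mask constants. The checks below are illustrative and not part of this patch:

```cpp
#include <cstdint>

#include "mongo/platform/rwmutex.h"

namespace mongo {

// The write intent occupies the top bit of the 32-bit state word.
static_assert(RWMutex::kWriteIntentMask == 0x8000'0000u);
// Bit 30 flags reader-count overflow before the count can spill into the write-intent bit.
static_assert(RWMutex::kReadersOverflowMask == 0x4000'0000u);
// The reader count and the write intent never overlap, so one reader plus a pending writer can be
// encoded in the same word.
static_assert((RWMutex::kReadersCountMask & RWMutex::kWriteIntentMask) == 0);
static_assert(((1u | RWMutex::kWriteIntentMask) & RWMutex::kReadersCountMask) == 1u);

}  // namespace mongo
```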