SERVER-90424 A reader-friendly reader-writer mutex type (#22409)

GitOrigin-RevId: e1bc79a91c27159e29e0f1577fca7ffa54be39af
This commit is contained in:
Saman Memaripour 2024-05-23 11:02:12 -04:00 committed by MongoDB Bot
parent 9dfcda4b34
commit d4d165b630
6 changed files with 275 additions and 6 deletions

View File

@ -2,7 +2,7 @@
The following are specialized in-house shared mutex types that allow exploiting use-case specific
concurrency semantics to provide low overhead synchronization. Make sure to adopt these primitives
only if your use-case exactly matches the requirements listed below, or consult with the
only if your use-case exactly matches the requirements listed below, or consult with the Server
Programmability team.
## WriteRarelyRWMutex
@ -20,3 +20,14 @@ with the number of cores. However, the cost of acquiring a write lock increases
threads and can be hundreds of microseconds. Therefore, opt for using `WriteRarelyRWMutex` only when
almost all accesses are reads (e.g. replication configuration), and avoid using this mutex type when
writes are not an exception and could happen regularly.
## RWMutex
A reader-writer mutex type that is tailored for frequent reads, and occasional writes. Writers await
completion of active readers, while blocking any new reader. In comparison to `WriteRarelyRWMutex`,
reads are more expensive and less scalable in order to reduce the overhead of occasional writes.
Under the hood, `RWMutex` mimics a counter that records the number of active readers. Writers have
to wait until all readers retire, and block new readers by setting a write intent. This type could
outperform `std::shared_mutex` and `std::mutex` for specific use cases, therefore, prefer using the
alternatives from standard library unless there is a required performance budget to meet, as well as
strong evidence that using `RWMutex` helps with meeting those performance requirements.

View File

@ -46,6 +46,7 @@
#include <exception>
#include <list>
#include <mutex>
#include <shared_mutex>
#include "collection_catalog.h"
@ -77,6 +78,7 @@
#include "mongo/logv2/redaction.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/platform/rwmutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/database_name_util.h"
@ -107,7 +109,7 @@ constexpr auto kNumDurableCatalogScansDueToMissingMapping = "numScansDueToMissin
class LatestCollectionCatalog {
public:
std::shared_ptr<CollectionCatalog> load() const {
std::lock_guard lk(_mutex);
std::shared_lock lk(_mutex); // NOLINT
return _catalog;
}
@ -126,7 +128,7 @@ public:
}
private:
mutable Mutex _mutex = MONGO_MAKE_LATCH("LatestCollectionCatalog::_mutex");
mutable RWMutex _mutex;
// TODO SERVER-56428: Replace with std::atomic<std::shared_ptr> when supported in our toolchain
std::shared_ptr<CollectionCatalog> _catalog = std::make_shared<CollectionCatalog>();
};

View File

@ -43,6 +43,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/transaction_resources.h"
#include "mongo/platform/mutex.h"
#include "mongo/platform/rwmutex.h"
#include "mongo/util/assert_util_core.h"
#include "mongo/util/decorable.h"
#include "mongo/util/namespace_string_util.h"
@ -116,9 +117,8 @@ private:
// Adding entries to `_collections` is expected to be very infrequent and far apart (collection
// creation), so the majority of accesses to this map are read-only and benefit from using a
// shared mutex type for synchronization. The selected `std::shared_mutex` primitive prefers
// writers over readers so it is the appropriate choice for this use-case.
mutable std::shared_mutex _mutex; // NOLINT
// shared mutex type for synchronization.
mutable RWMutex _mutex;
// Entries of the _collections map must never be deleted or replaced. This is to guarantee that
// a 'nss' is always associated to the same 'ResourceMutex'.

View File

@ -29,11 +29,108 @@
#pragma once
#include "mongo/platform/compiler.h"
#include "mongo/platform/mutex.h"
#include "mongo/platform/waitable_atomic.h"
#include "mongo/util/assert_util.h"
namespace mongo {
/**
* A reader-writer mutex type that is optimized for frequent, short reads and infrequent writes.
* This type is not fair towards readers, as back-to-back writes may starve reads. Therefore, this
* type is not suitable for use-cases where the mutex is acquired in exclusive mode in a tight loop.
*
* Note that `RWMutex` is not interruptible and provides similar semantics to `std::shared_mutex`.
* Make sure to closely examine your code before using `RWMutex` over `Mutex` and verify that the
* synchronization pattern is a good match for `RWMutex`.
*/
class RWMutex {
public:
using StateType = uint32_t;
static constexpr StateType kWriteIntentMask = 1 << 31;
static constexpr StateType kReadersCountMask = ~kWriteIntentMask;
static constexpr StateType kReadersOverflowMask = 1 << 30;
void lock() noexcept {
_writeMutex.lock();
auto state = _state.fetchAndBitOr(kWriteIntentMask) | kWriteIntentMask;
while (state & kReadersCountMask) {
// Keep waiting here until there are no readers. Any new reader will notice the write
// intent and withdraw.
state = _state.wait(state);
}
}
void unlock() noexcept {
_state.fetchAndBitXor(kWriteIntentMask);
_state.notifyAll();
_writeMutex.unlock();
}
void lock_shared() noexcept {
if (auto state = _state.addAndFetch(1);
MONGO_unlikely(_hasPendingWriterOrTooManyReaders(state))) {
// A write is in progress. Clear the read intent and wait until we can lock for reading.
_waitAndThenLock(state);
}
}
void unlock_shared() noexcept {
if (MONGO_unlikely(_state.subtractAndFetch(1) == kWriteIntentMask)) {
// A writer is waiting and this is the last reader, so we need to notify the waiters.
_state.notifyAll();
}
}
private:
friend void setWriteIntent_forTest(RWMutex& mutex) {
mutex._state.fetchAndBitOr(kWriteIntentMask);
}
friend bool isWriteIntentSet_forTest(const RWMutex& mutex) {
return mutex._state.load() & kWriteIntentMask;
}
friend void addReaders_forTest(RWMutex& mutex, uint32_t readers) {
mutex._state.fetchAndAdd(readers);
}
friend bool hasWaiters_forTest(const RWMutex& mutex) {
return hasWaiters_forTest(mutex._state);
}
friend size_t getReadersCount_forTest(const RWMutex& mutex) {
return mutex._state.load() & kReadersCountMask;
}
inline bool _hasPendingWriterOrTooManyReaders(StateType state) const {
return state & (kWriteIntentMask | kReadersOverflowMask);
}
MONGO_COMPILER_NOINLINE MONGO_COMPILER_COLD_FUNCTION void _waitAndThenLock(StateType state) {
do {
invariant(!(state & kReadersOverflowMask), "Too many readers have acquired the lock!");
unlock_shared();
while (state & kWriteIntentMask) {
// Wait here until the write intent is cleared.
state = _state.wait(state);
}
state = _state.addAndFetch(1);
} while (MONGO_unlikely(_hasPendingWriterOrTooManyReaders(state)));
}
// Synchronizes writers, only allowing a single writer to acquire the mutex at any time.
Mutex _writeMutex;
/**
* Bits [0 .. 29] represent the number of readers, allowing up to 2 ^ 30 - 1 concurrent reads.
* Bit 30 must remain zero and allows preventing too many readers.
* Bit 31 tracks the write intent.
*/
WaitableAtomic<StateType> _state{0};
};
/**
* A shared mutex type optimized for readers, with the assumption of infrequent writes. Under the
* hood, it is very similar to a hazard pointer, where each thread maintains a list for its shared

View File

@ -107,6 +107,24 @@ private:
DataType _data;
};
template <typename DataType>
class RWMutexController {
public:
explicit RWMutexController(DataType value) {
stdx::unique_lock lk(_mutex);
_data = value;
}
auto read() const {
std::shared_lock lk(_mutex); // NOLINT
return _data;
}
private:
mutable RWMutex _mutex;
DataType _data;
};
template <typename DataType>
class ResourceMutexController {
public:
@ -160,6 +178,9 @@ BENCHMARK_TEMPLATE_DEFINE_F(RWMutexBm, SharedMutex, SharedMutexController)(bench
BENCHMARK_TEMPLATE_DEFINE_F(RWMutexBm, Mutex, MutexController)(benchmark::State& s) {
run(s);
}
BENCHMARK_TEMPLATE_DEFINE_F(RWMutexBm, RWMutex, RWMutexController)(benchmark::State& s) {
run(s);
}
BENCHMARK_TEMPLATE_DEFINE_F(RWMutexBm, ResourceMutex, ResourceMutexController)
(benchmark::State& s) {
run(s);
@ -169,6 +190,7 @@ const auto kMaxThreads = ProcessInfo::getNumLogicalCores() * 2;
BENCHMARK_REGISTER_F(RWMutexBm, WriteRarelyRWMutex)->ThreadRange(1, kMaxThreads);
BENCHMARK_REGISTER_F(RWMutexBm, SharedMutex)->ThreadRange(1, kMaxThreads);
BENCHMARK_REGISTER_F(RWMutexBm, Mutex)->ThreadRange(1, kMaxThreads);
BENCHMARK_REGISTER_F(RWMutexBm, RWMutex)->ThreadRange(1, kMaxThreads);
#if REGISTER_RESOURCE_MUTEX_BENCHMARKS
BENCHMARK_REGISTER_F(RWMutexBm, ResourceMutex)->ThreadRange(1, kMaxThreads);
#endif

View File

@ -27,11 +27,15 @@
* it in the license file.
*/
#include <shared_mutex>
#include <vector>
#include "mongo/platform/rwmutex.h"
#include "mongo/platform/waitable_atomic.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/join_thread.h"
#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/unittest/unittest.h"
@ -221,5 +225,138 @@ TEST_F(WriteRarelyRWMutexTest, MultiWriter) {
ASSERT_EQ(counter, kTargetValue);
}
TEST(RWMutex, OneWriterAtAnyTime) {
RWMutex mutex;
stdx::unique_lock lk(mutex);
ASSERT_FALSE(hasWaiters_forTest(mutex));
ASSERT_TRUE(isWriteIntentSet_forTest(mutex));
unittest::ThreadAssertionMonitor monitor;
auto writer = monitor.spawn([&] {
std::lock_guard anotherLk(mutex);
ASSERT_FALSE(lk.owns_lock());
});
// Best effort to allow `writer` to start and try to exclusively acquire `mutex`.
sleepFor(Microseconds(5));
lk.unlock();
// Allow `writer` to proceed with acquiring the lock.
monitor.notifyDone();
writer.join();
}
TEST(RWMutex, WriterWaitsForReader) {
RWMutex mutex;
std::shared_lock lk(mutex); // NOLINT
ASSERT_FALSE(hasWaiters_forTest(mutex));
ASSERT_FALSE(isWriteIntentSet_forTest(mutex));
unittest::ThreadAssertionMonitor monitor;
auto writer = monitor.spawn([&] {
std::lock_guard lk(mutex);
ASSERT_EQ(getReadersCount_forTest(mutex), 0);
});
while (!hasWaiters_forTest(mutex)) {
// Wait until the writer notices the reader and proceeds to wait for it to retire.
}
ASSERT_TRUE(isWriteIntentSet_forTest(mutex));
lk.unlock();
// Let the writer proceed with acquiring the lock.
monitor.notifyDone();
writer.join();
}
TEST(RWMutex, NewReaderWaitsForWriter) {
RWMutex mutex;
stdx::unique_lock lk(mutex);
ASSERT_FALSE(hasWaiters_forTest(mutex));
unittest::ThreadAssertionMonitor monitor;
auto reader = monitor.spawn([&] {
std::shared_lock lk(mutex); // NOLINT
ASSERT_FALSE(isWriteIntentSet_forTest(mutex));
});
while (!hasWaiters_forTest(mutex)) {
// The reader should start waiting on the mutex shortly, so keep checking.
}
lk.unlock();
// The reader may now acquire the lock and make progress.
monitor.notifyDone();
reader.join();
}
DEATH_TEST(RWMutex, TooManyReaders, "invariant") {
RWMutex mutex;
addReaders_forTest(mutex, RWMutex::kReadersOverflowMask - 1);
// The following must hit an invariant since it exceeds the maximum number of readers locks.
mutex.lock_shared();
}
TEST(RWMutex, MultipleReaders) {
const auto kNumReaders = 16;
unittest::Barrier barrier(kNumReaders);
RWMutex mutex;
std::vector<unittest::JoinThread> readers;
for (auto i = 0; i < kNumReaders; ++i) {
readers.emplace_back([&] {
std::shared_lock lk(mutex); // NOLINT
barrier.countDownAndWait();
});
}
}
TEST(RWMutex, MultipleReadersAndWriters) {
// Starts `kNumWorkers` worker threads and have them loop for a total of `kNumIterations`.
// Worker threads assign a global order to their local loop, and decide on acquiring a read or a
// write lock based on this global order: if the order is divisible by one thousand, the thread
// will acquire a write lock, and otherwise it will acquire a read lock. Each worker ensures
// that there are no readers or writers when successfully acquiring a read or a write lock,
// respectively.
const size_t kNumWorkers = 8;
const size_t kNumIterations = 5'000'000;
RWMutex mutex;
Atomic<size_t> counter{0};
Atomic<int> readers, writers;
unittest::Barrier barrier(kNumWorkers);
std::vector<stdx::thread> workers(kNumWorkers);
unittest::ThreadAssertionMonitor monitor;
for (auto& worker : workers) {
worker = monitor.spawn([&] {
barrier.countDownAndWait();
while (true) {
const auto iteration = counter.fetchAndAdd(1);
if (iteration >= kNumIterations)
return;
if (iteration % 1'000 == 0) {
stdx::lock_guard writeLk(mutex);
ASSERT_EQ(readers.loadRelaxed(), 0);
writers.fetchAndAddRelaxed(1);
ON_BLOCK_EXIT([&] { writers.fetchAndSubtractRelaxed(1); });
} else {
std::shared_lock readLk(mutex); // NOLINT
ASSERT_EQ(writers.loadRelaxed(), 0);
readers.fetchAndAddRelaxed(1);
ON_BLOCK_EXIT([&] { readers.fetchAndSubtractRelaxed(1); });
}
}
});
}
monitor.notifyDone();
for (auto& worker : workers) {
worker.join();
}
}
} // namespace
} // namespace mongo