uriyage 2025-12-16 22:29:02 +07:00 committed by GitHub
commit 98c360d418
11 changed files with 1633 additions and 451 deletions

src/config.c

@ -3250,6 +3250,7 @@ standardConfig static_configs[] = {
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("lua-enable-insecure-api", "lua-enable-deprecated-api", MODIFIABLE_CONFIG | HIDDEN_CONFIG | PROTECTED_CONFIG, server.lua_enable_insecure_api, 0, NULL, updateLuaEnableInsecureApi),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("io-threads-always-active", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.io_threads_always_active, 0, NULL, NULL),
/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
@ -3314,7 +3315,6 @@ standardConfig static_configs[] = {
createIntConfig("cluster-databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.config_databases_cluster, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, updateIOThreads), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-io-threads-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_copy_avoid, 7, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-string-size-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid, 16384, INTEGER_CONFIG, NULL, NULL),
createIntConfig("min-string-size-avoid-copy-reply-threaded", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid_threaded, 65536, INTEGER_CONFIG, NULL, NULL),

src/io_queues.h (new file, 409 lines)

@ -0,0 +1,409 @@
/*
* Copyright (c) Valkey Contributors
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*
* IO Queues - Specialized queues for main thread <-> IO threads communication.
*
* 1. Main -> IO: Shared Queue (SPMC - Single Producer Multi Consumer)
* - Automatic load balancing: all IO threads pull from the same queue.
* Busy threads take less work, idle threads take more.
* - Each ring buffer cell is cache-line padded to prevent consumer contention.
* Sequence numbers indicate empty/populated state for safe work claiming.
*
* 2. IO -> Main: Response Channel (MPSC - Multi Producer Single Consumer)
* - IO threads push completed jobs; main thread checks if queue is non-empty.
* - Threads reserve slots via atomic tail increment. If full, jobs are
* buffered locally until space is available.
*
* 3. Main -> IO (Thread-Specific): Private Inbox (SPSC - Single Producer Single Consumer)
* - For tasks that must run on a specific thread.
* - IO threads check their private inbox before the shared queue.
*/
#ifndef __IO_QUEUES_H__
#define __IO_QUEUES_H__
#include <stdatomic.h>
#include <stdint.h>
#include <stdbool.h>
#include <stddef.h>
#include "zmalloc.h"
#include "config.h"
/* ==========================================================================
* MPSC QUEUE (Multi-Producer Single-Consumer)
*
* Producer: IO threads only.
* Consumer: main thread only.
* ========================================================================== */
#define MPSC_QUEUE_SIZE 16384
#define MPSC_QUEUE_MASK (MPSC_QUEUE_SIZE - 1)
static_assert((MPSC_QUEUE_SIZE & (MPSC_QUEUE_SIZE - 1)) == 0, "MPSC_QUEUE_SIZE must be power of 2");
typedef struct mpscTicket {
size_t index;
bool has_reservation;
} mpscTicket;
typedef struct mpscQueue {
/* Consumer cache line */
_Alignas(CACHE_LINE_SIZE) atomic_size_t head;
size_t tail_cache;
/* Producer cache line */
_Alignas(CACHE_LINE_SIZE) atomic_size_t tail;
atomic_size_t head_cache;
/* Data buffer */
_Alignas(CACHE_LINE_SIZE) _Atomic(void *) *buffer;
} mpscQueue;
static inline void mpscInit(mpscQueue *q) {
q->buffer = (_Atomic(void *) *)zmalloc(sizeof(_Atomic(void *)) * MPSC_QUEUE_SIZE);
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
atomic_init(&q->head_cache, 0);
q->tail_cache = 0;
for (size_t i = 0; i < MPSC_QUEUE_SIZE; ++i) {
atomic_init(&q->buffer[i], NULL);
}
}
static inline void mpscFree(mpscQueue *q) {
if (q->buffer) {
zfree(q->buffer);
q->buffer = NULL;
}
atomic_store_explicit(&q->head, 0, memory_order_relaxed);
atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
atomic_store_explicit(&q->head_cache, 0, memory_order_relaxed);
q->tail_cache = 0;
}
/* Pushes an item into the queue.
*
* @param q The queue instance.
* @param data Pointer to the data (cannot be NULL).
* @param ticket State tracker for retries. Must be initialized to {0}.
* If the queue is full, this holds the reserved slot index.
* Subsequent calls must pass the same ticket to fill the reserved slot.
* @return true if pushed, false if queue is full. */
static inline bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) {
size_t tail;
serverAssert(data);
/* Reserve a slot (or use existing reservation) */
if (!ticket->has_reservation) {
tail = atomic_fetch_add_explicit(&q->tail, 1, memory_order_relaxed);
} else {
tail = ticket->index;
}
/* Check limits (Fullness check) */
size_t head = atomic_load_explicit(&q->head_cache, memory_order_acquire);
if ((tail - head) >= MPSC_QUEUE_SIZE) {
/* Cached limit reached, refresh from actual head */
head = atomic_load_explicit(&q->head, memory_order_acquire);
atomic_store_explicit(&q->head_cache, head, memory_order_release);
if (unlikely((tail - head) >= MPSC_QUEUE_SIZE)) {
/* Queue is full - Persist reservation for retry */
ticket->index = tail;
ticket->has_reservation = true;
return false;
}
}
/* Commit data */
atomic_store_explicit(&q->buffer[tail & MPSC_QUEUE_MASK], data, memory_order_release);
ticket->has_reservation = false;
return true;
}
/* Pops a batch of items from the queue.
* Stops at the first empty slot. */
static inline size_t mpscDequeueBatch(mpscQueue *q, void **jobs_out, size_t max_jobs) {
size_t popped_count = 0;
size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
size_t tail = q->tail_cache;
/* Refresh tail cache if it looks empty */
if (head == tail) {
tail = atomic_load_explicit(&q->tail, memory_order_acquire);
q->tail_cache = tail;
if (head == tail) return 0;
}
size_t limit = tail - head;
if (limit > max_jobs) limit = max_jobs;
for (size_t i = 0; i < limit; ++i) {
void *data = atomic_load_explicit(&q->buffer[head & MPSC_QUEUE_MASK], memory_order_relaxed);
/* Stop if slot is reserved but data not yet written */
if (!data) break;
jobs_out[popped_count++] = data;
atomic_store_explicit(&q->buffer[head & MPSC_QUEUE_MASK], NULL, memory_order_relaxed);
head++;
}
if (popped_count > 0) {
atomic_store_explicit(&q->head, head, memory_order_release);
/* Ensure data visibility for the caller */
atomic_thread_fence(memory_order_acquire);
}
return popped_count;
}
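/* Illustrative usage sketch, not part of this commit: a producer spin-retries a push
 * with a persistent ticket (a failed attempt keeps its reserved slot, so the retry
 * commits into the same index once the consumer frees space), and the consumer drains
 * a small batch. Both function names are hypothetical. */
static void exampleMpscPush(mpscQueue *q, void *item) {
    mpscTicket ticket = {0};
    while (!mpscEnqueue(q, item, &ticket)) {
        /* Queue full: ticket.has_reservation is now true; retrying with the same
         * ticket fills the originally reserved slot (see flushPendingIOResponses). */
    }
}
static size_t exampleMpscDrain(mpscQueue *q) {
    void *jobs[16];
    size_t n = mpscDequeueBatch(q, jobs, 16);
    /* jobs[0..n-1] are now owned by the main thread. */
    return n;
}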
/* ==========================================================================
* SPMC QUEUE (Single-Producer Multi-Consumer)
*
* Producer: main thread only.
* Consumer: IO threads only.
* ========================================================================== */
#define SPMC_QUEUE_SIZE 4096
#define SPMC_QUEUE_MASK (SPMC_QUEUE_SIZE - 1)
static_assert((SPMC_QUEUE_SIZE & (SPMC_QUEUE_SIZE - 1)) == 0, "SPMC_QUEUE_SIZE must be power of 2");
typedef struct spmcCell {
_Alignas(CACHE_LINE_SIZE) atomic_size_t sequence;
void *data;
} spmcCell;
typedef struct spmcQueue {
/* Shared Read/Write (High Contention) */
_Alignas(CACHE_LINE_SIZE) atomic_size_t head;
/* Producer Cache line */
_Alignas(CACHE_LINE_SIZE) size_t tail;
size_t head_cache;
/* Data buffer */
_Alignas(CACHE_LINE_SIZE) spmcCell *buffer;
} spmcQueue;
static inline void spmcInit(spmcQueue *q) {
q->buffer = (spmcCell *)zmalloc(sizeof(spmcCell) * SPMC_QUEUE_SIZE);
atomic_init(&q->head, 0);
q->tail = 0;
q->head_cache = 0;
for (size_t i = 0; i < SPMC_QUEUE_SIZE; i++) {
atomic_init(&q->buffer[i].sequence, i);
q->buffer[i].data = NULL;
}
}
static inline void spmcFree(spmcQueue *q) {
if (q->buffer) {
zfree(q->buffer);
q->buffer = NULL;
}
atomic_store_explicit(&q->head, 0, memory_order_relaxed);
q->tail = 0;
q->head_cache = 0;
}
static inline bool spmcIsEmpty(spmcQueue *q) {
debugServerAssert(inMainThread());
/* Fast path: Check against cached consumer position */
if (q->tail == q->head_cache) {
return true;
}
/* Slow path: Refresh atomic head and update cache */
size_t curr_head = atomic_load_explicit(&q->head, memory_order_acquire);
q->head_cache = curr_head;
return q->tail == curr_head;
}
static inline size_t spmcSize(spmcQueue *q) {
size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
return (q->tail >= head) ? (q->tail - head) : 0;
}
static inline bool spmcEnqueue(spmcQueue *q, void *data) {
debugServerAssert(inMainThread());
spmcCell *cell = &q->buffer[q->tail & SPMC_QUEUE_MASK];
size_t seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
/* Sequence Check:
* seq == tail: Slot is empty and ready for current generation.
* seq < tail: Slot still occupied by consumer or stale. */
if (unlikely(seq != q->tail)) {
return false;
}
cell->data = data;
/* Increment sequence to (tail + 1) to publish availability */
atomic_store_explicit(&cell->sequence, q->tail + 1, memory_order_release);
q->tail++;
return true;
}
static inline void *spmcDequeue(spmcQueue *q) {
debugServerAssert(!inMainThread());
size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
spmcCell *cell;
void *data;
while (1) {
cell = &q->buffer[head & SPMC_QUEUE_MASK];
size_t seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
intptr_t diff = (intptr_t)seq - (intptr_t)(head + 1);
if (diff == 0) {
/* Slot has data. Attempt to claim via CAS on head. */
if (atomic_compare_exchange_weak_explicit(&q->head, &head, head + 1,
memory_order_relaxed,
memory_order_relaxed)) {
data = cell->data;
/* Mark slot empty for next generation (head + SPMC_QUEUE_SIZE) */
atomic_store_explicit(&cell->sequence, head + SPMC_QUEUE_SIZE, memory_order_release);
return data;
}
} else if (diff < 0) {
/* Sequence is old; Producer hasn't filled this slot yet. Queue empty. */
return NULL;
} else {
/* diff > 0: Local 'head' is stale. Reload it. */
head = atomic_load_explicit(&q->head, memory_order_relaxed);
}
}
}
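/* Illustrative usage sketch, not part of this commit: the intended split of work.
 * The main thread enqueues a job and runs it inline when the ring is full; each IO
 * thread pulls jobs until the queue is drained. example_handler and both functions
 * below are hypothetical. */
typedef void (*example_handler)(void *);
static void exampleSpmcSubmit(spmcQueue *q, void *job, example_handler run_inline) {
    if (!spmcEnqueue(q, job)) run_inline(job); /* ring full: fall back to the main thread */
}
static void exampleSpmcWorker(spmcQueue *q, example_handler run) {
    void *job;
    while ((job = spmcDequeue(q)) != NULL) run(job); /* each job is claimed by exactly one IO thread */
}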
/* ==========================================================================
* SPSC QUEUE (Single-Producer Single-Consumer)
*
* Producer: main thread only.
* Consumer: IO thread only.
* ========================================================================== */
#define SPSC_QUEUE_SIZE 4096
#define SPSC_QUEUE_MASK (SPSC_QUEUE_SIZE - 1)
static_assert((SPSC_QUEUE_SIZE & (SPSC_QUEUE_SIZE - 1)) == 0, "SPSC_QUEUE_SIZE must be power of 2");
typedef struct spscQueue {
/* Consumer cache line */
_Alignas(CACHE_LINE_SIZE) atomic_size_t head;
size_t tail_cache;
/* Producer cache line */
_Alignas(CACHE_LINE_SIZE) atomic_size_t tail;
size_t tail_local; /* Private write index */
size_t head_cache;
/* Dynamic buffer */
_Alignas(CACHE_LINE_SIZE) void **buffer;
} spscQueue;
static inline void spscInit(spscQueue *q) {
q->buffer = (void **)zmalloc(sizeof(void *) * SPSC_QUEUE_SIZE);
atomic_init(&q->head, 0);
atomic_init(&q->tail, 0);
q->head_cache = 0;
q->tail_cache = 0;
q->tail_local = 0;
}
static inline void spscFree(spscQueue *q) {
if (q->buffer) {
zfree(q->buffer);
q->buffer = NULL;
}
atomic_store_explicit(&q->head, 0, memory_order_relaxed);
atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
q->head_cache = 0;
q->tail_cache = 0;
q->tail_local = 0;
}
static inline bool spscIsFull(spscQueue *q) {
debugServerAssert(inMainThread());
const size_t curr_tail = q->tail_local;
if (curr_tail - q->head_cache >= SPSC_QUEUE_SIZE) {
q->head_cache = atomic_load_explicit(&q->head, memory_order_acquire);
if (curr_tail - q->head_cache >= SPSC_QUEUE_SIZE) {
/* Flush any local changes before reporting full */
if (q->tail_local != q->tail) {
atomic_store_explicit(&q->tail, q->tail_local, memory_order_release);
}
return true;
}
}
return false;
}
/* Push data to the queue. Caller must ensure queue is not full via spscIsFull().
* @param commit If true, the tail pointer is updated immediately (visible to consumer).
* If false, only local index is updated (batching). */
static inline void spscEnqueue(spscQueue *q, void *data, bool commit) {
debugServerAssert(inMainThread());
q->buffer[q->tail_local & SPSC_QUEUE_MASK] = data;
q->tail_local++;
if (commit) {
atomic_store_explicit(&q->tail, q->tail_local, memory_order_release);
}
}
static inline void spscCommit(spscQueue *q) {
debugServerAssert(inMainThread());
size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
if (q->tail_local == tail) return;
atomic_store_explicit(&q->tail, q->tail_local, memory_order_release);
}
static inline size_t spscDequeueBatch(spscQueue *q, void **jobs_out, size_t num_jobs) {
debugServerAssert(!inMainThread());
size_t curr_head = atomic_load_explicit(&q->head, memory_order_relaxed);
size_t curr_tail_cache = q->tail_cache;
if (curr_head == curr_tail_cache) {
curr_tail_cache = atomic_load_explicit(&q->tail, memory_order_acquire);
q->tail_cache = curr_tail_cache;
if (curr_head == curr_tail_cache) return 0;
}
size_t available = curr_tail_cache - curr_head;
size_t count = (num_jobs < available) ? num_jobs : available;
for (size_t i = 0; i < count; ++i) {
jobs_out[i] = q->buffer[(curr_head + i) & SPSC_QUEUE_MASK];
}
atomic_store_explicit(&q->head, curr_head + count, memory_order_release);
return count;
}
/* Check if queue is empty from producer's perspective. */
static inline bool spscIsEmpty(spscQueue *q) {
debugServerAssert(inMainThread());
/* Fast path */
if (q->tail_local == q->head_cache) {
return true;
}
/* Slow path: refresh head */
size_t curr_head = atomic_load_explicit(&q->head, memory_order_acquire);
q->head_cache = curr_head;
return q->tail_local == curr_head;
}
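/* Illustrative usage sketch, not part of this commit: staging several jobs in a
 * thread's private inbox with commit=false and publishing them with one spscCommit(),
 * which is the pattern commitIOJobs() applies once per event-loop iteration.
 * exampleSpscBatch is hypothetical. */
static size_t exampleSpscBatch(spscQueue *inbox, void **items, size_t n) {
    size_t sent = 0;
    while (sent < n) {
        if (spscIsFull(inbox)) break; /* spscIsFull() already flushed the staged entries */
        spscEnqueue(inbox, items[sent++], false); /* staged locally, not yet visible */
    }
    spscCommit(inbox); /* one release store makes the whole batch visible to the consumer */
    return sent;       /* caller handles items[sent..n-1] itself */
}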
#endif /* __IO_QUEUES_H__ */

src/io_threads.c

@ -5,147 +5,84 @@
*/
#include "io_threads.h"
#include "io_queues.h"
#include <sys/resource.h>
static _Thread_local int thread_id = 0; /* Thread local var */
static _Thread_local int thread_id = 0;
static _Thread_local mpscTicket io_thread_ticket = {0};
/* Backlog of responses when io_shared_outbox is full. Should be rare. */
static _Thread_local list *pending_io_responses = NULL;
static pthread_t io_threads[IO_THREADS_MAX_NUM] = {0};
static pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
static int cur_epoll_thread = 0;
static spmcQueue io_shared_inbox = {0}; /* shared queue for all IO threads */
static mpscQueue io_shared_outbox = {0}; /* results back to main thread */
static spscQueue io_private_inbox[IO_THREADS_MAX_NUM] = {0}; /* dedicated per-thread queues */
static size_t io_jobs_submitted;
static atomic_size_t io_jobs_finished;
static int io_threads_initialized = 0;
/* IO jobs queue functions - Used to send jobs from the main-thread to the IO thread. */
typedef void (*job_handler)(void *);
typedef struct iojob {
job_handler handler;
void *data;
} iojob;
/* Job Types for Tagged Pointers
* We use the lower 3 bits of the pointer to store the job type.
* Requires data pointers to be 8-byte aligned (standard for zmalloc/ptrs). */
#define JOB_TAG_MASK 0x7
#define JOB_PTR_MASK (~(uintptr_t)JOB_TAG_MASK)
typedef struct IOJobQueue {
iojob *ring_buffer;
size_t size;
_Atomic size_t head __attribute__((aligned(CACHE_LINE_SIZE))); /* Next write index for producer (main-thread) */
_Atomic size_t tail __attribute__((aligned(CACHE_LINE_SIZE))); /* Next read index for consumer (IO-thread) */
} IOJobQueue;
IOJobQueue io_jobs[IO_THREADS_MAX_NUM] = {0};
/* Initialize the job queue with a specified number of items. */
static void IOJobQueue_init(IOJobQueue *jq, size_t item_count) {
debugServerAssertWithInfo(NULL, NULL, inMainThread());
jq->ring_buffer = zcalloc(item_count * sizeof(iojob));
jq->size = item_count; /* Total number of items */
jq->head = 0;
jq->tail = 0;
static inline void *pack_job(void *ptr, int type) {
return (void *)((uintptr_t)ptr | type);
}
/* Clean up the job queue and free allocated memory. */
static void IOJobQueue_cleanup(IOJobQueue *jq) {
debugServerAssertWithInfo(NULL, NULL, inMainThread());
zfree(jq->ring_buffer);
memset(jq, 0, sizeof(*jq));
static inline void unpack_job(void *packed, void **ptr, int *type) {
*type = (int)((uintptr_t)packed & JOB_TAG_MASK);
*ptr = (void *)((uintptr_t)packed & JOB_PTR_MASK);
}
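/* Illustrative sketch, not part of this commit: round-tripping a tagged job pointer.
 * zmalloc'd allocations are at least 8-byte aligned, so the low 3 bits are free to
 * carry the job type and the original address is recovered exactly. */
static void exampleTagRoundTrip(client *c) {
    serverAssert(((uintptr_t)c & JOB_TAG_MASK) == 0); /* alignment assumption */
    void *packed = pack_job(c, JOB_REQ_READ_CLIENT);
    void *ptr;
    int type;
    unpack_job(packed, &ptr, &type);
    serverAssert(ptr == (void *)c && type == JOB_REQ_READ_CLIENT);
}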
static int IOJobQueue_isFull(const IOJobQueue *jq) {
debugServerAssertWithInfo(NULL, NULL, inMainThread());
size_t current_head = atomic_load_explicit(&jq->head, memory_order_relaxed);
/* We don't use memory_order_acquire for the tail due to performance reasons,
* In the worst case we will just assume wrongly the buffer is full and the main thread will do the job by itself. */
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
size_t next_head = (current_head + 1) % jq->size;
return next_head == current_tail;
}
/* Attempt to push a new job to the queue from the main thread.
* the caller must ensure the queue is not full before calling this function. */
static void IOJobQueue_push(IOJobQueue *jq, job_handler handler, void *data) {
debugServerAssertWithInfo(NULL, NULL, inMainThread());
/* Assert the queue is not full - should not happen as the caller should check for it before. */
serverAssert(!IOJobQueue_isFull(jq));
/* No need to use atomic acquire for the head, as the main thread is the only one that writes to the head index. */
size_t current_head = atomic_load_explicit(&jq->head, memory_order_relaxed);
size_t next_head = (current_head + 1) % jq->size;
/* We store directly the job's fields to avoid allocating a new iojob structure. */
serverAssert(jq->ring_buffer[current_head].data == NULL);
serverAssert(jq->ring_buffer[current_head].handler == NULL);
jq->ring_buffer[current_head].data = data;
jq->ring_buffer[current_head].handler = handler;
/* memory_order_release to make sure the data is visible to the consumer (the IO thread). */
atomic_store_explicit(&jq->head, next_head, memory_order_release);
}
/* Returns the number of jobs currently available for consumption in the given job queue.
*
* This function ensures memory visibility for the jobs by
* using a memory acquire fence when there are jobs available. */
static size_t IOJobQueue_availableJobs(const IOJobQueue *jq) {
debugServerAssertWithInfo(NULL, NULL, !inMainThread());
/* We use memory_order_acquire to make sure the head and the job's fields are visible to the consumer (IO thread). */
size_t current_head = atomic_load_explicit(&jq->head, memory_order_acquire);
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
if (current_head >= current_tail) {
return current_head - current_tail;
} else {
return jq->size - (current_tail - current_head);
}
}
/* Checks if the job Queue is empty.
* returns 1 if the buffer is currently empty, 0 otherwise.
* Called by the main-thread only.
* This function uses relaxed memory order, so the caller needs to use an acquire
* memory fence before calling this function to be sure it has the latest index
* from the other thread, especially when called repeatedly. */
static int IOJobQueue_isEmpty(const IOJobQueue *jq) {
size_t current_head = atomic_load_explicit(&jq->head, memory_order_relaxed);
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
return current_head == current_tail;
}
/* Removes the next job from the given job queue by advancing the tail index.
* Called by the IO thread.
* The caller must ensure that the queue is not empty before calling this function.
* This function uses relaxed memory order, so the caller needs to use a release memory fence
* after calling this function to make sure the updated tail is visible to the producer (main thread). */
static void IOJobQueue_removeJob(IOJobQueue *jq) {
debugServerAssertWithInfo(NULL, NULL, !inMainThread());
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
jq->ring_buffer[current_tail].data = NULL;
jq->ring_buffer[current_tail].handler = NULL;
atomic_store_explicit(&jq->tail, (current_tail + 1) % jq->size, memory_order_relaxed);
}
/* Retrieves the next job handler and data from the job queue without removal.
* Called by the consumer (IO thread). Caller must ensure queue is not empty.*/
static void IOJobQueue_peek(const IOJobQueue *jq, job_handler *handler, void **data) {
debugServerAssertWithInfo(NULL, NULL, !inMainThread());
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
iojob *job = &jq->ring_buffer[current_tail];
*handler = job->handler;
*data = job->data;
}
/* End of IO job queue functions */
/* Handler prototypes */
void ioThreadReadQueryFromClient(void *data);
void ioThreadWriteToClient(void *data);
void IOThreadFreeArgv(void *data);
void IOThreadPoll(void *data);
static void ioThreadAccept(void *data);
int inMainThread(void) {
return thread_id == 0;
}
int getIOThreadID(void) {
int getCurTid(void) {
return thread_id;
}
void commitIOJobs(void) {
for (int i = 1; i < server.active_io_threads_num; i++) {
spscCommit(&io_private_inbox[i]);
}
}
/* Jobs sent but not yet processed by IO threads. */
static size_t getPendingIOThreadsJobs(void) {
return io_jobs_submitted - atomic_load_explicit(&io_jobs_finished, memory_order_acquire);
}
/* Read/write jobs awaiting response from IO threads. */
static int getPendingIOResponsesCount(void) {
return server.stat_io_writes_pending + server.stat_io_reads_pending;
}
/* Drains the I/O threads queue by waiting for all jobs to be processed.
* This function must be called from the main thread. */
void drainIOThreadsQueue(void) {
serverAssert(inMainThread());
for (int i = 1; i < IO_THREADS_MAX_NUM; i++) { /* No need to drain thread 0, which is the main thread. */
while (!IOJobQueue_isEmpty(&io_jobs[i])) {
/* memory barrier acquire to get the latest job queue state */
atomic_thread_fence(memory_order_acquire);
}
commitIOJobs();
while (getPendingIOThreadsJobs()) {
atomic_thread_fence(memory_order_acquire);
}
}
/* Returns if there is an IO operation in progress for the given client. */
int clientHasPendingIO(client *c) {
return c->io_read_state != CLIENT_IDLE || c->io_write_state != CLIENT_IDLE;
}
/* Wait until the IO-thread is done with the client */
void waitForClientIO(client *c) {
/* No need to wait if the client was not offloaded to the IO thread. */
@ -165,37 +102,129 @@ void waitForClientIO(client *c) {
atomic_thread_fence(memory_order_acquire);
}
/** Adjusts the number of active I/O threads based on the current event load.
* If increase_only is non-zero, only allows increasing the number of threads.*/
void adjustIOThreadsByEventLoad(int numevents, int increase_only) {
if (server.io_threads_num == 1) return; /* All I/O is being done by the main thread. */
debugServerAssertWithInfo(NULL, NULL, server.io_threads_num > 1);
/* When events_per_io_thread is set to 0, we offload all events to the IO threads.
* This is used mainly for testing purposes. */
int target_threads = server.events_per_io_thread == 0 ? (numevents + 1) : numevents / server.events_per_io_thread;
void IOThreadsBeforeSleep(long long current_time) {
#ifndef RUSAGE_THREAD
UNUSED(current_time);
#endif
if (server.io_threads_num == 1) return;
target_threads = max(1, min(target_threads, server.io_threads_num));
commitIOJobs();
if (target_threads == server.active_io_threads_num) return;
if (target_threads < server.active_io_threads_num) {
if (increase_only) return;
int threads_to_deactivate_num = server.active_io_threads_num - target_threads;
for (int i = 0; i < threads_to_deactivate_num; i++) {
int tid = server.active_io_threads_num - 1;
IOJobQueue *jq = &io_jobs[tid];
/* We can't lock the thread if it may have pending jobs */
if (!IOJobQueue_isEmpty(jq)) return;
pthread_mutex_lock(&io_threads_mutex[tid]);
server.active_io_threads_num--;
if (server.io_threads_always_active) {
/* The io-threads-always-active mode is for debug purposes: deactivate all threads before sleep if there are no
 * pending jobs, and reactivate them all after sleep. We can't leave them active all the time as they would
 * consume too much CPU and interfere with tests. */
if (server.active_io_threads_num > 1 && getPendingIOThreadsJobs() == 0) {
for (int i = 1; i < server.active_io_threads_num; i++) {
pthread_mutex_lock(&io_threads_mutex[i]);
}
server.active_io_threads_num = 1;
}
} else {
int threads_to_activate_num = target_threads - server.active_io_threads_num;
for (int i = 0; i < threads_to_activate_num; i++) {
pthread_mutex_unlock(&io_threads_mutex[server.active_io_threads_num]);
}
#ifdef RUSAGE_THREAD
/* If threads are not active, track the main thread's CPU (system) time for the ignition decision */
if (server.active_io_threads_num == 1) {
static long long last_measurement_time = 0;
if (current_time - last_measurement_time < 50000) return; /* Sample once in 50ms */
last_measurement_time = current_time;
struct rusage ru;
if (getrusage(RUSAGE_THREAD, &ru) == 0) {
long long sys_time_us = ru.ru_stime.tv_sec * 1000000LL + ru.ru_stime.tv_usec;
trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS, sys_time_us, current_time, 1000000);
}
}
#endif
}
#define IO_COOLDOWN_MS 1000
#define IO_SAMPLE_RATE_MS 10
#define IO_IGNITION_EVENTS 4
#define IO_IGNITION_CPU 30.0
void IOThreadsAfterSleep(int numevents) {
if (server.io_threads_num == 1) return;
/* Always Active Policy */
if (server.io_threads_always_active) {
if (numevents > 0 && server.active_io_threads_num < server.io_threads_num) {
for (int i = server.active_io_threads_num; i < server.io_threads_num; i++) {
pthread_mutex_unlock(&io_threads_mutex[i]);
}
server.active_io_threads_num = server.io_threads_num;
}
return;
}
long long now = server.mstime;
static long long last_scale_time = 0;
/* Ignition Policy */
if (server.active_io_threads_num == 1) {
int should_ignite = 0;
#ifdef RUSAGE_THREAD
float cpu = (float)getInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS) / 10000.0;
should_ignite = (cpu > IO_IGNITION_CPU);
#else
should_ignite = (numevents >= IO_IGNITION_EVENTS);
#endif
if (should_ignite) {
pthread_mutex_unlock(&io_threads_mutex[1]);
server.active_io_threads_num++;
last_scale_time = now;
serverLog(LL_DEBUG, "IO threads ignition: increased to %d", server.active_io_threads_num);
}
return;
}
static size_t last_sample_time = 0;
static size_t spmc_size_sum = 0;
static size_t sample_count = 0;
/* Scaling Up/Down Policy */
if (now - last_sample_time < IO_SAMPLE_RATE_MS) return;
last_sample_time = now;
size_t q_size = spmcSize(&io_shared_inbox);
spmc_size_sum += q_size;
sample_count++;
trackInstantaneousMetric(STATS_METRIC_IO_WAIT, spmc_size_sum, sample_count, 1);
/* Decision (Every STATS_METRIC_SAMPLES Samples) */
if (sample_count % STATS_METRIC_SAMPLES != 0) return;
size_t avg_q_size = getInstantaneousMetric(STATS_METRIC_IO_WAIT);
size_t active = server.active_io_threads_num;
size_t target = active;
/* Calculate Target */
if (avg_q_size > 1 && active < (size_t)server.io_threads_num) {
target++;
} else if (avg_q_size == 0 && (now - last_scale_time > IO_COOLDOWN_MS)) {
if (target > 1) target--;
}
/* Scale Up */
if (target > active) {
for (size_t i = active; i < target; i++) {
pthread_mutex_unlock(&io_threads_mutex[i]);
}
last_scale_time = now;
server.active_io_threads_num = target;
serverLog(LL_DEBUG, "IO threads increased from %zu to %zu", active, target);
}
/* Scale Down */
else if (target < active) {
int tid = active - 1;
/* Don't suspend if work remains in the specific thread's queue... */
if (!spscIsEmpty(&io_private_inbox[tid])) return;
/* ...or if we are dropping to 1 thread but the global queue still has work */
if (target == 1 && !spmcIsEmpty(&io_shared_inbox)) return;
pthread_mutex_lock(&io_threads_mutex[tid]);
server.active_io_threads_num--;
serverLog(LL_DEBUG, "IO threads decreased from %zu to %d", active, server.active_io_threads_num);
}
}
@ -205,11 +234,50 @@ void IOThreadPoll(void *data) {
aeEventLoop *el = (aeEventLoop *)data;
struct timeval tvp = {0, 0};
int num_events = aePoll(el, &tvp);
server.io_ae_fired_events = num_events;
atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release);
}
static void flushPendingIOResponses(int blocking) {
if (!pending_io_responses) return;
listIter li;
listNode *ln;
listRewind(pending_io_responses, &li);
while ((ln = listNext(&li))) {
void *job = listNodeValue(ln);
int pushed = 0;
/* Try to enqueue. If blocking is set, retry until success. */
do {
pushed = mpscEnqueue(&io_shared_outbox, job, &io_thread_ticket);
if (pushed || !blocking || server.crashed) break; /* On server crash we kill the IO threads, no point in sending back jobs to the main-thread. */
atomic_thread_fence(memory_order_acquire);
} while (true);
if (pushed) {
listDelNode(pending_io_responses, ln);
} else {
return;
}
}
/* List is fully drained */
listRelease(pending_io_responses);
pending_io_responses = NULL;
}
/* Define a cleanup function that will clean all thread resources */
void cleanupThreadResources(void *dummy) {
UNUSED(dummy);
/* Blocking flush: ensure all pending jobs are sent before thread dies */
flushPendingIOResponses(1);
/* Free the shared query buffer */
freeSharedQueryBuf();
}
static void *IOThreadMain(void *myid) {
/* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */
long id = (long)myid;
@ -219,55 +287,100 @@ static void *IOThreadMain(void *myid) {
valkey_set_thread_title(thdname);
serverSetCpuAffinity(server.server_cpulist);
initSharedQueryBuf();
pthread_cleanup_push(freeSharedQueryBuf, NULL);
pthread_cleanup_push(cleanupThreadResources, NULL);
thread_id = (int)id;
size_t jobs_to_process = 0;
IOJobQueue *jq = &io_jobs[id];
const int BATCH_SIZE = 32;
void *batch_jobs[BATCH_SIZE];
while (1) {
/* Cancellation point so that pthread_cancel() from main thread is honored. */
pthread_testcancel();
int processed = 0;
size_t batch_count = 0;
/* Wait for jobs */
for (int j = 0; j < 1000000; j++) {
jobs_to_process = IOJobQueue_availableJobs(jq);
if (jobs_to_process) break;
/* PRIORITY 1: Drain Private SPSC Queue (Batch Processing) */
while ((batch_count = spscDequeueBatch(&io_private_inbox[id], batch_jobs, BATCH_SIZE)) > 0) {
for (size_t i = 0; i < batch_count; i++) {
void *data;
int type;
unpack_job(batch_jobs[i], &data, &type);
switch (type) {
case JOB_REQ_FREE_ARGV:
IOThreadFreeArgv(data);
break;
case JOB_REQ_POLL:
IOThreadPoll(data);
break;
default:
serverPanic("Invalid SPSC job type: %d", type);
}
}
processed += batch_count;
}
/* Give the main thread a chance to stop this thread. */
if (jobs_to_process == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
for (size_t j = 0; j < jobs_to_process; j++) {
job_handler handler;
/*
* PRIORITY 2: Shared Global Queue (SPMC)
* Only checked after SPSC is drained.
*/
void *packed_job = spmcDequeue(&io_shared_inbox);
if (packed_job) {
void *data;
/* We keep the job in the queue until it's processed. This ensures that if the main thread checks
* and finds the queue empty, it can be certain that the IO thread is not currently handling any job. */
IOJobQueue_peek(jq, &handler, &data);
handler(data);
/* Remove the job after it was processed */
IOJobQueue_removeJob(jq);
int type;
unpack_job(packed_job, &data, &type);
switch (type) {
case JOB_REQ_READ_CLIENT:
ioThreadReadQueryFromClient(data);
break;
case JOB_REQ_WRITE_CLIENT:
ioThreadWriteToClient(data);
break;
case JOB_REQ_FREE_OBJ:
decrRefCount(data);
break;
case JOB_REQ_ACCEPT:
ioThreadAccept(data);
break;
case JOB_REQ_POLL:
IOThreadPoll(data);
break;
default:
serverPanic("Invalid SPMC job type: %d", type);
}
processed++;
}
if (processed) {
atomic_fetch_add_explicit(&io_jobs_finished, processed, memory_order_release);
}
/* If both queues were empty (no processing done), wait for signal. */
if (processed == 0) {
if (unlikely(pending_io_responses)) {
flushPendingIOResponses(0);
} else {
/* If it is locked, block until the main thread unlocks it. */
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
}
}
/* Memory barrier to make sure the main thread sees the updated tail index.
* We do it once per loop and not per tail-update for optimization reasons.
* As the main thread's main concern is to check if the queue is empty, it's enough to do it once at the end. */
atomic_thread_fence(memory_order_release);
}
pthread_cleanup_pop(0);
return NULL;
}
#define IO_JOB_QUEUE_SIZE 2048
static void createIOThread(int id) {
serverAssert(server.io_threads_num > 0);
serverAssert(id > 0 && id < server.io_threads_num);
/* Initialize the private SPSC queue for this thread */
spscInit(&io_private_inbox[id]);
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[id], NULL);
IOJobQueue_init(&io_jobs[id], IO_JOB_QUEUE_SIZE);
pthread_mutex_lock(&io_threads_mutex[id]); /* Thread will be stopped. */
int err = pthread_create(&tid, NULL, IOThreadMain, (void *)(long)id);
if (err) {
@ -277,8 +390,7 @@ static void createIOThread(int id) {
io_threads[id] = tid;
}
/* Terminates the IO thread specified by id.
* Called on server shutdown */
/* Terminates the IO thread specified by id. */
static void shutdownIOThread(int id) {
int err;
pthread_t tid = io_threads[id];
@ -297,7 +409,7 @@ static void shutdownIOThread(int id) {
serverLog(LL_NOTICE, "IO thread(tid:%lu) terminated", (unsigned long)tid);
}
pthread_mutex_destroy(&io_threads_mutex[id]);
IOJobQueue_cleanup(&io_jobs[id]);
spscFree(&io_private_inbox[id]);
}
void killIOThreads(void) {
@ -308,7 +420,7 @@ void killIOThreads(void) {
int updateIOThreads(const char **err) {
serverAssert(inMainThread());
UNUSED(err);
int prev_threads_num = 1;
for (int i = IO_THREADS_MAX_NUM - 1; i > 0; i--) {
if (io_threads[i]) {
@ -318,25 +430,32 @@ int updateIOThreads(const char **err) {
}
if (prev_threads_num == server.io_threads_num) return 1;
/* DEADLOCK PREVENTION:
* Check if the pending workload fits in the return queue.
* If the number of pending jobs is greater than the capacity of the Global MPSC queue,
* the worker threads might fill the queue and block. If we enter drainIOThreadsQueue
* in that state, we will deadlock (Main thread waits for worker, Worker waits for queue space). */
size_t pending = getPendingIOResponsesCount();
if (pending > MPSC_QUEUE_SIZE) {
if (err) *err = "Can't update IO threads under load, try again later";
return 0;
}
serverLog(LL_NOTICE, "Changing number of IO threads from %d to %d.", prev_threads_num, server.io_threads_num);
drainIOThreadsQueue();
/* Set active threads to 1, will be adjusted based on workload later. */
for (int i = 1; i < server.active_io_threads_num; i++) {
pthread_mutex_lock(&io_threads_mutex[i]);
}
server.active_io_threads_num = 1;
// Create new threads.
if (server.io_threads_num > prev_threads_num) {
prefetchCommandsBatchInit();
for (int i = prev_threads_num; i < server.io_threads_num; i++) {
createIOThread(i);
}
}
// Decrease the number of threads.
else {
initIOThreads(prev_threads_num);
} else {
for (int i = prev_threads_num - 1; i >= server.io_threads_num; i--) {
// Unblock inactive thread.
/* Unblock inactive thread. */
pthread_mutex_unlock(&io_threads_mutex[i]);
shutdownIOThread(i);
io_threads[i] = 0;
@ -346,21 +465,27 @@ int updateIOThreads(const char **err) {
}
/* Initialize the data structures needed for I/O threads. */
void initIOThreads(void) {
server.active_io_threads_num = 1; /* We start with threads not active. */
server.io_poll_state = AE_IO_STATE_NONE;
server.io_ae_fired_events = 0;
void initIOThreads(int prev_threads_num) {
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
serverAssert(server.io_threads_num <= IO_THREADS_MAX_NUM);
prefetchCommandsBatchInit();
if (!io_threads_initialized) {
server.active_io_threads_num = 1; /* We start with threads not active. */
server.io_poll_state = AE_IO_STATE_NONE;
server.io_ae_fired_events = 0;
spmcInit(&io_shared_inbox);
mpscInit(&io_shared_outbox);
io_jobs_submitted = 0;
atomic_init(&io_jobs_finished, 0);
prefetchCommandsBatchInit();
io_threads_initialized = 1;
}
/* Spawn and initialize the I/O threads. */
for (int i = 1; i < server.io_threads_num; i++) {
for (int i = prev_threads_num; i < server.io_threads_num; i++) {
createIOThread(i);
}
}
@ -369,6 +494,7 @@ int trySendReadToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
if (c->io_write_state == CLIENT_PENDING_IO) return C_OK;
/* For simplicity, don't offload replica clients reads as read traffic from replica is negligible */
if (getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* With Lua debug client we may call connWrite directly in the main thread */
@ -376,30 +502,24 @@ int trySendReadToIOThreads(client *c) {
/* For simplicity let the main-thread handle the blocked clients */
if (c->flag.blocked || c->flag.unblocked) return C_ERR;
if (c->flag.close_asap) return C_ERR;
size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;
/* Handle case where client has a pending IO write job on a different thread:
* 1. A write job is still pending (io_write_state == CLIENT_PENDING_IO)
* 2. The pending job is on a different thread (c->cur_tid != tid)
*
* This situation can occur if active_io_threads_num increased since the
* original job assignment. In this case, we keep the job on its current
* thread to ensure the same thread handles the client's I/O operations. */
if (c->io_write_state == CLIENT_PENDING_IO && c->cur_tid != (uint8_t)tid) tid = c->cur_tid;
IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) return C_ERR;
c->cur_tid = tid;
c->read_flags = canParseCommand(c) ? 0 : READ_FLAGS_DONT_PARSE;
c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0;
c->read_flags |= isReplicatedClient(c) ? READ_FLAGS_REPLICATED : 0;
c->io_read_state = CLIENT_PENDING_IO;
connSetPostponeUpdateState(c->conn, 1);
IOJobQueue_push(jq, ioThreadReadQueryFromClient, c);
if (unlikely(spmcEnqueue(&io_shared_inbox, pack_job(c, JOB_REQ_READ_CLIENT)) == false)) {
c->read_flags = 0;
c->io_read_state = CLIENT_IDLE;
connSetPostponeUpdateState(c->conn, 0);
return C_ERR;
}
io_jobs_submitted++;
server.stat_io_reads_pending++;
c->flag.pending_read = 1;
listLinkNodeTail(server.clients_pending_io_read, &c->pending_read_list_node);
return C_OK;
}
@ -410,6 +530,7 @@ int trySendWriteToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* The I/O thread is already writing for this client. */
if (c->io_write_state != CLIENT_IDLE) return C_OK;
if (c->io_read_state == CLIENT_PENDING_IO) return C_ERR;
/* Nothing to write */
if (!clientHasPendingReplies(c)) return C_ERR;
/* For simplicity, avoid offloading non-online replicas */
@ -417,29 +538,6 @@ int trySendWriteToIOThreads(client *c) {
/* We can't offload debugged clients as the main-thread may read at the same time */
if (c->flag.lua_debug) return C_ERR;
size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;
/* Handle case where client has a pending IO read job on a different thread:
* 1. A read job is still pending (io_read_state == CLIENT_PENDING_IO)
* 2. The pending job is on a different thread (c->cur_tid != tid)
*
* This situation can occur if active_io_threads_num increased since the
* original job assignment. In this case, we keep the job on its current
* thread to ensure the same thread handles the client's I/O operations. */
if (c->io_read_state == CLIENT_PENDING_IO && c->cur_tid != (uint8_t)tid) tid = c->cur_tid;
IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) return C_ERR;
c->cur_tid = tid;
if (c->flag.pending_write) {
/* We move the client to the io pending write queue */
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
} else {
c->flag.pending_write = 1;
}
serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);
int is_replica = getClientType(c) == CLIENT_TYPE_REPLICA;
if (is_replica) {
c->io_last_reply_block = listLast(server.repl_buffer_blocks);
@ -469,8 +567,23 @@ int trySendWriteToIOThreads(client *c) {
connSetPostponeUpdateState(c->conn, 1);
c->write_flags = is_replica ? WRITE_FLAGS_IS_REPLICA : 0;
c->io_write_state = CLIENT_PENDING_IO;
void *job = pack_job(c, JOB_REQ_WRITE_CLIENT);
if (unlikely(spmcEnqueue(&io_shared_inbox, job) == false)) {
c->io_write_state = CLIENT_IDLE;
connSetPostponeUpdateState(c->conn, 0);
c->write_flags = 0;
c->io_last_reply_block = NULL;
c->io_last_bufpos = 0;
return C_ERR;
}
IOJobQueue_push(jq, ioThreadWriteToClient, c);
if (c->flag.pending_write) {
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
c->flag.pending_write = 0;
}
io_jobs_submitted++;
server.stat_io_writes_pending++;
return C_OK;
}
@ -508,10 +621,12 @@ int tryOffloadFreeArgvToIOThreads(client *c, int argc, robj **argv) {
return C_ERR;
}
size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;
int target_id = c->cur_tid;
if (target_id < 1 || target_id >= server.active_io_threads_num) {
target_id = (c->id % (server.active_io_threads_num - 1)) + 1;
}
IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) {
if (spscIsFull(&io_private_inbox[target_id])) {
return C_ERR;
}
@ -538,9 +653,14 @@ int tryOffloadFreeArgvToIOThreads(client *c, int argc, robj **argv) {
* this is the last argument to free. With this approach, we don't need to
* send the argc to the IO thread and we can send just the argv ptr. */
argv[last_arg_to_free]->refcount = 0;
/* Must succeed as we checked the free space before. */
IOJobQueue_push(jq, IOThreadFreeArgv, argv);
void *job = pack_job(argv, JOB_REQ_FREE_ARGV);
/* We pass false to enqueue the job without committing the queue index immediately.
* This allows us to batch multiple free jobs together and
* commit them in a single operation later in the event loop. This reduces the overhead
* of memory barriers and cache line bouncing associated
* with updating the queue's write pointer per job. */
spscEnqueue(&io_private_inbox[target_id], job, false);
io_jobs_submitted++;
return C_OK;
}
@ -557,20 +677,9 @@ int tryOffloadFreeObjToIOThreads(robj *obj) {
if (obj->encoding != OBJ_ENCODING_RAW || obj->type != OBJ_STRING) return C_ERR;
/* We select the thread ID in a round-robin fashion. */
size_t tid = (server.stat_io_freed_objects % (server.active_io_threads_num - 1)) + 1;
IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) {
return C_ERR;
}
/* We offload only the free of the ptr that may be allocated by the I/O thread.
* The object itself was allocated by the main thread and will be freed by the main thread. */
IOJobQueue_push(jq, sdsfreeVoid, obj->ptr);
obj->ptr = NULL;
decrRefCount(obj);
void *job = pack_job(obj, JOB_REQ_FREE_OBJ);
if (unlikely(spmcEnqueue(&io_shared_inbox, job) == false)) return C_ERR;
io_jobs_submitted++;
server.stat_io_freed_objects++;
return C_OK;
}
@ -602,7 +711,7 @@ void trySendPollJobToIOThreads(void) {
}
/* If there are no pending jobs, let the main thread do the poll-wait by itself. */
if (listLength(server.clients_pending_io_write) + listLength(server.clients_pending_io_read) == 0) {
if (getPendingIOResponsesCount() == 0) {
return;
}
@ -611,17 +720,44 @@ void trySendPollJobToIOThreads(void) {
return;
}
/* The poll is sent to the last thread. While a random thread could have been selected,
* the last thread has a slightly better chance of being less loaded compared to other threads,
* as we activate the lowest threads first. */
int tid = server.active_io_threads_num - 1;
IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) return; /* The main thread will handle the poll itself. */
void *job = pack_job(server.el, JOB_REQ_POLL);
server.io_poll_state = AE_IO_STATE_POLL;
aeSetCustomPollProc(server.el, getIOThreadPollResults);
aeSetPollProtect(server.el, 1);
IOJobQueue_push(jq, IOThreadPoll, server.el);
/* Use SPMC to minimize polling overhead. At high thread counts, use private SPSC queues for lower latency. */
if (server.active_io_threads_num <= 9) {
if (unlikely(spmcEnqueue(&io_shared_inbox, job) == false)) {
server.io_poll_state = AE_IO_STATE_NONE;
aeSetPollProtect(server.el, 0);
return;
}
} else {
cur_epoll_thread = ((cur_epoll_thread) % (server.active_io_threads_num - 1)) + 1;
if (unlikely(spscIsFull(&io_private_inbox[cur_epoll_thread]))) {
server.io_poll_state = AE_IO_STATE_NONE;
aeSetPollProtect(server.el, 0);
return;
}
spscEnqueue(&io_private_inbox[cur_epoll_thread], job, true);
}
aeSetCustomPollProc(server.el, getIOThreadPollResults);
io_jobs_submitted++;
}
void sendToMainThread(void *data, int type) {
if (unlikely(pending_io_responses)) {
flushPendingIOResponses(0);
}
void *job = pack_job(data, type);
if (unlikely(pending_io_responses || !mpscEnqueue(&io_shared_outbox, job, &io_thread_ticket))) {
/* Failed to push new job: initialize list if needed and save job */
if (pending_io_responses == NULL) {
pending_io_responses = listCreate();
}
listAddNodeTail(pending_io_responses, job);
}
}
static void ioThreadAccept(void *data) {
@ -629,6 +765,7 @@ static void ioThreadAccept(void *data) {
connAccept(c->conn, NULL);
atomic_thread_fence(memory_order_release);
c->io_read_state = CLIENT_COMPLETED_IO;
sendToMainThread(c, JOB_RES_READ_CLIENT);
}
/*
@ -660,19 +797,102 @@ int trySendAcceptToIOThreads(connection *conn) {
return C_ERR;
}
size_t thread_id = (c->id % (server.active_io_threads_num - 1)) + 1;
IOJobQueue *job_queue = &io_jobs[thread_id];
c->io_read_state = CLIENT_PENDING_IO;
c->flag.pending_read = 1;
connSetPostponeUpdateState(c->conn, 1);
if (IOJobQueue_isFull(job_queue)) {
void *job = pack_job(c, JOB_REQ_ACCEPT);
if (unlikely(spmcEnqueue(&io_shared_inbox, job) == false)) {
c->io_read_state = CLIENT_IDLE;
c->flag.pending_read = 0;
connSetPostponeUpdateState(c->conn, 0);
return C_ERR;
}
c->io_read_state = CLIENT_PENDING_IO;
c->flag.pending_read = 1;
listLinkNodeTail(server.clients_pending_io_read, &c->pending_read_list_node);
connSetPostponeUpdateState(c->conn, 1);
server.stat_io_reads_pending++;
server.stat_io_accept_offloaded++;
IOJobQueue_push(job_queue, ioThreadAccept, c);
io_jobs_submitted++;
return C_OK;
}
/* Function to handle read jobs */
static void handleReadJobs(client **read_jobs, int read_count) {
server.stat_io_reads_pending -= read_count;
serverAssert(server.stat_io_reads_pending >= 0);
/* process each client */
for (int i = 0; i < read_count; i++) {
client *c = read_jobs[i];
processClientIOReadsDone(c);
}
/* Process commands in batch if we processed any reads */
if (read_count) {
server.stat_io_reads_processed += read_count;
processClientsCommandsBatch();
}
}
/* Function to handle write jobs */
static void handleWriteJobs(client **write_jobs, int write_count) {
server.stat_io_writes_pending -= write_count;
serverAssert(server.stat_io_writes_pending >= 0);
for (int i = 0; i < write_count; i++) {
client *c = write_jobs[i];
server.stat_io_writes_processed++;
processClientIOWriteDone(c);
}
}
#define JOB_BATCH_SIZE (16)
int processIOThreadsResponses(void) {
/* We don't check the number of threads since some threads may return jobs and then deactivate/shut down */
/* Quick check if any pending operations exist */
if (getPendingIOResponsesCount() == 0) return 0;
int total_processed = 0;
void *jobs[JOB_BATCH_SIZE];
client *read_jobs[JOB_BATCH_SIZE];
client *write_jobs[JOB_BATCH_SIZE];
/* Loop until we consume all pending jobs */
while (1) {
int received_responses = 0;
int dequeued_count = 0;
int read_count = 0;
int write_count = 0;
/* Try to dequeue JOB_BATCH_SIZE */
while (received_responses < JOB_BATCH_SIZE) {
dequeued_count = mpscDequeueBatch(&io_shared_outbox, jobs, JOB_BATCH_SIZE - received_responses);
/* Stop if we can't get more jobs from the queue. */
if (dequeued_count == 0) break;
received_responses += dequeued_count;
total_processed += dequeued_count;
for (int i = 0; i < dequeued_count; i++) {
client *c;
int job_type;
unpack_job(jobs[i], (void *)&c, &job_type);
if (job_type == JOB_RES_READ_CLIENT) {
serverAssert(c->io_read_state == CLIENT_COMPLETED_IO);
read_jobs[read_count++] = c;
} else if (job_type == JOB_RES_WRITE_CLIENT) {
serverAssert(c->io_write_state == CLIENT_COMPLETED_IO);
write_jobs[write_count++] = c;
} else {
serverPanic("Unknown job type %d", job_type);
}
}
}
if (read_count) handleReadJobs(read_jobs, read_count);
if (write_count) handleWriteJobs(write_jobs, write_count);
/* If the queue was empty at the last try - don't try again */
if (dequeued_count == 0) return total_processed;
}
}
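/* Illustrative sketch, not part of this commit: how the main thread's event loop is
 * expected to wire these entry points each iteration (the real call sites live in the
 * beforeSleep/afterSleep hooks and are not shown in this hunk). */
static void exampleEventLoopIteration(long long now_us, int numevents) {
    IOThreadsBeforeSleep(now_us);   /* commit staged SPSC jobs; sample main-thread CPU */
    /* ... poll / sleep happens here ... */
    IOThreadsAfterSleep(numevents); /* ignition, scale-up and scale-down decisions */
    processIOThreadsResponses();    /* drain the MPSC outbox: completed reads/writes */
}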

src/io_threads.h

@ -3,17 +3,42 @@
#include "server.h"
void initIOThreads(void);
typedef enum {
JOB_REQ_READ_CLIENT = 0,
JOB_REQ_WRITE_CLIENT,
JOB_REQ_FREE_ARGV,
JOB_REQ_FREE_OBJ,
JOB_REQ_POLL,
JOB_REQ_ACCEPT,
JOB_REQ_COUNT
} JobRequest;
_Static_assert(JOB_REQ_COUNT <= 7, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic");
typedef enum {
JOB_RES_READ_CLIENT = 0,
JOB_RES_WRITE_CLIENT,
JOB_RES_COUNT
} JobResult;
_Static_assert(JOB_RES_COUNT <= 7, "JOB_RES_COUNT must not exceed 7 for pointer arithmetic");
typedef void (*job_handler)(void *);
void initIOThreads(int prev_threads_num);
void killIOThreads(void);
int inMainThread(void);
int trySendReadToIOThreads(client *c);
int trySendWriteToIOThreads(client *c);
int tryOffloadFreeObjToIOThreads(robj *o);
int tryOffloadFreeArgvToIOThreads(client *c, int argc, robj **argv);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void IOThreadsAfterSleep(int numevents);
void IOThreadsBeforeSleep(long long current_time);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);
int trySendAcceptToIOThreads(connection *conn);
int updateIOThreads(const char **err);
int clientHasPendingIO(struct client *c);
int processIOThreadsResponses(void);
int getCurTid(void);
void sendToMainThread(void *data, int type);
#endif /* IO_THREADS_H */

src/networking.c

@ -353,7 +353,6 @@ client *createClient(connection *conn) {
c->last_memory_usage = 0;
c->last_memory_type = CLIENT_TYPE_NORMAL;
listInitNode(&c->clients_pending_write_node, c);
listInitNode(&c->pending_read_list_node, c);
c->mem_usage_bucket = NULL;
c->mem_usage_bucket_node = NULL;
if (conn) linkClient(c);
@ -1873,9 +1872,6 @@ void disconnectReplicas(void) {
void unlinkClient(client *c) {
listNode *ln;
/* Wait for IO operations to be done before unlinking the client. */
waitForClientIO(c);
/* If this is marked as current client unset it. */
if (c->conn && server.current_client == c) server.current_client = NULL;
@ -1935,21 +1931,12 @@ void unlinkClient(client *c) {
/* Remove from the list of pending writes if needed. */
if (c->flag.pending_write) {
if (c->io_write_state == CLIENT_IDLE) {
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
} else {
listUnlinkNode(server.clients_pending_io_write, &c->clients_pending_write_node);
}
serverAssert(server.clients_pending_write->len > 0);
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
c->flag.pending_write = 0;
}
/* Remove from the list of pending reads if needed. */
serverAssert(c->io_read_state != CLIENT_PENDING_IO && c->io_write_state != CLIENT_PENDING_IO);
if (c->flag.pending_read) {
listUnlinkNode(server.clients_pending_io_read, &c->pending_read_list_node);
c->flag.pending_read = 0;
}
/* When client was just unblocked because of a blocking operation,
* remove it from the list of unblocked clients. */
@ -2012,19 +1999,19 @@ void clearClientConnectionState(client *c) {
c->flag.no_evict = 0;
}
void freeClient(client *c) {
/* Free the client structure and all the data associated with it.
* Returns 0 if the client was not freed immediately, but scheduled for
* asynchronous freeing, and 1 if the client was freed immediately. */
int freeClient(client *c) {
listNode *ln;
/* If a client is protected, yet we need to free it right now, make sure
* to at least use asynchronous freeing. */
if (c->flag.protected || c->flag.protected_rdb_channel) {
if (c->flag.protected || c->flag.protected_rdb_channel || clientHasPendingIO(c)) {
freeClientAsync(c);
return;
return 0;
}
/* Wait for IO operations to be done before proceeding */
waitForClientIO(c);
/* For connected clients, call the disconnection event of modules hooks. */
if (c->conn) {
moduleFireServerEvent(VALKEYMODULE_EVENT_CLIENT_CHANGE, VALKEYMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED, c);
@ -2055,7 +2042,7 @@ void freeClient(client *c) {
c->flag.close_asap = 0;
c->flag.close_after_reply = 0;
replicationCachePrimary(c);
return;
return 0;
}
}
@ -2131,6 +2118,7 @@ void freeClient(client *c) {
sdsfree(c->peerid);
sdsfree(c->sockname);
zfree(c);
return 1;
}
/* Schedule a client to free it at a safe time in the beforeSleep() function.
@ -2277,7 +2265,7 @@ int freeClientsInAsyncFreeQueue(void) {
c->flag.protected_rdb_channel = 0;
}
if (c->flag.protected) continue;
if (c->flag.protected || clientHasPendingIO(c)) continue;
c->flag.close_asap = 0;
freeClient(c);
@ -3070,18 +3058,13 @@ parseResult handleParseResults(client *c) {
* This function handles various post-write tasks, including updating client state,
* allow_async_writes - A flag indicating whether I/O threads can handle pending writes for this client.
* returns 1 if processing completed successfully, 0 if processing is skipped. */
int processClientIOWriteDone(client *c, int allow_async_writes) {
/* memory barrier acquire to get the latest client state */
atomic_thread_fence(memory_order_acquire);
/* If a client is protected, don't proceed to check the write results as it may trigger conn close. */
if (c->flag.protected) return 0;
listUnlinkNode(server.clients_pending_io_write, &c->clients_pending_write_node);
c->flag.pending_write = 0;
void processClientIOWriteDone(client *c) {
if (c->io_write_state == CLIENT_IDLE) return; /* Already handled */
serverAssert(c->io_write_state == CLIENT_COMPLETED_IO);
c->io_write_state = CLIENT_IDLE;
/* Don't post-process-writes to clients that are going to be closed anyway. */
if (c->flag.close_asap) return 0;
if (c->flag.close_asap) return;
/* Update processed count on server */
server.stat_io_writes_processed += 1;
@ -3089,48 +3072,21 @@ int processClientIOWriteDone(client *c, int allow_async_writes) {
connSetPostponeUpdateState(c->conn, 0);
connUpdateState(c->conn);
if (postWriteToClient(c) == C_ERR) {
return 1;
return;
}
if (clientHasPendingReplies(c)) {
if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) {
/* Install the write handler if there are pending writes in some of the clients as a result of not being
* able to write everything in one go. */
installClientWriteHandler(c);
} else {
/* If we can send the client to the I/O thread, let it handle the write. */
if (allow_async_writes && trySendWriteToIOThreads(c) == C_OK) return 1;
/* Try again in the next eventloop */
putClientInPendingWriteQueue(c);
}
if (!clientHasPendingReplies(c)) return;
if (c->write_flags & WRITE_FLAGS_WRITE_ERROR) {
/* Install the write handler if there are pending writes in some of the clients as a result of not being
* able to write everything in one go. */
installClientWriteHandler(c);
} else {
/* If we can send the client to the I/O thread, let it handle the write. */
if (trySendWriteToIOThreads(c) == C_OK) return;
/* Try again in the next eventloop */
putClientInPendingWriteQueue(c);
}
return 1;
}
/* This function handles the post-processing of I/O write operations that have been
* completed for clients. It iterates through the list of clients with pending I/O
* writes and performs necessary actions based on their current state.
*
* Returns The number of clients processed during this function call. */
int processIOThreadsWriteDone(void) {
if (listLength(server.clients_pending_io_write) == 0) return 0;
int processed = 0;
listNode *ln;
listNode *next = listFirst(server.clients_pending_io_write);
while (next) {
ln = next;
next = listNextNode(ln);
client *c = listNodeValue(ln);
/* Client is still waiting for a pending I/O - skip it */
if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue;
processed += processClientIOWriteDone(c, 1);
}
return processed;
}
/* This function is called just before entering the event loop, in the hope
@ -3142,17 +3098,12 @@ int handleClientsWithPendingWrites(void) {
int pending_writes = listLength(server.clients_pending_write);
if (pending_writes == 0) return processed; /* Return ASAP if there are no clients. */
/* Adjust the number of I/O threads based on the number of pending writes; this is required in case
 * pending_writes > poll_events (for example in pubsub). */
adjustIOThreadsByEventLoad(pending_writes, 1);
listIter li;
listNode *ln;
listRewind(server.clients_pending_write, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flag.pending_write = 0;
listUnlinkNode(server.clients_pending_write, ln);
serverAssert(c->flag.pending_write);
/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
@ -3161,13 +3112,18 @@ int handleClientsWithPendingWrites(void) {
/* Don't write to clients that are going to be closed anyway. */
if (c->flag.close_asap) continue;
if (c->io_read_state == CLIENT_PENDING_IO) continue;
c->flag.pending_write = 0;
listUnlinkNode(server.clients_pending_write, ln);
if (!clientHasPendingReplies(c)) continue;
/* If we can send the client to the I/O thread, let it handle the write. */
if (trySendWriteToIOThreads(c) == C_OK) continue;
/* We can't write to the client while IO operation is in progress. */
if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) continue;
if (c->io_write_state != CLIENT_IDLE) continue;
processed++;
@ -3245,8 +3201,7 @@ void initSharedQueryBuf(void) {
sdsclear(thread_shared_qb);
}
void freeSharedQueryBuf(void *dummy) {
UNUSED(dummy);
void freeSharedQueryBuf(void) {
sdsfree(thread_shared_qb);
thread_shared_qb = NULL;
}
@ -6313,91 +6268,59 @@ int postponeClientRead(client *c) {
return (trySendReadToIOThreads(c) == C_OK);
}
int processIOThreadsReadDone(void) {
void processClientIOReadsDone(client *c) {
serverAssert(c->io_read_state == CLIENT_COMPLETED_IO);
if (ProcessingEventsWhileBlocked) {
/* When ProcessingEventsWhileBlocked we may call processIOThreadsReadDone recursively.
* In this case, there may be some clients left in the batch waiting to be processed. */
processClientsCommandsBatch();
}
if (listLength(server.clients_pending_io_read) == 0) return 0;
int processed = 0;
listNode *ln;
c->flag.pending_read = 0;
c->io_read_state = CLIENT_IDLE;
listNode *next = listFirst(server.clients_pending_io_read);
while (next) {
ln = next;
next = listNextNode(ln);
client *c = listNodeValue(ln);
/* Don't post-process-reads from clients that are going to be closed anyway. */
if (c->flag.close_asap) return;
/* Client is still waiting for a pending I/O - skip it */
if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue;
/* If the write job is done, process it ASAP to free the buffer and handle connection errors */
if (c->io_write_state == CLIENT_COMPLETED_IO) {
int allow_async_writes = 0; /* Don't send writes for the client to IO threads before processing the reads */
processClientIOWriteDone(c, allow_async_writes);
}
/* memory barrier acquire to get the updated client state */
atomic_thread_fence(memory_order_acquire);
/* If a client is protected, don't do anything,
* that may trigger read/write error or recreate handler. */
if (c->flag.protected) return;
listUnlinkNode(server.clients_pending_io_read, ln);
c->flag.pending_read = 0;
c->io_read_state = CLIENT_IDLE;
/* Save the current conn state, as connUpdateState may modify it */
int in_accept_state = (connGetState(c->conn) == CONN_STATE_ACCEPTING);
connSetPostponeUpdateState(c->conn, 0);
connUpdateState(c->conn);
/* Don't post-process-reads from clients that are going to be closed anyway. */
if (c->flag.close_asap) continue;
/* In accept state, no client's data was read - stop here. */
if (in_accept_state) return;
/* If a client is protected, don't do anything,
* that may trigger read/write error or recreate handler. */
if (c->flag.protected) continue;
/* On read error - stop here. */
if (handleReadResult(c) == C_ERR) {
return;
}
processed++;
server.stat_io_reads_processed++;
/* Save the current conn state, as connUpdateState may modify it */
int in_accept_state = (connGetState(c->conn) == CONN_STATE_ACCEPTING);
connSetPostponeUpdateState(c->conn, 0);
connUpdateState(c->conn);
/* In accept state, no client's data was read - stop here. */
if (in_accept_state) continue;
/* On read error - stop here. */
if (handleReadResult(c) == C_ERR) {
continue;
}
if (!(c->read_flags & READ_FLAGS_DONT_PARSE)) {
parseResult res = handleParseResults(c);
/* On parse error - stop here. */
if (res == PARSE_ERR) {
continue;
} else if (res == PARSE_NEEDMORE) {
beforeNextClient(c);
continue;
}
}
if (c->argc > 0) {
c->flag.pending_command = 1;
}
size_t list_length_before_command_execute = listLength(server.clients_pending_io_read);
/* try to add the command to the batch */
int ret = addCommandToBatchAndProcessIfFull(c);
/* If the command was not added to the commands batch, process it immediately */
if (ret == C_ERR) {
if (processPendingCommandAndInputBuffer(c) == C_OK) beforeNextClient(c);
}
if (list_length_before_command_execute != listLength(server.clients_pending_io_read)) {
/* A client was unlink from the list possibly making the next node invalid */
next = listFirst(server.clients_pending_io_read);
if (!(c->read_flags & READ_FLAGS_DONT_PARSE)) {
parseResult res = handleParseResults(c);
/* On parse error - stop here. */
if (res == PARSE_ERR) {
return;
} else if (res == PARSE_NEEDMORE) {
beforeNextClient(c);
return;
}
}
processClientsCommandsBatch();
if (c->argc > 0) {
c->flag.pending_command = 1;
}
return processed;
/* try to add the command to the batch */
int ret = addCommandToBatchAndProcessIfFull(c);
/* If the command was not added to the commands batch, process it immediately */
if (ret == C_ERR) {
if (processPendingCommandAndInputBuffer(c) == C_OK) beforeNextClient(c);
}
}
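processClientIOReadsDone() hands parsed commands to addCommandToBatchAndProcessIfFull() so that commands from several clients can be executed together; the batch is also where the key prefetching counted by io_threaded_total_prefetch_batches/entries takes place before execution. A conceptual sketch of that accumulate-then-flush pattern follows; clientBatch and the two helper names are illustrative only, and the committed batching code (elsewhere in the tree) may differ.
/* Hedged sketch of the batching idea, not the committed implementation. */
typedef struct clientBatch {
    client **clients; /* clients whose next command is already parsed */
    int count;
    int max;          /* bounded by server.prefetch_batch_max_size */
} clientBatch;
static void flushBatchSketch(clientBatch *b) {
    /* The real code first prefetches the keys touched by the queued commands,
     * then executes them; the prefetch step is omitted here. */
    for (int i = 0; i < b->count; i++) {
        client *c = b->clients[i];
        if (processPendingCommandAndInputBuffer(c) == C_OK) beforeNextClient(c);
    }
    b->count = 0;
}
static void addToBatchSketch(clientBatch *b, client *c) {
    b->clients[b->count++] = c;
    if (b->count == b->max) flushBatchSketch(b);
}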
/* Returns the actual client eviction limit based on current configuration or
@ -6430,16 +6353,32 @@ void evictClients(void) {
listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
size_t client_eviction_limit = getClientEvictionLimit();
if (client_eviction_limit == 0) return;
while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] + server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >
/* Variable to track memory of clients marked for close but not yet freed */
size_t pending_freed = 0;
while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] -
pending_freed >
client_eviction_limit) {
listNode *ln = listNext(&bucket_iter);
if (ln) {
client *c = ln->value;
if (c->flag.close_asap) {
/* Already scheduled to close. Count memory as freed and skip. */
pending_freed += getClientMemoryUsage(c, NULL);
continue;
}
sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log);
serverLog(LL_NOTICE, "Evicting client: %s", ci);
freeClient(c);
sdsfree(ci);
server.stat_evictedclients++;
if (freeClient(c) == 0) {
/* Protected client (async close). Count memory as freed and skip. */
pending_freed += getClientMemoryUsage(c, NULL);
continue;
}
} else {
curr_bucket--;
if (curr_bucket < 0) {
@ -6460,6 +6399,10 @@ void ioThreadReadQueryFromClient(void *data) {
/* Read */
readToQueryBuf(c);
if (c->flag.close_asap) {
goto done;
}
/* Check for read errors. */
if (c->nread <= 0) {
goto done;
@ -6495,8 +6438,10 @@ done:
if (!(c->read_flags & READ_FLAGS_REPLICATED)) {
trimClientQueryBuffer(c);
}
atomic_thread_fence(memory_order_release);
c->io_read_state = CLIENT_COMPLETED_IO;
c->cur_tid = getCurTid();
sendToMainThread(c, JOB_RES_READ_CLIENT);
}
void ioThreadWriteToClient(void *data) {
@ -6509,6 +6454,6 @@ void ioThreadWriteToClient(void *data) {
_writeToClient(c);
}
atomic_thread_fence(memory_order_release);
c->io_write_state = CLIENT_COMPLETED_IO;
sendToMainThread(c, JOB_RES_WRITE_CLIENT);
}
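Both IO-thread job functions end by publishing the client's completion state behind a release fence and handing the client back with sendToMainThread(). That function is not part of this diff; a minimal sketch of its producer side is shown below, using only the MPSC API exercised by test_io_queues.c (the retry-with-ticket loop mirrors mpscProducerThread there). How the job type is encoded and how the main thread is woken are left out and would differ in the real code.
/* Hedged sketch only; the committed sendToMainThread() may buffer the job locally
 * instead of yielding, and must also wake a sleeping main thread. */
static void sendToMainThreadSketch(mpscQueue *responses, void *job) {
    mpscTicket ticket = {0};
    /* A failed enqueue hands back a reservation; retrying with the same ticket
     * keeps the producer's slot until the consumer frees space. */
    while (!mpscEnqueue(responses, job, &ticket)) sched_yield();
}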

View File

@ -1793,14 +1793,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* events to handle. */
if (ProcessingEventsWhileBlocked) {
uint64_t processed = 0;
processed += processIOThreadsReadDone();
processed += processIOThreadsResponses();
processed += connTypeProcessPendingData();
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) flushAppendOnlyFile(0);
processed += handleClientsWithPendingWrites();
int last_processed = 0;
do {
/* Try to process all the pending IO events. */
last_processed = processIOThreadsReadDone() + processIOThreadsWriteDone();
last_processed = processIOThreadsResponses();
processed += last_processed;
} while (last_processed != 0);
processed += freeClientsInAsyncFreeQueue();
@ -1809,7 +1809,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
}
/* We should handle pending reads clients ASAP after event loop. */
processIOThreadsReadDone();
processIOThreadsResponses();
/* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */
connTypeProcessPendingData();
@ -1904,11 +1904,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Try to process more IO reads that are ready to be processed. */
if (server.aof_fsync != AOF_FSYNC_ALWAYS) {
processIOThreadsReadDone();
processIOThreadsResponses();
}
processIOThreadsWriteDone();
/* Record cron time in beforeSleep. This does not include the time consumed by AOF writing and IO writing above. */
monotime cron_start_time_after_write = getMonotonicUs();
@ -1923,11 +1921,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
evictClients();
/* Record cron time in beforeSleep. */
monotime duration_after_write = getMonotonicUs() - cron_start_time_after_write;
monotime current_time = getMonotonicUs();
monotime duration_after_write = current_time - cron_start_time_after_write;
/* Record eventloop latency. */
if (server.el_start > 0) {
monotime el_duration = getMonotonicUs() - server.el_start;
monotime el_duration = current_time - server.el_start;
durationAddSample(EL_DURATION_TYPE_EL, el_duration);
latencyTraceIfNeeded(server, eventloop, el_duration);
}
@ -1947,6 +1946,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* connection has pending data) */
aeSetDontWait(server.el, dont_sleep);
IOThreadsBeforeSleep(current_time);
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. The server main thread will not touch anything at this
* time. */
@ -1993,7 +1994,7 @@ void afterSleep(struct aeEventLoop *eventLoop, int numevents) {
server.cmd_time_snapshot = server.mstime;
}
adjustIOThreadsByEventLoad(numevents, 0);
IOThreadsAfterSleep(numevents);
}
/* =========================== Server initialization ======================== */
@ -2843,8 +2844,6 @@ void initServer(void) {
server.replicas_waiting_psync = raxNew();
server.wait_before_rdb_client_free = DEFAULT_WAIT_BEFORE_RDB_CLIENT_FREE;
server.clients_pending_write = listCreate();
server.clients_pending_io_write = listCreate();
server.clients_pending_io_read = listCreate();
server.clients_timeout_table = raxNew();
server.replication_allowed = 1;
server.replicas_eldb = -1; /* Force to emit the first SELECT command. */
@ -3117,7 +3116,7 @@ void initListeners(void) {
* see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast(void) {
bioInit();
initIOThreads();
initIOThreads(1);
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
@ -6219,10 +6218,13 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
"io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects,
"io_threaded_reads_pending:%lld\r\n", server.stat_io_reads_pending,
"io_threaded_writes_pending:%lld\r\n", server.stat_io_writes_pending,
"io_threaded_accept_processed:%lld\r\n", server.stat_io_accept_offloaded,
"io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads,
"io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches,
"io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries,
"active_io_threads_num:%d\r\n", server.active_io_threads_num,
"client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections,
"client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections,
"reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks,
@ -6231,7 +6233,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"eventloop_duration_sum:%llu\r\n", server.duration_stats[EL_DURATION_TYPE_EL].sum,
"eventloop_duration_cmd_sum:%llu\r\n", server.duration_stats[EL_DURATION_TYPE_CMD].sum,
"instantaneous_eventloop_cycles_per_sec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_CYCLE),
"instantaneous_eventloop_duration_usec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_DURATION)));
"instantaneous_eventloop_duration_usec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_DURATION),
"instantaneous_io_pending_jobs:%lld\r\n", getInstantaneousMetric(STATS_METRIC_IO_WAIT)));
info = genValkeyInfoStringACLStats(info);
}

View File

@ -190,6 +190,8 @@ typedef enum {
STATS_METRIC_NET_OUTPUT_REPLICATION, /* Bytes written to network during replication. */
STATS_METRIC_EL_CYCLE, /* Number of eventloop cycled. */
STATS_METRIC_EL_DURATION, /* Eventloop duration. */
STATS_METRIC_IO_WAIT, /* IO queue size */
STATS_METRIC_MAIN_THREAD_CPU_SYS, /* Main thread CPU sys time */
STATS_METRIC_COUNT /* Total count */
} instantaneous_metric_type;
@ -1320,8 +1322,7 @@ typedef struct client {
* client, and in which category the client was, in order to remove it
* before adding it the new value. */
uint8_t last_memory_type;
uint8_t capa; /* Client capabilities: CLIENT_CAPA* macros. */
listNode pending_read_list_node; /* IO thread only ?*/
uint8_t capa; /* Client capabilities: CLIENT_CAPA* macros. */
/* Statistics and metrics */
unsigned long long net_input_bytes; /* Total network input bytes read from this client. */
unsigned long long net_input_bytes_curr_cmd; /* Total network input bytes read for the execution of this client's current command. */
@ -1724,8 +1725,6 @@ struct valkeyServer {
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_io_read; /* List of clients with pending read to be process by I/O threads. */
list *clients_pending_io_write; /* List of clients with pending write to be process by I/O threads. */
list *replicas, *monitors; /* List of replicas and MONITORs */
rax *replicas_waiting_psync; /* Radix tree for tracking replicas awaiting partial synchronization.
* Key: RDB client ID
@ -1758,7 +1757,7 @@ struct valkeyServer {
int protected_mode; /* Don't accept external connections. */
int io_threads_num; /* Number of IO threads to use. */
int active_io_threads_num; /* Current number of active IO threads, includes main thread. */
int events_per_io_thread; /* Number of events on the event loop to trigger IO threads activation. */
int io_threads_always_active; /* Activate all IO threads regardless of load size. */
int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */
@ -1840,7 +1839,9 @@ struct valkeyServer {
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
long long stat_io_reads_processed; /* Number of read events processed by IO threads */
long long stat_io_reads_pending; /* Number of read events pending in IO threads */
long long stat_io_writes_processed; /* Number of write events processed by IO threads */
long long stat_io_writes_pending; /* Number of write events pending in IO threads */
long long stat_io_freed_objects; /* Number of objects freed by IO threads */
long long stat_io_accept_offloaded; /* Number of offloaded accepts */
long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */
@ -1999,7 +2000,7 @@ struct valkeyServer {
int syslog_facility; /* Syslog facility */
int crashlog_enabled; /* Enable signal handler for crashlog.
* disable for clean core dumps. */
int crashed; /* True if the server has crashed, used in catClientInfoString
volatile int crashed; /* True if the server has crashed, used in catClientInfoString
* to indicate that no wait for IO threads is needed. */
int memcheck_enabled; /* Enable memory check on crash. */
int use_exit_on_panic; /* Use exit() on panic and assert rather than
@ -2784,7 +2785,7 @@ void dictVanillaFree(void *val);
#define WRITE_FLAGS_IS_REPLICA (1 << 1)
client *createClient(connection *conn);
void freeClient(client *c);
int freeClient(client *c);
void freeClientAsync(client *c);
void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...);
void beforeNextClient(client *c);
@ -2906,7 +2907,7 @@ void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
void initSharedQueryBuf(void);
void freeSharedQueryBuf(void *dummy);
void freeSharedQueryBuf(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void clientSetUser(client *c, user *u, int authenticated);
@ -2917,8 +2918,8 @@ void waitForClientIO(client *c);
void ioThreadReadQueryFromClient(void *data);
void ioThreadWriteToClient(void *data);
int canParseCommand(client *c);
int processIOThreadsReadDone(void);
int processIOThreadsWriteDone(void);
void processClientIOReadsDone(client *c);
void processClientIOWriteDone(client *c);
void releaseReplyReferences(client *c);
void resetLastWrittenBuf(client *c);
@ -3388,6 +3389,8 @@ sds activeDefragSds(sds sdsptr);
robj *activeDefragStringOb(robj *ob);
void dismissSds(sds s);
void dismissMemoryInChild(void);
void trackInstantaneousMetric(int metric, long long current_value, long long current_base, long long factor);
long long getInstantaneousMetric(int metric);
#define RESTART_SERVER_NONE 0
#define RESTART_SERVER_GRACEFULLY (1 << 0) /* Do proper shutdown. */

View File

@ -56,6 +56,21 @@ int test_intsetUpgradeFromint16Toint64(int argc, char **argv, int flags);
int test_intsetUpgradeFromint32Toint64(int argc, char **argv, int flags);
int test_intsetStressLookups(int argc, char **argv, int flags);
int test_intsetStressAddDelete(int argc, char **argv, int flags);
int test_spscBasicEnqueueDequeue(int argc, char **argv, int flags);
int test_spscBatchCommit(int argc, char **argv, int flags);
int test_spscFullAndWrapAround(int argc, char **argv, int flags);
int test_spscEmptyDequeue(int argc, char **argv, int flags);
int test_spscPartialBatchDequeue(int argc, char **argv, int flags);
int test_spmcBasicEnqueueDequeue(int argc, char **argv, int flags);
int test_spmcFullAndWrapAround(int argc, char **argv, int flags);
int test_spmcSize(int argc, char **argv, int flags);
int test_mpscBasicEnqueueDequeue(int argc, char **argv, int flags);
int test_mpscTicketRetry(int argc, char **argv, int flags);
int test_mpscFullAndWrapAround(int argc, char **argv, int flags);
int test_mpscInterleavedOperations(int argc, char **argv, int flags);
int test_spscConcurrent(int argc, char **argv, int flags);
int test_spmcConcurrent(int argc, char **argv, int flags);
int test_mpscConcurrent(int argc, char **argv, int flags);
int test_kvstoreAdd16Keys(int argc, char **argv, int flags);
int test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable(int argc, char **argv, int flags);
int test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable(int argc, char **argv, int flags);
@ -270,6 +285,7 @@ unitTest __test_endianconv_c[] = {{"test_endianconv", test_endianconv}, {NULL, N
unitTest __test_entry_c[] = {{"test_entryCreate", test_entryCreate}, {"test_entryUpdate", test_entryUpdate}, {"test_entryHasexpiry_entrySetExpiry", test_entryHasexpiry_entrySetExpiry}, {"test_entryIsExpired", test_entryIsExpired}, {"test_entryMemUsage_entrySetExpiry_entrySetValue", test_entryMemUsage_entrySetExpiry_entrySetValue}, {NULL, NULL}};
unitTest __test_hashtable_c[] = {{"test_cursor", test_cursor}, {"test_set_hash_function_seed", test_set_hash_function_seed}, {"test_add_find_delete", test_add_find_delete}, {"test_add_find_delete_avoid_resize", test_add_find_delete_avoid_resize}, {"test_instant_rehashing", test_instant_rehashing}, {"test_bucket_chain_length", test_bucket_chain_length}, {"test_two_phase_insert_and_pop", test_two_phase_insert_and_pop}, {"test_replace_reallocated_entry", test_replace_reallocated_entry}, {"test_incremental_find", test_incremental_find}, {"test_scan", test_scan}, {"test_iterator", test_iterator}, {"test_safe_iterator", test_safe_iterator}, {"test_compact_bucket_chain", test_compact_bucket_chain}, {"test_random_entry", test_random_entry}, {"test_random_entry_with_long_chain", test_random_entry_with_long_chain}, {"test_random_entry_sparse_table", test_random_entry_sparse_table}, {"test_safe_iterator_invalidation", test_safe_iterator_invalidation}, {"test_safe_iterator_empty_no_invalidation", test_safe_iterator_empty_no_invalidation}, {"test_safe_iterator_reset_invalidation", test_safe_iterator_reset_invalidation}, {"test_safe_iterator_reset_untracking", test_safe_iterator_reset_untracking}, {"test_safe_iterator_pause_resume_tracking", test_safe_iterator_pause_resume_tracking}, {"test_null_hashtable_iterator", test_null_hashtable_iterator}, {"test_hashtable_retarget_iterator", test_hashtable_retarget_iterator}, {NULL, NULL}};
unitTest __test_intset_c[] = {{"test_intsetValueEncodings", test_intsetValueEncodings}, {"test_intsetBasicAdding", test_intsetBasicAdding}, {"test_intsetLargeNumberRandomAdd", test_intsetLargeNumberRandomAdd}, {"test_intsetUpgradeFromint16Toint32", test_intsetUpgradeFromint16Toint32}, {"test_intsetUpgradeFromint16Toint64", test_intsetUpgradeFromint16Toint64}, {"test_intsetUpgradeFromint32Toint64", test_intsetUpgradeFromint32Toint64}, {"test_intsetStressLookups", test_intsetStressLookups}, {"test_intsetStressAddDelete", test_intsetStressAddDelete}, {NULL, NULL}};
unitTest __test_io_queues_c[] = {{"test_spscBasicEnqueueDequeue", test_spscBasicEnqueueDequeue}, {"test_spscBatchCommit", test_spscBatchCommit}, {"test_spscFullAndWrapAround", test_spscFullAndWrapAround}, {"test_spscEmptyDequeue", test_spscEmptyDequeue}, {"test_spscPartialBatchDequeue", test_spscPartialBatchDequeue}, {"test_spmcBasicEnqueueDequeue", test_spmcBasicEnqueueDequeue}, {"test_spmcFullAndWrapAround", test_spmcFullAndWrapAround}, {"test_spmcSize", test_spmcSize}, {"test_mpscBasicEnqueueDequeue", test_mpscBasicEnqueueDequeue}, {"test_mpscTicketRetry", test_mpscTicketRetry}, {"test_mpscFullAndWrapAround", test_mpscFullAndWrapAround}, {"test_mpscInterleavedOperations", test_mpscInterleavedOperations}, {"test_spscConcurrent", test_spscConcurrent}, {"test_spmcConcurrent", test_spmcConcurrent}, {"test_mpscConcurrent", test_mpscConcurrent}, {NULL, NULL}};
unitTest __test_kvstore_c[] = {{"test_kvstoreAdd16Keys", test_kvstoreAdd16Keys}, {"test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable", test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable}, {"test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable", test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable}, {"test_kvstoreHashtableIteratorRemoveAllKeysNoDeleteEmptyHashtable", test_kvstoreHashtableIteratorRemoveAllKeysNoDeleteEmptyHashtable}, {"test_kvstoreHashtableIteratorRemoveAllKeysDeleteEmptyHashtable", test_kvstoreHashtableIteratorRemoveAllKeysDeleteEmptyHashtable}, {"test_kvstoreHashtableExpand", test_kvstoreHashtableExpand}, {NULL, NULL}};
unitTest __test_listpack_c[] = {{"test_listpackCreateIntList", test_listpackCreateIntList}, {"test_listpackCreateList", test_listpackCreateList}, {"test_listpackLpPrepend", test_listpackLpPrepend}, {"test_listpackLpPrependInteger", test_listpackLpPrependInteger}, {"test_listpackGetELementAtIndex", test_listpackGetELementAtIndex}, {"test_listpackPop", test_listpackPop}, {"test_listpackGetELementAtIndex2", test_listpackGetELementAtIndex2}, {"test_listpackIterate0toEnd", test_listpackIterate0toEnd}, {"test_listpackIterate1toEnd", test_listpackIterate1toEnd}, {"test_listpackIterate2toEnd", test_listpackIterate2toEnd}, {"test_listpackIterateBackToFront", test_listpackIterateBackToFront}, {"test_listpackIterateBackToFrontWithDelete", test_listpackIterateBackToFrontWithDelete}, {"test_listpackDeleteWhenNumIsMinusOne", test_listpackDeleteWhenNumIsMinusOne}, {"test_listpackDeleteWithNegativeIndex", test_listpackDeleteWithNegativeIndex}, {"test_listpackDeleteInclusiveRange0_0", test_listpackDeleteInclusiveRange0_0}, {"test_listpackDeleteInclusiveRange0_1", test_listpackDeleteInclusiveRange0_1}, {"test_listpackDeleteInclusiveRange1_2", test_listpackDeleteInclusiveRange1_2}, {"test_listpackDeleteWitStartIndexOutOfRange", test_listpackDeleteWitStartIndexOutOfRange}, {"test_listpackDeleteWitNumOverflow", test_listpackDeleteWitNumOverflow}, {"test_listpackBatchDelete", test_listpackBatchDelete}, {"test_listpackDeleteFooWhileIterating", test_listpackDeleteFooWhileIterating}, {"test_listpackReplaceWithSameSize", test_listpackReplaceWithSameSize}, {"test_listpackReplaceWithDifferentSize", test_listpackReplaceWithDifferentSize}, {"test_listpackRegressionGt255Bytes", test_listpackRegressionGt255Bytes}, {"test_listpackCreateLongListAndCheckIndices", test_listpackCreateLongListAndCheckIndices}, {"test_listpackCompareStrsWithLpEntries", test_listpackCompareStrsWithLpEntries}, {"test_listpackLpMergeEmptyLps", test_listpackLpMergeEmptyLps}, {"test_listpackLpMergeLp1Larger", test_listpackLpMergeLp1Larger}, {"test_listpackLpMergeLp2Larger", test_listpackLpMergeLp2Larger}, {"test_listpackLpNextRandom", test_listpackLpNextRandom}, {"test_listpackLpNextRandomCC", test_listpackLpNextRandomCC}, {"test_listpackRandomPairWithOneElement", test_listpackRandomPairWithOneElement}, {"test_listpackRandomPairWithManyElements", test_listpackRandomPairWithManyElements}, {"test_listpackRandomPairsWithOneElement", test_listpackRandomPairsWithOneElement}, {"test_listpackRandomPairsWithManyElements", test_listpackRandomPairsWithManyElements}, {"test_listpackRandomPairsUniqueWithOneElement", test_listpackRandomPairsUniqueWithOneElement}, {"test_listpackRandomPairsUniqueWithManyElements", test_listpackRandomPairsUniqueWithManyElements}, {"test_listpackPushVariousEncodings", test_listpackPushVariousEncodings}, {"test_listpackLpFind", test_listpackLpFind}, {"test_listpackLpValidateIntegrity", test_listpackLpValidateIntegrity}, {"test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN", test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN}, {"test_listpackStressWithRandom", test_listpackStressWithRandom}, {"test_listpackSTressWithVariableSize", test_listpackSTressWithVariableSize}, {"test_listpackBenchmarkInit", test_listpackBenchmarkInit}, {"test_listpackBenchmarkLpAppend", test_listpackBenchmarkLpAppend}, {"test_listpackBenchmarkLpFindString", test_listpackBenchmarkLpFindString}, {"test_listpackBenchmarkLpFindNumber", test_listpackBenchmarkLpFindNumber}, {"test_listpackBenchmarkLpSeek", test_listpackBenchmarkLpSeek}, 
{"test_listpackBenchmarkLpValidateIntegrity", test_listpackBenchmarkLpValidateIntegrity}, {"test_listpackBenchmarkLpCompareWithString", test_listpackBenchmarkLpCompareWithString}, {"test_listpackBenchmarkLpCompareWithNumber", test_listpackBenchmarkLpCompareWithNumber}, {"test_listpackBenchmarkFree", test_listpackBenchmarkFree}, {NULL, NULL}};
unitTest __test_networking_c[] = {{"test_writeToReplica", test_writeToReplica}, {"test_postWriteToReplica", test_postWriteToReplica}, {"test_backupAndUpdateClientArgv", test_backupAndUpdateClientArgv}, {"test_rewriteClientCommandArgument", test_rewriteClientCommandArgument}, {"test_addRepliesWithOffloadsToBuffer", test_addRepliesWithOffloadsToBuffer}, {"test_addRepliesWithOffloadsToList", test_addRepliesWithOffloadsToList}, {"test_addBufferToReplyIOV", test_addBufferToReplyIOV}, {NULL, NULL}};
@ -299,6 +315,7 @@ struct unitTestSuite {
{"test_entry.c", __test_entry_c},
{"test_hashtable.c", __test_hashtable_c},
{"test_intset.c", __test_intset_c},
{"test_io_queues.c", __test_io_queues_c},
{"test_kvstore.c", __test_kvstore_c},
{"test_listpack.c", __test_listpack_c},
{"test_networking.c", __test_networking_c},

560
src/unit/test_io_queues.c Normal file
View File

@ -0,0 +1,560 @@
/*
* Copyright (c) Valkey Contributors
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
/* Mock inMainThread for tests - must be declared before io_queues.h */
static _Thread_local int mock_main_thread = 1;
static int inMainThread(void) {
return mock_main_thread;
}
#define serverAssert(e) assert(e)
#define debugServerAssert(e) assert(e)
#include "../io_queues.h"
#include "test_help.h"
/* SPSC Queue Tests */
int test_spscBasicEnqueueDequeue(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spscQueue q;
spscInit(&q);
void *data1 = (void *)0x1000;
void *data2 = (void *)0x2000;
/* Enqueue from main thread */
mock_main_thread = 1;
TEST_ASSERT(!spscIsFull(&q));
spscEnqueue(&q, data1, true);
spscEnqueue(&q, data2, true);
/* Dequeue from IO thread */
mock_main_thread = 0;
void *jobs[2];
size_t count = spscDequeueBatch(&q, jobs, 2);
TEST_ASSERT(count == 2);
TEST_ASSERT(jobs[0] == data1);
TEST_ASSERT(jobs[1] == data2);
/* Queue should be empty now */
mock_main_thread = 1;
TEST_ASSERT(spscIsEmpty(&q));
spscFree(&q);
return 0;
}
int test_spscBatchCommit(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spscQueue q;
spscInit(&q);
mock_main_thread = 1;
/* Enqueue without commit */
spscEnqueue(&q, (void *)0x1000, false);
spscEnqueue(&q, (void *)0x2000, false);
/* Consumer shouldn't see uncommitted data */
mock_main_thread = 0;
void *jobs[2];
size_t count = spscDequeueBatch(&q, jobs, 2);
TEST_ASSERT(count == 0);
/* Commit and try again */
mock_main_thread = 1;
spscCommit(&q);
mock_main_thread = 0;
count = spscDequeueBatch(&q, jobs, 2);
TEST_ASSERT(count == 2);
/* Test commit with no pending data */
mock_main_thread = 1;
spscCommit(&q);
mock_main_thread = 0;
count = spscDequeueBatch(&q, jobs, 2);
TEST_ASSERT(count == 0);
spscFree(&q);
return 0;
}
int test_spscFullAndWrapAround(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spscQueue q;
spscInit(&q);
/* Fill, drain, refill to test wrap-around */
for (int round = 0; round < 3; round++) {
mock_main_thread = 1;
for (size_t i = 0; i < SPSC_QUEUE_SIZE; i++) {
TEST_ASSERT(!spscIsFull(&q));
spscEnqueue(&q, (void *)(uintptr_t)(i + 1), true);
}
TEST_ASSERT(spscIsFull(&q));
mock_main_thread = 0;
void *jobs[64];
size_t total = 0;
size_t count;
while ((count = spscDequeueBatch(&q, jobs, 64)) > 0) {
total += count;
}
TEST_ASSERT(total == SPSC_QUEUE_SIZE);
mock_main_thread = 1;
TEST_ASSERT(spscIsEmpty(&q));
TEST_ASSERT(!spscIsFull(&q));
}
spscFree(&q);
return 0;
}
int test_spscEmptyDequeue(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spscQueue q;
spscInit(&q);
mock_main_thread = 0;
void *jobs[1];
TEST_ASSERT(spscDequeueBatch(&q, jobs, 1) == 0);
spscFree(&q);
return 0;
}
int test_spscPartialBatchDequeue(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spscQueue q;
spscInit(&q);
mock_main_thread = 1;
for (int i = 0; i < 5; i++) {
spscEnqueue(&q, (void *)(uintptr_t)(i + 1), true);
}
mock_main_thread = 0;
void *jobs[64];
TEST_ASSERT(spscDequeueBatch(&q, jobs, 64) == 5);
spscFree(&q);
return 0;
}
/* SPMC Queue Tests */
int test_spmcBasicEnqueueDequeue(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spmcQueue q;
spmcInit(&q);
void *data1 = (void *)0x1000;
void *data2 = (void *)0x2000;
mock_main_thread = 1;
TEST_ASSERT(spmcEnqueue(&q, data1));
TEST_ASSERT(spmcEnqueue(&q, data2));
mock_main_thread = 0;
void *result1 = spmcDequeue(&q);
void *result2 = spmcDequeue(&q);
void *result3 = spmcDequeue(&q);
TEST_ASSERT(result1 == data1);
TEST_ASSERT(result2 == data2);
TEST_ASSERT(result3 == NULL);
mock_main_thread = 1;
TEST_ASSERT(spmcIsEmpty(&q));
spmcFree(&q);
return 0;
}
int test_spmcFullAndWrapAround(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spmcQueue q;
spmcInit(&q);
for (int round = 0; round < 3; round++) {
mock_main_thread = 1;
for (size_t i = 0; i < SPMC_QUEUE_SIZE; i++) {
TEST_ASSERT(spmcEnqueue(&q, (void *)(uintptr_t)(i + 1)));
}
/* Queue should be full */
TEST_ASSERT(!spmcEnqueue(&q, (void *)0xDEAD));
mock_main_thread = 0;
for (size_t i = 0; i < SPMC_QUEUE_SIZE; i++) {
void *data = spmcDequeue(&q);
TEST_ASSERT(data == (void *)(uintptr_t)(i + 1));
}
TEST_ASSERT(spmcDequeue(&q) == NULL);
}
spmcFree(&q);
return 0;
}
int test_spmcSize(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spmcQueue q;
spmcInit(&q);
mock_main_thread = 1;
TEST_ASSERT(spmcSize(&q) == 0);
spmcEnqueue(&q, (void *)0x1);
TEST_ASSERT(spmcSize(&q) == 1);
spmcEnqueue(&q, (void *)0x2);
TEST_ASSERT(spmcSize(&q) == 2);
mock_main_thread = 0;
spmcDequeue(&q);
TEST_ASSERT(spmcSize(&q) == 1);
spmcFree(&q);
return 0;
}
/* MPSC Queue Tests */
int test_mpscBasicEnqueueDequeue(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
mpscQueue q;
mpscInit(&q);
void *data1 = (void *)0x1000;
void *data2 = (void *)0x2000;
/* Enqueue from producer */
mpscTicket ticket = {0};
TEST_ASSERT(mpscEnqueue(&q, data1, &ticket));
TEST_ASSERT(!ticket.has_reservation);
ticket = (mpscTicket){0};
TEST_ASSERT(mpscEnqueue(&q, data2, &ticket));
TEST_ASSERT(!ticket.has_reservation);
/* Dequeue from consumer */
void *jobs[2];
size_t count = mpscDequeueBatch(&q, jobs, 2);
TEST_ASSERT(count == 2);
TEST_ASSERT(jobs[0] == data1);
TEST_ASSERT(jobs[1] == data2);
/* Queue should be empty */
count = mpscDequeueBatch(&q, jobs, 2);
TEST_ASSERT(count == 0);
mpscFree(&q);
return 0;
}
int test_mpscTicketRetry(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
mpscQueue q;
mpscInit(&q);
/* Fill the queue */
for (size_t i = 0; i < MPSC_QUEUE_SIZE; i++) {
mpscTicket ticket = {0};
TEST_ASSERT(mpscEnqueue(&q, (void *)(uintptr_t)(i + 1), &ticket));
}
/* Next enqueue should fail and return a ticket */
mpscTicket ticket = {0};
TEST_ASSERT(!mpscEnqueue(&q, (void *)0xBEEF, &ticket));
TEST_ASSERT(ticket.has_reservation);
/* Drain some items */
void *jobs[100];
size_t count = mpscDequeueBatch(&q, jobs, 100);
TEST_ASSERT(count > 0);
/* Retry with same ticket should succeed */
TEST_ASSERT(mpscEnqueue(&q, (void *)0xBEEF, &ticket));
TEST_ASSERT(!ticket.has_reservation);
mpscFree(&q);
return 0;
}
int test_mpscFullAndWrapAround(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
mpscQueue q;
mpscInit(&q);
for (int round = 0; round < 3; round++) {
for (size_t i = 0; i < MPSC_QUEUE_SIZE; i++) {
mpscTicket ticket = {0};
TEST_ASSERT(mpscEnqueue(&q, (void *)(uintptr_t)(i + 1), &ticket));
}
void *jobs[64];
size_t total = 0;
size_t count;
while ((count = mpscDequeueBatch(&q, jobs, 64)) > 0) {
total += count;
}
TEST_ASSERT(total == MPSC_QUEUE_SIZE);
}
mpscFree(&q);
return 0;
}
int test_mpscInterleavedOperations(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
mpscQueue q;
mpscInit(&q);
/* Enqueue some */
for (size_t i = 0; i < 100; i++) {
mpscTicket ticket = {0};
TEST_ASSERT(mpscEnqueue(&q, (void *)(uintptr_t)(i + 1), &ticket));
}
/* Dequeue half */
void *jobs[50];
size_t count = mpscDequeueBatch(&q, jobs, 50);
TEST_ASSERT(count == 50);
for (size_t i = 0; i < 50; i++) {
TEST_ASSERT(jobs[i] == (void *)(uintptr_t)(i + 1));
}
/* Enqueue more */
for (size_t i = 100; i < 150; i++) {
mpscTicket ticket = {0};
TEST_ASSERT(mpscEnqueue(&q, (void *)(uintptr_t)(i + 1), &ticket));
}
/* Drain remaining */
size_t total = 0;
while ((count = mpscDequeueBatch(&q, jobs, 50)) > 0) {
total += count;
}
TEST_ASSERT(total == 100);
mpscFree(&q);
return 0;
}
/* ============== Multi-threaded Stress Tests ============== */
#define STRESS_ITERATIONS 100000
#define NUM_THREADS 4
/* SPSC concurrent test */
typedef struct {
spscQueue *q;
size_t count;
} spscThreadArg;
static void *spscConsumerThread(void *arg) {
spscThreadArg *ta = (spscThreadArg *)arg;
mock_main_thread = 0;
void *jobs[64];
size_t total = 0;
while (total < ta->count) {
size_t n = spscDequeueBatch(ta->q, jobs, 64);
total += n;
if (n == 0) sched_yield();
}
return (void *)total;
}
int test_spscConcurrent(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spscQueue q;
spscInit(&q);
spscThreadArg arg = {&q, STRESS_ITERATIONS};
pthread_t consumer;
pthread_create(&consumer, NULL, spscConsumerThread, &arg);
mock_main_thread = 1;
for (size_t i = 0; i < STRESS_ITERATIONS; i++) {
while (spscIsFull(&q)) sched_yield();
spscEnqueue(&q, (void *)(uintptr_t)(i + 1), true);
}
void *result;
pthread_join(consumer, &result);
TEST_ASSERT((size_t)result == STRESS_ITERATIONS);
spscFree(&q);
return 0;
}
/* SPMC concurrent test - multiple consumers */
typedef struct {
spmcQueue *q;
atomic_size_t *consumed;
atomic_int *done;
} spmcThreadArg;
static void *spmcConsumerThread(void *arg) {
spmcThreadArg *ta = (spmcThreadArg *)arg;
mock_main_thread = 0;
size_t local_count = 0;
while (!atomic_load(ta->done)) {
void *data = spmcDequeue(ta->q);
if (data) {
local_count++;
} else {
sched_yield();
}
}
/* Drain remaining after done signal */
void *data;
while ((data = spmcDequeue(ta->q)) != NULL) {
local_count++;
}
atomic_fetch_add(ta->consumed, local_count);
return NULL;
}
int test_spmcConcurrent(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
spmcQueue q;
spmcInit(&q);
atomic_size_t consumed = 0;
atomic_int done = 0;
spmcThreadArg arg = {&q, &consumed, &done};
pthread_t consumers[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
pthread_create(&consumers[i], NULL, spmcConsumerThread, &arg);
}
mock_main_thread = 1;
for (size_t i = 0; i < STRESS_ITERATIONS; i++) {
while (!spmcEnqueue(&q, (void *)(uintptr_t)(i + 1))) sched_yield();
}
while (!spmcIsEmpty(&q)) sched_yield();
atomic_store(&done, 1);
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(consumers[i], NULL);
}
TEST_ASSERT(atomic_load(&consumed) == STRESS_ITERATIONS);
spmcFree(&q);
return 0;
}
/* MPSC concurrent test - multiple producers */
typedef struct {
mpscQueue *q;
size_t items_per_thread;
int thread_id;
} mpscProducerArg;
static void *mpscProducerThread(void *arg) {
mpscProducerArg *pa = (mpscProducerArg *)arg;
for (size_t i = 0; i < pa->items_per_thread; i++) {
mpscTicket ticket = {0};
void *data = (void *)(uintptr_t)((pa->thread_id << 20) | (i + 1));
while (!mpscEnqueue(pa->q, data, &ticket)) sched_yield();
}
return NULL;
}
int test_mpscConcurrent(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
mpscQueue q;
mpscInit(&q);
size_t items_per_thread = STRESS_ITERATIONS / NUM_THREADS;
pthread_t producers[NUM_THREADS];
mpscProducerArg args[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
args[i] = (mpscProducerArg){&q, items_per_thread, i};
pthread_create(&producers[i], NULL, mpscProducerThread, &args[i]);
}
size_t total = 0;
void *jobs[64];
size_t expected = items_per_thread * NUM_THREADS;
while (total < expected) {
size_t n = mpscDequeueBatch(&q, jobs, 64);
total += n;
if (n == 0) sched_yield();
}
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(producers[i], NULL);
}
TEST_ASSERT(total == expected);
mpscFree(&q);
return 0;
}
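The tests above exercise each queue in isolation. As a usage illustration only (not an additional test in this commit), the two directions can be wired together the way the server uses them: a single producer spreads work over the SPMC queue to a small worker pool, and every worker reports completions on the shared MPSC channel. The sketch reuses this file's mock_main_thread and assumes all work is queued before the workers start, so a worker may simply stop at the first empty poll.
/* Editorial round-trip illustration, not part of the commit's test suite. */
typedef struct {
    spmcQueue *work;
    mpscQueue *done;
} roundTripArg;
static void *roundTripWorker(void *argp) {
    roundTripArg *a = argp;
    mock_main_thread = 0; /* consumers run off the main thread */
    void *job;
    while ((job = spmcDequeue(a->work)) != NULL) {
        mpscTicket t = {0};
        while (!mpscEnqueue(a->done, job, &t)) sched_yield();
    }
    return NULL; /* all work was queued up front, so an empty poll means done */
}
static void roundTripSketch(void) {
    spmcQueue work;
    mpscQueue done;
    spmcInit(&work);
    mpscInit(&done);
    mock_main_thread = 1;
    for (uintptr_t i = 1; i <= 64; i++) {
        while (!spmcEnqueue(&work, (void *)i)) sched_yield();
    }
    roundTripArg arg = {&work, &done};
    pthread_t workers[2];
    for (int i = 0; i < 2; i++) pthread_create(&workers[i], NULL, roundTripWorker, &arg);
    size_t received = 0;
    void *jobs[16];
    while (received < 64) received += mpscDequeueBatch(&done, jobs, 16);
    for (int i = 0; i < 2; i++) pthread_join(workers[i], NULL);
    spmcFree(&work);
    mpscFree(&done);
}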

View File

@ -108,7 +108,7 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} {
if {$::io_threads} {
puts $cfg "io-threads 2"
puts $cfg "events-per-io-thread 0"
puts $cfg "io-threads-always-active yes"
puts $cfg "min-io-threads-avoid-copy-reply 2"
}

View File

@ -553,7 +553,7 @@ proc start_server {options {code undefined}} {
if {$::io_threads} {
dict set config "io-threads" 2
dict set config "events-per-io-thread" 0
dict set config "io-threads-always-active" yes
dict set config "min-io-threads-avoid-copy-reply" 2
}