bio.c: Split thread function into smaller parts v2

No functional changes.

This makes for easier reading, extending the functionality and
maintenance only.

Signed-off-by: Ted Lyngmo <ted@lyncon.se>
This commit is contained in:
Ted Lyngmo 2025-09-03 20:09:58 +02:00
parent 811a64407b
commit dcf6a9e699
2 changed files with 64 additions and 41 deletions

102
src/bio.c
View File

@ -130,7 +130,7 @@ typedef union bio_job {
} save_to_disk_args;
} bio_job;
void *bioProcessBackgroundJobs(void *arg);
static void *bioProcessBackgroundJobs(void *arg);
/* Make sure we have enough stack to perform all the things we do in the
* main thread. */
@ -167,7 +167,7 @@ void bioInit(void) {
}
}
void bioSubmitJob(int type, bio_job *job) {
static void bioSubmitJob(int type, bio_job *job) {
job->header.type = type;
bio_worker_data *const bwd = &bio_workers[bio_job_to_worker[type]];
pthread_mutex_lock(&bwd->bio_mutex);
@ -227,7 +227,64 @@ void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel) {
bioSubmitJob(BIO_RDB_SAVE, job);
}
void *bioProcessBackgroundJobs(void *arg) {
static void bioCloseFileJob(bio_job *job) {
if (job->fd_args.need_fsync && valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) {
serverLog(LL_WARNING, "Fail to fsync the AOF file: %s", strerror(errno));
}
if (job->fd_args.need_reclaim_cache) {
if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
serverLog(LL_NOTICE, "Unable to reclaim page cache: %s", strerror(errno));
}
}
close(job->fd_args.fd);
}
static void bioAofFsyncJob(bio_job *job) {
/* The fd may be closed by main thread and reused for another
* socket, pipe, or file. We just ignore these errno because
* aof fsync did not really fail. */
if (valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) {
int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);
atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed);
atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_release);
if (last_status == C_OK) {
serverLog(LL_WARNING, "Fail to fsync the AOF file: %s", strerror(errno));
}
} else {
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, job->fd_args.offset, memory_order_relaxed);
}
if (job->fd_args.need_reclaim_cache) {
if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
serverLog(LL_NOTICE, "Unable to reclaim page cache: %s", strerror(errno));
}
}
}
static void bioLazyFreeJob(bio_job *job) {
job->free_args.free_fn(job->free_args.free_args);
}
static void bioCloseAofJob(bio_job *job) {
bioAofFsyncJob(job);
close(job->fd_args.fd);
}
static void bioRdbSaveJob(bio_job *job) {
replicaReceiveRDBFromPrimaryToDisk(job->save_to_disk_args.conn, job->save_to_disk_args.is_dual_channel);
}
static void (*const job_functions[BIO_NUM_OPS])(bio_job *) = {
[BIO_CLOSE_FILE] = bioCloseFileJob,
[BIO_AOF_FSYNC] = bioAofFsyncJob,
[BIO_LAZY_FREE] = bioLazyFreeJob,
[BIO_CLOSE_AOF] = bioCloseAofJob,
[BIO_RDB_SAVE] = bioRdbSaveJob,
};
static void *bioProcessBackgroundJobs(void *arg) {
bio_worker_data *const bwd = arg;
bio_job *job;
sigset_t sigset;
@ -267,43 +324,8 @@ void *bioProcessBackgroundJobs(void *arg) {
/* Process the job accordingly to its type. */
int job_type = job->header.type;
if (job_type == BIO_CLOSE_FILE) {
if (job->fd_args.need_fsync && valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) {
serverLog(LL_WARNING, "Fail to fsync the AOF file: %s", strerror(errno));
}
if (job->fd_args.need_reclaim_cache) {
if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
serverLog(LL_NOTICE, "Unable to reclaim page cache: %s", strerror(errno));
}
}
close(job->fd_args.fd);
} else if (job_type == BIO_AOF_FSYNC || job_type == BIO_CLOSE_AOF) {
/* The fd may be closed by main thread and reused for another
* socket, pipe, or file. We just ignore these errno because
* aof fsync did not really fail. */
if (valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) {
int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);
atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed);
atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_release);
if (last_status == C_OK) {
serverLog(LL_WARNING, "Fail to fsync the AOF file: %s", strerror(errno));
}
} else {
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, job->fd_args.offset, memory_order_relaxed);
}
if (job->fd_args.need_reclaim_cache) {
if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
serverLog(LL_NOTICE, "Unable to reclaim page cache: %s", strerror(errno));
}
}
if (job_type == BIO_CLOSE_AOF) close(job->fd_args.fd);
} else if (job_type == BIO_LAZY_FREE) {
job->free_args.free_fn(job->free_args.free_args);
} else if (job_type == BIO_RDB_SAVE) {
replicaReceiveRDBFromPrimaryToDisk(job->save_to_disk_args.conn, job->save_to_disk_args.is_dual_channel);
if (job_type >= 0 && job_type < BIO_NUM_OPS) {
job_functions[job_type](job);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}

View File

@ -31,7 +31,8 @@ proc check_log_backtrace_for_debug {log_pattern} {
if {!$::valgrind} {
assert_equal [count_log_message 0 "wait_threads(): waiting threads timed out"] 0
# make sure the server prints stack trace for all threads. we know 3 threads are idle in bio.c
assert_equal [count_log_message 0 "bioProcessBackgroundJobs"] 3
# waiting on a condition variable
assert_equal [count_log_message 0 "pthread_cond_wait"] 3
}
}