mirror of https://github.com/valkey-io/valkey
Merge 8dec4e027a into 51f871ae52
This commit is contained in:
commit
b09dd9f2f1
20
src/bio.c
20
src/bio.c
|
|
@ -62,6 +62,7 @@
|
|||
|
||||
#include "server.h"
|
||||
#include "connection.h"
|
||||
#include "cluster.h"
|
||||
#include "bio.h"
|
||||
#include <stdatomic.h>
|
||||
|
||||
|
|
@ -71,6 +72,7 @@ static unsigned int bio_job_to_worker[] = {
|
|||
[BIO_CLOSE_AOF] = 1,
|
||||
[BIO_LAZY_FREE] = 2,
|
||||
[BIO_RDB_SAVE] = 3,
|
||||
[BIO_CLUSTER_SAVE] = 4,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
|
@ -86,6 +88,7 @@ static bio_worker_data bio_workers[] = {
|
|||
{"bio_aof"},
|
||||
{"bio_lazy_free"},
|
||||
{"bio_rdb_save"},
|
||||
{"bio_cluster_config_save"},
|
||||
};
|
||||
static const bio_worker_data *const bio_worker_end = bio_workers + (sizeof bio_workers / sizeof *bio_workers);
|
||||
|
||||
|
|
@ -128,6 +131,12 @@ typedef union bio_job {
|
|||
connection *conn; /* Connection to download the RDB from */
|
||||
int is_dual_channel; /* Single vs dual channel */
|
||||
} save_to_disk_args;
|
||||
|
||||
struct {
|
||||
int type;
|
||||
sds content; /* Cluster config file content. */
|
||||
unsigned do_fsync : 1; /* A flag to indicate that a fsync is required. */
|
||||
} cluster_save_args;
|
||||
} bio_job;
|
||||
|
||||
void *bioProcessBackgroundJobs(void *arg);
|
||||
|
|
@ -227,6 +236,13 @@ void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel) {
|
|||
bioSubmitJob(BIO_RDB_SAVE, job);
|
||||
}
|
||||
|
||||
void bioCreateClusterConfigSaveJob(sds content, int do_fsync) {
|
||||
bio_job *job = zmalloc(sizeof(*job));
|
||||
job->cluster_save_args.content = content;
|
||||
job->cluster_save_args.do_fsync = do_fsync;
|
||||
bioSubmitJob(BIO_CLUSTER_SAVE, job);
|
||||
}
|
||||
|
||||
void *bioProcessBackgroundJobs(void *arg) {
|
||||
bio_worker_data *const bwd = arg;
|
||||
bio_job *job;
|
||||
|
|
@ -304,6 +320,10 @@ void *bioProcessBackgroundJobs(void *arg) {
|
|||
job->free_args.free_fn(job->free_args.free_args);
|
||||
} else if (job_type == BIO_RDB_SAVE) {
|
||||
replicaReceiveRDBFromPrimaryToDisk(job->save_to_disk_args.conn, job->save_to_disk_args.is_dual_channel);
|
||||
} else if (job_type == BIO_CLUSTER_SAVE) {
|
||||
if (clusterSaveConfigFromBio(job->cluster_save_args.content, job->cluster_save_args.do_fsync) == C_ERR) {
|
||||
serverLog(LL_WARNING, "Failed to save the cluster config file in background.");
|
||||
}
|
||||
} else {
|
||||
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
|
|||
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
|
||||
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
|
||||
void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel);
|
||||
void bioCreateClusterConfigSaveJob(sds content, int do_fsync);
|
||||
int inBioThread(void);
|
||||
|
||||
/* Background job opcodes */
|
||||
|
|
@ -51,6 +52,7 @@ enum {
|
|||
BIO_LAZY_FREE, /* Deferred objects freeing. */
|
||||
BIO_CLOSE_AOF, /* Deferred close for AOF files. */
|
||||
BIO_RDB_SAVE, /* Deferred save RDB to disk on replica */
|
||||
BIO_CLUSTER_SAVE, /* Deferred cluster config file save and fsync. */
|
||||
BIO_NUM_OPS
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -149,8 +149,8 @@ sds aggregateClientOutputBuffer(client *c);
|
|||
void resetClusterStats(void);
|
||||
unsigned int delKeysInSlot(unsigned int hashslot, int lazy, bool propagate_del, bool send_del_event);
|
||||
|
||||
unsigned int propagateSlotDeletionByKeys(unsigned int hashslot);
|
||||
void clusterUpdateState(void);
|
||||
int clusterSaveConfigFromBio(sds content, int do_fsync);
|
||||
void clusterSaveConfigOrDie(int do_fsync);
|
||||
int clusterDelSlot(int slot);
|
||||
int clusterAddSlot(clusterNode *n, int slot);
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@
|
|||
#include "endianconv.h"
|
||||
#include "connection.h"
|
||||
#include "module.h"
|
||||
#include "bio.h"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <sys/types.h>
|
||||
|
|
@ -958,6 +959,16 @@ fmterr:
|
|||
serverPanic("Unrecoverable error: corrupted cluster config file \"%s\".", line);
|
||||
}
|
||||
|
||||
/* Get the nodes description and concatenate our "vars" directive to
|
||||
* save currentEpoch and lastVoteEpoch. */
|
||||
sds clusterGenNodesConfContent(void) {
|
||||
sds content = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0);
|
||||
content = sdscatfmt(content, "vars currentEpoch %U lastVoteEpoch %U\n",
|
||||
(unsigned long long)server.cluster->currentEpoch,
|
||||
(unsigned long long)server.cluster->lastVoteEpoch);
|
||||
return content;
|
||||
}
|
||||
|
||||
/* Cluster node configuration is exactly the same as CLUSTER NODES output.
|
||||
*
|
||||
* This function writes the node config and returns C_OK, on error C_ERR
|
||||
|
|
@ -970,23 +981,14 @@ fmterr:
|
|||
* a single write to write the whole file. If the pre-existing file was
|
||||
* bigger we pad our payload with newlines that are anyway ignored and truncate
|
||||
* the file afterward. */
|
||||
int clusterSaveConfig(int do_fsync) {
|
||||
sds ci, tmpfilename;
|
||||
int clusterSaveConfigImpl(sds content, int from_bio, int do_fsync) {
|
||||
sds tmpfilename;
|
||||
size_t content_size, offset = 0;
|
||||
ssize_t written_bytes;
|
||||
int fd = -1;
|
||||
int retval = C_ERR;
|
||||
mstime_t latency;
|
||||
|
||||
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
|
||||
|
||||
/* Get the nodes description and concatenate our "vars" directive to
|
||||
* save currentEpoch and lastVoteEpoch. */
|
||||
ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0);
|
||||
ci = sdscatfmt(ci, "vars currentEpoch %U lastVoteEpoch %U\n",
|
||||
(unsigned long long)server.cluster->currentEpoch,
|
||||
(unsigned long long)server.cluster->lastVoteEpoch);
|
||||
content_size = sdslen(ci);
|
||||
content_size = sdslen(content);
|
||||
|
||||
/* Create a temp file with the new content. */
|
||||
tmpfilename = sdscatfmt(sdsempty(), "%s.tmp-%i-%I", server.cluster_configfile, (int)getpid(), mstime());
|
||||
|
|
@ -996,11 +998,11 @@ int clusterSaveConfig(int do_fsync) {
|
|||
goto cleanup;
|
||||
}
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("cluster-config-open", latency);
|
||||
latencyTraceIfNeeded(cluster, cluster_config_open, latency);
|
||||
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-open", latency);
|
||||
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_open, latency);
|
||||
latencyStartMonitor(latency);
|
||||
while (offset < content_size) {
|
||||
written_bytes = write(fd, ci + offset, content_size - offset);
|
||||
written_bytes = write(fd, content + offset, content_size - offset);
|
||||
if (written_bytes <= 0) {
|
||||
if (errno == EINTR) continue;
|
||||
serverLog(LL_WARNING, "Failed after writing (%zd) bytes to tmp cluster config file: %s", offset,
|
||||
|
|
@ -1010,8 +1012,8 @@ int clusterSaveConfig(int do_fsync) {
|
|||
offset += written_bytes;
|
||||
}
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("cluster-config-write", latency);
|
||||
latencyTraceIfNeeded(cluster, cluster_config_write, latency);
|
||||
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-write", latency);
|
||||
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_write, latency);
|
||||
if (do_fsync) {
|
||||
latencyStartMonitor(latency);
|
||||
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
|
||||
|
|
@ -1020,8 +1022,8 @@ int clusterSaveConfig(int do_fsync) {
|
|||
goto cleanup;
|
||||
}
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("cluster-config-fsync", latency);
|
||||
latencyTraceIfNeeded(cluster, cluster_config_fsync, latency);
|
||||
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-fsync", latency);
|
||||
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_fsync, latency);
|
||||
}
|
||||
|
||||
latencyStartMonitor(latency);
|
||||
|
|
@ -1030,8 +1032,8 @@ int clusterSaveConfig(int do_fsync) {
|
|||
goto cleanup;
|
||||
}
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("cluster-config-rename", latency);
|
||||
latencyTraceIfNeeded(cluster, cluster_config_rename, latency);
|
||||
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-rename", latency);
|
||||
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_rename, latency);
|
||||
if (do_fsync) {
|
||||
latencyStartMonitor(latency);
|
||||
if (fsyncFileDir(server.cluster_configfile) == -1) {
|
||||
|
|
@ -1039,8 +1041,8 @@ int clusterSaveConfig(int do_fsync) {
|
|||
goto cleanup;
|
||||
}
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("cluster-config-dir-fsync", latency);
|
||||
latencyTraceIfNeeded(cluster, cluster_config_dir_fsync, latency);
|
||||
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-dir-fsync", latency);
|
||||
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_dir_fsync, latency);
|
||||
}
|
||||
retval = C_OK; /* If we reached this point, everything is fine. */
|
||||
|
||||
|
|
@ -1049,23 +1051,48 @@ cleanup:
|
|||
latencyStartMonitor(latency);
|
||||
close(fd);
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("cluster-config-close", latency);
|
||||
latencyTraceIfNeeded(cluster, cluster_config_close, latency);
|
||||
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-close", latency);
|
||||
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_close, latency);
|
||||
}
|
||||
if (retval == C_ERR) {
|
||||
latencyStartMonitor(latency);
|
||||
unlink(tmpfilename);
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("cluster-config-unlink", latency);
|
||||
latencyTraceIfNeeded(cluster, cluster_config_unlink, latency);
|
||||
if (!from_bio) latencyAddSampleIfNeeded("cluster-config-unlink", latency);
|
||||
if (!from_bio) latencyTraceIfNeeded(cluster, cluster_config_unlink, latency);
|
||||
}
|
||||
sdsfree(tmpfilename);
|
||||
sdsfree(ci);
|
||||
sdsfree(content);
|
||||
return retval;
|
||||
}
|
||||
|
||||
/* Save cluster config file.
|
||||
*
|
||||
* This function writes the node config and returns C_OK, on error C_ERR
|
||||
* is returned. It is possible to use bio, which can move I/O latency into
|
||||
* the bio thread. If bio is used, it always returns C_OK. */
|
||||
int clusterSaveConfig(int bio, int do_fsync) {
|
||||
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
|
||||
if (do_fsync) server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
|
||||
|
||||
sds content = clusterGenNodesConfContent();
|
||||
if (bio) {
|
||||
/* We can actually always fsync the file in bio, but anyway lets follow the old code. */
|
||||
bioCreateClusterConfigSaveJob(content, do_fsync);
|
||||
return C_OK;
|
||||
} else {
|
||||
return clusterSaveConfigImpl(content, 0, do_fsync);
|
||||
}
|
||||
}
|
||||
|
||||
/* Save the cluster file, it is called from the bio thread. */
|
||||
int clusterSaveConfigFromBio(sds content, int do_fsync) {
|
||||
return clusterSaveConfigImpl(content, 1, do_fsync);
|
||||
}
|
||||
|
||||
/* Save the cluster file, if save fails, the process will exit. */
|
||||
void clusterSaveConfigOrDie(int do_fsync) {
|
||||
if (clusterSaveConfig(do_fsync) == C_ERR) {
|
||||
if (clusterSaveConfig(0, do_fsync) == C_ERR) {
|
||||
serverLog(LL_WARNING, "Fatal: can't update cluster config file.");
|
||||
exit(1);
|
||||
}
|
||||
|
|
@ -1517,7 +1544,8 @@ void clusterHandleServerShutdown(bool auto_failover) {
|
|||
|
||||
/* The error logs have been logged in the save function if the save fails. */
|
||||
serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting.");
|
||||
clusterSaveConfig(1);
|
||||
bioDrainWorker(BIO_CLUSTER_SAVE);
|
||||
clusterSaveConfig(0, 1);
|
||||
|
||||
#if !defined(__sun)
|
||||
/* Unlock the cluster config file before shutdown, see clusterLockConfig.
|
||||
|
|
@ -6103,7 +6131,7 @@ void clusterBeforeSleep(void) {
|
|||
/* Save the config, possibly using fsync. */
|
||||
if (flags & CLUSTER_TODO_SAVE_CONFIG) {
|
||||
int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
|
||||
clusterSaveConfigOrDie(fsync);
|
||||
clusterSaveConfig(1, fsync);
|
||||
}
|
||||
|
||||
if (flags & CLUSTER_TODO_BROADCAST_ALL) {
|
||||
|
|
@ -6508,7 +6536,10 @@ int verifyClusterConfigWithData(void) {
|
|||
delKeysInSlot(j, server.lazyfree_lazy_server_del, true, false);
|
||||
}
|
||||
}
|
||||
if (update_config) clusterSaveConfigOrDie(1);
|
||||
if (update_config) {
|
||||
bioDrainWorker(BIO_CLUSTER_SAVE);
|
||||
clusterSaveConfigOrDie(1);
|
||||
}
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
|
|
@ -7740,7 +7771,8 @@ int clusterCommandSpecial(client *c) {
|
|||
(unsigned long long)myself->configEpoch);
|
||||
addReplySds(c, reply);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr, "saveconfig") && c->argc == 2) {
|
||||
int retval = clusterSaveConfig(1);
|
||||
bioDrainWorker(BIO_CLUSTER_SAVE);
|
||||
int retval = clusterSaveConfig(0, 1);
|
||||
|
||||
if (retval == C_OK)
|
||||
addReply(c, shared.ok);
|
||||
|
|
|
|||
|
|
@ -266,7 +266,7 @@ proc start_cluster {masters replicas options code {slot_allocator continuous_slo
|
|||
# Configure the starting of multiple servers. Set cluster node timeout
|
||||
# aggressively since many tests depend on ping/pong messages.
|
||||
|
||||
set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 cluster-databases 16 cluster-slot-stats-enabled yes]]
|
||||
set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 cluster-databases 16 cluster-slot-stats-enabled yes latency-monitor-threshold 1]]
|
||||
set options [concat $cluster_options $options]
|
||||
|
||||
# Cluster mode only supports a single database, so before executing the tests
|
||||
|
|
|
|||
Loading…
Reference in New Issue