Separate RDB snapshotting from atomic slot migration (#2533)

When we added atomic slot migration (ASM) in #1949, we reused a lot of the RDB
save code. That was the easiest way to implement ASM at first, but it comes with
some side effects. For example, we use CHILD_TYPE_RDB to do the fork and the
rdb.c/rdb.h functions to save the snapshot. This messes up the logs (we print
messages saying we are doing RDB work) and the info fields (we report
rdb_bgsave_in_progress when we are actually doing slot migration).

In addition, it makes the code difficult to maintain. The rdb_save method uses
a lot of rdb_* variables, but we are actually doing slot migration. If we want
to support one fork with multiple target nodes, we need to rewrite this code
for a better cleanup.

Note that the changes to rdb.c/rdb.h revert the earlier changes made when
we were reusing this code for slot migration. The slot migration snapshot logic
is similar to the previous diskless replication: we use a pipe to transfer the
snapshot data from the child process to the parent process.

Interface changes:
- New slot_migration_fork_in_progress info field.
- New cow_size field in CLUSTER GETSLOTMIGRATIONS command.
- Also add slot migration fork to the cluster class trace latency.

Signed-off-by: Binbin <binloveplay1314@qq.com>
Signed-off-by: Jacob Murphy <jkmurphy@google.com>
Co-authored-by: Jacob Murphy <jkmurphy@google.com>
This commit is contained in:
Binbin 2025-09-18 16:26:42 +08:00 committed by Ricardo Dias
parent 9b8ac85a48
commit d759a7b4ea
15 changed files with 406 additions and 224 deletions

View File

@ -2280,6 +2280,8 @@ int rewriteObjectRio(rio *aof, robj *o, int db_num) {
return C_OK;
}
/* This function is currently used in slot migration to rewrite the corresponding
* slot hashtable to rio. */
int rewriteSlotToAppendOnlyFileRio(rio *aof, int db_num, int hashslot, size_t *key_count) {
long long updated_time = 0;
@ -2302,7 +2304,7 @@ int rewriteSlotToAppendOnlyFileRio(rio *aof, int db_num, int hashslot, size_t *k
if (key_count && ((*key_count)++ & 1023) == 0) {
long long now = mstime();
if (now - updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_count, "AOF rewrite");
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_count, "Slot migration");
updated_time = now;
}
}

View File

@ -131,6 +131,8 @@ void updateChildInfo(childInfoType information_type, size_t cow, monotime cow_up
server.stat_rdb_cow_bytes = server.stat_current_cow_peak;
} else if (information_type == CHILD_INFO_TYPE_MODULE_COW_SIZE) {
server.stat_module_cow_bytes = server.stat_current_cow_peak;
} else if (information_type == CHILD_INFO_TYPE_SLOT_MIGRATION_COW_SIZE) {
server.stat_slot_migration_cow_bytes = server.stat_current_cow_peak;
} else if (information_type == CHILD_INFO_TYPE_REPL_OUTPUT_BYTES) {
server.stat_net_repl_output_bytes += (long long)repl_output_bytes;
}

View File

@ -9,6 +9,9 @@
#include "module.h"
#include "functions.h"
#include <sys/wait.h>
#include <fcntl.h>
typedef enum slotMigrationJobState {
/* Importing states */
SLOT_IMPORT_WAIT_ACK,
@ -76,6 +79,7 @@ typedef struct slotMigrationJob {
* cleanup is done. */
sds description; /* Description, used for
* logging. */
size_t stat_cow_bytes; /* Copy on write bytes during slot migration fork. */
/* State needed during client establishment */
connection *conn; /* Connection to slot import source node. */
@ -1264,9 +1268,8 @@ bool shouldRewriteHashtableIndex(int didx, hashtable *ht, void *privdata) {
}
/* Contains the logic run on the child process during the snapshot phase. */
int childSnapshotForSyncSlot(int req, rio *rdb, void *privdata) {
UNUSED(req);
list *slot_ranges = privdata;
int childSnapshotForSyncSlot(rio *aof, slotMigrationJob *job) {
list *slot_ranges = job->slot_ranges;
size_t key_count = 0;
for (int db_num = 0; db_num < server.dbnum; db_num++) {
listIter li;
@ -1276,40 +1279,127 @@ int childSnapshotForSyncSlot(int req, rio *rdb, void *privdata) {
slotRange *r = (slotRange *)ln->value;
for (int slot = r->start_slot; slot <= r->end_slot; slot++) {
if (rewriteSlotToAppendOnlyFileRio(
rdb, db_num, slot, &key_count) == C_ERR) return C_ERR;
aof, db_num, slot, &key_count) == C_ERR) return C_ERR;
}
}
}
rioWrite(rdb, "*3\r\n", 4);
rioWriteBulkString(rdb, "CLUSTER", 7);
rioWriteBulkString(rdb, "SYNCSLOTS", 9);
rioWriteBulkString(rdb, "SNAPSHOT-EOF", 12);
rioWrite(aof, "*3\r\n", 4);
rioWriteBulkString(aof, "CLUSTER", 7);
rioWriteBulkString(aof, "SYNCSLOTS", 9);
rioWriteBulkString(aof, "SNAPSHOT-EOF", 12);
return C_OK;
}
/* Ask the running slot migration child, if any, to terminate.
 *
 * SIGUSR1 is used so that the parent will know the child did not exit for an
 * error, but because we wanted it to. This function only sends the signal and
 * is a no-op when the active child is not a slot migration child. */
void killSlotMigrationChild(void) {
    if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) {
        serverLog(LL_NOTICE, "Killing running slot migration child: %ld", (long)server.child_pid);
        /* There is no waitpid here (unlike killAppendOnlyChild and
         * TerminateModuleForkChild): the cleanup is left to checkChildrenDone,
         * which will later find that the process was killed. */
        kill(server.child_pid, SIGUSR1);
    }
}
/* Begin the snapshot for the provided job in a child process.
 *
 * A CHILD_TYPE_SLOT_MIGRATION child is forked; it serializes the job's slot
 * ranges with childSnapshotForSyncSlot() into a pipe. The parent reads that
 * pipe in slotMigrationPipeReadHandler, with job->client->conn stored in
 * server.slot_migration_pipe_conn as the connection to the target.
 *
 * Returns C_OK when the child was started. Returns C_ERR when another child
 * process is active, or when creating the pipes or forking failed. On C_ERR
 * every descriptor created here is closed and the server.slot_migration_*
 * fields touched here are reset, so that a later attempt passes the
 * serverAssert() below instead of finding stale descriptors. */
int slotExportJobBeginSnapshotToTargetSocket(slotMigrationJob *job) {
    if (hasActiveChildProcess()) return C_ERR;

    pid_t childpid;
    int pipefds[2], slot_migration_pipe_write = -1, safe_to_exit_pipe = -1;
    serverAssert(server.slot_migration_pipe_read == -1 && server.slot_migration_child_exit_pipe == -1);

    /* Before forking, create a pipe that is used to transfer the slot data
     * bytes to the parent. We can't let the child write directly to the
     * sockets, since in case of TLS we must let the parent handle a continuous
     * TLS state when the child terminates and the parent takes over. */
    if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
    server.slot_migration_pipe_read = pipefds[0]; /* read end */
    slot_migration_pipe_write = pipefds[1];       /* write end */

    /* Create another pipe that is used by the parent to signal to the child
     * that it can exit. */
    if (anetPipe(pipefds, 0, 0) == -1) {
        close(slot_migration_pipe_write);
        close(server.slot_migration_pipe_read);
        server.slot_migration_pipe_read = -1;
        return C_ERR;
    }
    safe_to_exit_pipe = pipefds[0];                     /* read end */
    server.slot_migration_child_exit_pipe = pipefds[1]; /* write end */

    server.slot_migration_pipe_conn = job->client->conn;

    if ((childpid = serverFork(CHILD_TYPE_SLOT_MIGRATION)) == 0) {
        /* Child */
        rio aof;
        rioInitWithFd(&aof, slot_migration_pipe_write);

        /* Close the reading part, so that if the parent crashes, the child will
         * get a write error and exit. This must be the slot migration pipe,
         * not the RDB one. */
        close(server.slot_migration_pipe_read);

        serverSetProcTitle("valkey-slot-migration-to-target");
        serverSetCpuAffinity(server.bgsave_cpulist);

        int retval = childSnapshotForSyncSlot(&aof, job);
        if (retval == C_OK && rioFlush(&aof) == 0) retval = C_ERR;
        if (retval == C_OK) {
            sendChildCowInfo(CHILD_INFO_TYPE_SLOT_MIGRATION_COW_SIZE, "Slot migration");
        }

        rioFreeFd(&aof);
        /* wake up the reader, tell it we're done. */
        close(slot_migration_pipe_write);
        close(server.slot_migration_child_exit_pipe); /* close write end so that we can detect the close on the parent. */

        /* Hold exit until the parent tells us it's safe. We're not expecting
         * to read anything, just to notice when the pipe is closed. */
        ssize_t dummy = read(safe_to_exit_pipe, pipefds, 1);
        UNUSED(dummy);
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        if (childpid == -1) {
            serverLog(LL_WARNING, "Can't begin slot migration snapshot in background: fork: %s", strerror(errno));
            /* Release all four pipe ends and forget them, so nothing is leaked
             * and the next call starts from a clean state. */
            close(slot_migration_pipe_write);
            close(safe_to_exit_pipe);
            close(server.slot_migration_pipe_read);
            close(server.slot_migration_child_exit_pipe);
            server.slot_migration_pipe_read = -1;
            server.slot_migration_child_exit_pipe = -1;
            server.slot_migration_pipe_conn = NULL;
            return C_ERR;
        }

        serverLog(LL_NOTICE, "Started child process %ld for slot migration %s", (long)childpid, job->description);
        close(slot_migration_pipe_write); /* close write in parent so that it can detect the close on the child. */
        if (aeCreateFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE, slotMigrationPipeReadHandler, NULL) == AE_ERR) {
            serverPanic("Unrecoverable error creating server.slot_migration_pipe_read file event.");
        }
        /* The parent only needs the write end of the exit pipe. */
        close(safe_to_exit_pipe);
        if (server.debug_pause_after_fork) debugPauseProcess();
        return C_OK;
    }
    return C_OK; /* Unreached. */
}
/* Callback triggered after snapshot is finished. We either begin sending the
* incremental contents or fail the associated migration. */
void clusterHandleSlotExportBackgroundSaveDone(int bgsaveerr) {
if (!server.cluster_enabled) return;
/* When a background slot migration terminates, call the right handler. */
void backgroundSlotMigrationDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
serverLog(LL_NOTICE, "Background SLOT MIGRATION transfer terminated with success");
} else if (!bysignal && exitcode != 0) {
serverLog(LL_WARNING, "Background SLOT MIGRATION transfer error");
} else {
serverLog(LL_WARNING, "Background SLOT MIGRATION transfer terminated by signal %d", bysignal);
}
if (server.slot_migration_child_exit_pipe != -1) close(server.slot_migration_child_exit_pipe);
if (server.slot_migration_pipe_read > 0) {
aeDeleteFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE);
close(server.slot_migration_pipe_read);
}
server.slot_migration_child_exit_pipe = -1;
server.slot_migration_pipe_read = -1;
server.slot_migration_pipe_conn = NULL;
zfree(server.slot_migration_pipe_buff);
server.slot_migration_pipe_buff = NULL;
server.slot_migration_pipe_bufflen = 0;
listIter li;
listNode *ln;
listRewind(server.cluster->slot_migration_jobs, &li);
@ -1319,8 +1409,9 @@ void clusterHandleSlotExportBackgroundSaveDone(int bgsaveerr) {
if (job->state != SLOT_EXPORT_SNAPSHOTTING) {
continue;
}
if (bgsaveerr == C_OK) {
if (!bysignal && exitcode == 0) {
slotExportBeginStreaming(job);
job->stat_cow_bytes = server.stat_slot_migration_cow_bytes;
} else {
serverLog(LL_WARNING,
"Child process failed to snapshot slot migration %s",
@ -1683,7 +1774,7 @@ void proceedWithSlotMigration(slotMigrationJob *job) {
serverLog(LL_NOTICE,
"Beginning snapshot of slot migration %s.",
job->description);
if (slotExportJobBeginSnapshot(job) == C_ERR) {
if (slotExportJobBeginSnapshotToTargetSocket(job) == C_ERR) {
serverLog(LL_WARNING,
"Slot migration %s failed to start slot snapshot",
job->description);
@ -1780,10 +1871,6 @@ void resetSlotMigrationJob(slotMigrationJob *job) {
sdsfree(job->response_buf);
job->response_buf = NULL;
/* Description is not needed once migration is finished */
sdsfree(job->description);
job->description = NULL;
}
void freeSlotMigrationJob(void *o) {
@ -1793,6 +1880,7 @@ void freeSlotMigrationJob(void *o) {
sdsfree(job->slot_ranges_str);
sdsfree(job->status_msg);
sdsfree(job->response_buf);
sdsfree(job->description);
zfree(o);
}
@ -1940,7 +2028,7 @@ void finishSlotMigrationJob(slotMigrationJob *job,
slotExportTryUnpause();
/* Fast fail the child process, which will be cleaned up fully in
* checkChildrenDone. */
if (job->state == SLOT_EXPORT_SNAPSHOTTING) killRDBChild();
if (job->state == SLOT_EXPORT_SNAPSHOTTING) killSlotMigrationChild();
}
if (job->type == SLOT_MIGRATION_IMPORT &&
state != SLOT_MIGRATION_JOB_SUCCESS) {
@ -1997,7 +2085,7 @@ void clusterCommandGetSlotMigrations(client *c) {
listRewind(server.cluster->slot_migration_jobs, &li);
while ((ln = listNext(&li)) != NULL) {
slotMigrationJob *job = ln->value;
addReplyMapLen(c, 10);
addReplyMapLen(c, 11);
addReplyBulkCString(c, "name");
addReplyBulkCBuffer(c, job->name, CLUSTER_NAMELEN);
addReplyBulkCString(c, "operation");
@ -2020,6 +2108,8 @@ void clusterCommandGetSlotMigrations(client *c) {
addReplyBulkCString(c, slotMigrationJobStateToString(job->state));
addReplyBulkCString(c, "message");
addReplyBulkCString(c, job->status_msg ? job->status_msg : "");
addReplyBulkCString(c, "cow_size");
addReplyLongLong(c, (long long)job->stat_cow_bytes);
}
}

View File

@ -24,7 +24,7 @@ void clusterCommandMigrateSlots(client *c);
void clusterCommandSyncSlots(client *c);
void clusterCommandGetSlotMigrations(client *c);
void clusterCommandCancelSlotMigrations(client *c);
void clusterHandleSlotExportBackgroundSaveDone(int bgsaveerr);
void backgroundSlotMigrationDoneHandler(int exitcode, int bysignal);
void clusterUpdateSlotExportsOnOwnershipChange(void);
void clusterUpdateSlotImportsOnOwnershipChange(void);
void clusterCleanupSlotMigrationLog(void);
@ -33,5 +33,6 @@ size_t clusterGetTotalSlotExportBufferMemory(void);
bool clusterSlotFailoverGranted(int slot);
void clusterFailAllSlotExportsWithMessage(char *message);
void clusterHandleSlotMigrationErrorResponse(slotMigrationJob *job);
void killSlotMigrationChild(void);
#endif /* __CLUSTER_MIGRATESLOTS_H */

View File

@ -39,8 +39,15 @@
"type": "string",
"pattern": "^([0-9]+-[0-9]+)( [0-9]+-[0-9]+)*$"
},
"node": {
"type": "string"
"target_node": {
"description": "The target node name in the migration job.",
"type": "string",
"pattern": "^[0-9a-fA-F]{40}$"
},
"source_node": {
"description": "The source node name in the migration job.",
"type": "string",
"pattern": "^[0-9a-fA-F]{40}$"
},
"create_time": {
"description": "Creation time, in seconds since the unix epoch.",
@ -55,10 +62,16 @@
"type": "integer"
},
"state": {
"description": "Human readable string representing the migration job state.",
"type": "string"
},
"message": {
"description": "Human readable status message with more details.",
"type": "string"
},
"cow_size": {
"description": "Copy on write bytes during slot migration fork.",
"type": "integer"
}
}
}

View File

@ -802,6 +802,7 @@ int getFlushCommandFlags(client *c, int *flags) {
void flushAllDataAndResetRDB(int flags) {
server.dirty += emptyData(-1, flags, NULL);
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();
if (server.saveparamslen > 0) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);

View File

@ -13355,6 +13355,10 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
* will prevent COW memory issue. */
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
/* Kill existing slot migration fork as it is saving outdated data. Also killing it
* will prevent COW memory issue. */
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();
emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
/* rdbLoad() can go back to the networking and process network events. If

271
src/rdb.c
View File

@ -3625,16 +3625,16 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
/* Possibly there are replicas waiting for a BGSAVE in order to be served
* (the first stage of SYNC is a bulk transfer of dump.rdb) */
updateReplicasWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, type);
/* Slot export should also be notified, in case this was a export related
* snapshot */
clusterHandleSlotExportBackgroundSaveDone((!bysignal && exitcode == 0) ? C_OK : C_ERR);
}
/* Kill the RDB saving child using SIGUSR1 (so that the parent will know
* the child did not exit for an error, but because we wanted), and performs
* the cleanup needed. */
void killRDBChild(void) {
/* No rdb child? return. */
if (server.child_type != CHILD_TYPE_RDB) return;
serverLog(LL_NOTICE, "Killing running RDB child: %ld", (long)server.child_pid);
kill(server.child_pid, SIGUSR1);
/* Because we are not using here waitpid (like we have in killAppendOnlyChild
* and TerminateModuleForkChild), all the cleanup operations is done by
@ -3644,37 +3644,28 @@ void killRDBChild(void) {
* - rdbRemoveTempFile */
}
/* Save snapshot to the provided connections, spawning a child process and
* running the provided function.
*
* The connections array (the conns field in the rdbSnapshotOptions) is a
* heap-allocated array that will be freed by this function and shall not be
* freed by the caller. */
int saveSnapshotToConnectionSockets(rdbSnapshotOptions options) {
/* Spawn an RDB child that writes the RDB to the sockets of the replicas
* that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
pid_t childpid;
int pipefds[2], rdb_pipe_write = -1, safe_to_exit_pipe = -1;
if (hasActiveChildProcess()) {
zfree(options.conns);
return C_ERR;
}
int dual_channel = (req & REPLICA_REQ_RDB_CHANNEL);
if (hasActiveChildProcess()) return C_ERR;
serverAssert(server.rdb_pipe_read == -1 && server.rdb_child_exit_pipe == -1);
/* Even if the previous fork child exited, don't start a new one until we
* drained the pipe. */
if (server.rdb_pipe_conns) {
zfree(options.conns);
return C_ERR;
}
if (server.rdb_pipe_conns) return C_ERR;
if (options.use_pipe) {
if (!dual_channel) {
/* Before to fork, create a pipe that is used to transfer the rdb bytes to
* the parent, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a continuous TLS state when the
* child terminates and parent takes over. */
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) {
zfree(options.conns);
return C_ERR;
}
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
server.rdb_pipe_read = pipefds[0]; /* read end */
rdb_pipe_write = pipefds[1]; /* write end */
@ -3683,118 +3674,11 @@ int saveSnapshotToConnectionSockets(rdbSnapshotOptions options) {
if (anetPipe(pipefds, 0, 0) == -1) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
zfree(options.conns);
return C_ERR;
}
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
}
server.rdb_pipe_conns = NULL;
if (options.use_pipe) {
server.rdb_pipe_conns = options.conns;
server.rdb_pipe_numconns = options.connsnum;
server.rdb_pipe_numconns_writing = 0;
}
/* Create the child process. */
if ((childpid = serverFork(CHILD_TYPE_RDB)) == 0) {
/* Child */
int retval, dummy;
rio rdb;
if (!options.use_pipe) {
rioInitWithConnset(&rdb, options.conns, options.connsnum);
} else {
rioInitWithFd(&rdb, rdb_pipe_write);
}
/* Close the reading part, so that if the parent crashes, the child will
* get a write error and exit. */
if (options.use_pipe) close(server.rdb_pipe_read);
if (strstr(server.exec_argv[0], "redis-server") != NULL) {
serverSetProcTitle("redis-rdb-to-slaves");
} else {
serverSetProcTitle("valkey-rdb-to-replicas");
}
serverSetCpuAffinity(server.bgsave_cpulist);
if (options.skip_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
retval = options.snapshot_func(options.req, &rdb, options.privdata);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
if (!options.use_pipe) {
sendChildInfoGeneric(CHILD_INFO_TYPE_REPL_OUTPUT_BYTES, 0, rdb.processed_bytes, -1, "RDB");
}
}
if (!options.use_pipe) {
rioFreeConnset(&rdb);
} else {
rioFreeFd(&rdb);
/* wake up the reader, tell it we're done. */
close(rdb_pipe_write);
close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
}
zfree(options.conns);
/* hold exit until the parent tells us it's safe. we're not expecting
* to read anything, just get the error when the pipe is closed. */
if (options.use_pipe) dummy = read(safe_to_exit_pipe, pipefds, 1);
UNUSED(dummy);
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
if (childpid == -1) {
serverLog(LL_WARNING, "Can't save in background: fork: %s", strerror(errno));
if (options.use_pipe) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
close(server.rdb_child_exit_pipe);
}
zfree(options.conns);
if (!options.use_pipe) {
closeChildInfoPipe();
} else {
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
}
} else {
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s%s", (long)childpid,
!options.use_pipe ? "direct socket to replica" : "pipe through parent process",
options.skip_checksum ? " while skipping RDB checksum for this transfer" : "");
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
if (!options.use_pipe) {
/* For dual channel sync, the main process no longer requires these RDB connections. */
zfree(options.conns);
} else {
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) ==
AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
}
if (options.use_pipe) close(safe_to_exit_pipe);
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
}
int childSnapshotUsingRDB(int req, rio *rdb, void *privdata) {
return rdbSaveRioWithEOFMark(req, rdb, NULL, (rdbSaveInfo *)privdata);
}
/* Spawn an RDB child that writes the RDB to the sockets of the replicas
* that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
int dual_channel = (req & REPLICA_REQ_RDB_CHANNEL);
/*
* For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req:
* Check replica capabilities, if every replica supports skipping RDB checksum, primary should also skip checksum.
@ -3802,10 +3686,15 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
*/
int skip_rdb_checksum = 1;
/* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
* the RDB to, which are in WAIT_BGSAVE_START state. */
int connsnum = 0;
connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
server.rdb_pipe_conns = NULL;
if (!dual_channel) {
server.rdb_pipe_conns = conns;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
}
/* Filter replica connections pending full sync (ie. in WAIT_BGSAVE_START state). */
listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
@ -3824,37 +3713,113 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
addRdbReplicaToPsyncWait(replica);
/* Put the socket in blocking mode to simplify RDB transfer. */
connBlock(replica->conn);
} else {
server.rdb_pipe_numconns++;
}
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}
/* do not skip RDB checksum on the primary if connection doesn't have integrity check or if the replica doesn't support it */
// do not skip RDB checksum on the primary if connection doesn't have integrity check or if the replica doesn't support it
if (!connIsIntegrityChecked(replica->conn) || !(replica->repl_data->replica_capa & REPLICA_CAPA_SKIP_RDB_CHECKSUM))
skip_rdb_checksum = 0;
}
rdbSnapshotOptions options = {
.conns = conns,
.connsnum = connsnum,
.use_pipe = !dual_channel,
.req = req,
.skip_checksum = skip_rdb_checksum,
.privdata = rsi,
.snapshot_func = childSnapshotUsingRDB};
if (saveSnapshotToConnectionSockets(options) != C_OK) {
/* Undo the state change. The caller will perform cleanup on
* all the replicas in BGSAVE_START state, but an early call to
* replicationSetupReplicaForFullResync() turned it into BGSAVE_END */
listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *replica = ln->value;
if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) {
replica->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
/* Create the child process. */
if ((childpid = serverFork(CHILD_TYPE_RDB)) == 0) {
/* Child */
int retval, dummy;
rio rdb;
if (dual_channel) {
rioInitWithConnset(&rdb, conns, connsnum);
} else {
rioInitWithFd(&rdb, rdb_pipe_write);
}
/* Close the reading part, so that if the parent crashes, the child will
* get a write error and exit. */
if (!dual_channel) close(server.rdb_pipe_read);
if (strstr(server.exec_argv[0], "redis-server") != NULL) {
serverSetProcTitle("redis-rdb-to-slaves");
} else {
serverSetProcTitle("valkey-rdb-to-replicas");
}
serverSetCpuAffinity(server.bgsave_cpulist);
if (skip_rdb_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
if (dual_channel) {
sendChildInfoGeneric(CHILD_INFO_TYPE_REPL_OUTPUT_BYTES, 0, rdb.processed_bytes, -1, "RDB");
}
}
return C_ERR;
if (dual_channel) {
rioFreeConnset(&rdb);
} else {
rioFreeFd(&rdb);
/* wake up the reader, tell it we're done. */
close(rdb_pipe_write);
close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
}
zfree(conns);
/* hold exit until the parent tells us it's safe. we're not expecting
* to read anything, just get the error when the pipe is closed. */
if (!dual_channel) dummy = read(safe_to_exit_pipe, pipefds, 1);
UNUSED(dummy);
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
if (childpid == -1) {
serverLog(LL_WARNING, "Can't save in background: fork: %s", strerror(errno));
/* Undo the state change. The caller will perform cleanup on
* all the replicas in BGSAVE_START state, but an early call to
* replicationSetupReplicaForFullResync() turned it into BGSAVE_END */
listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *replica = ln->value;
if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) {
replica->repl_data->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
}
}
if (!dual_channel) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
close(server.rdb_child_exit_pipe);
}
zfree(conns);
if (dual_channel) {
closeChildInfoPipe();
} else {
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
}
} else {
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s%s", (long)childpid,
dual_channel ? "direct socket to replica" : "pipe through parent process",
skip_rdb_checksum ? " while skipping RDB checksum for this transfer" : "");
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
if (dual_channel) {
/* For dual channel sync, the main process no longer requires these RDB connections. */
zfree(conns);
} else {
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, NULL) ==
AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
}
if (!dual_channel) close(safe_to_exit_pipe);
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK;
return C_OK; /* Unreached. */
}
void saveCommand(client *c) {

View File

@ -126,17 +126,6 @@ enum RdbType {
#define RDB_FOREIGN_TYPE_MIN 22
#define RDB_FOREIGN_TYPE_MAX 243
typedef int (*ChildSnapshotFunc)(int req, rio *rdb, void *privdata);
typedef struct rdbSnapshotOptions {
int connsnum; /* Number of connections. */
connection **conns; /* A heap-allocated array of connections to send the snapshot to. */
int use_pipe; /* Use pipe to send the snapshot. */
int req; /* See REPLICA_REQ_* in server.h. */
int skip_checksum; /* Skip checksum when sending the snapshot. */
ChildSnapshotFunc snapshot_func; /* Function to call to take the snapshot. */
void *privdata; /* Private data to pass to snapshot_func. */
} rdbSnapshotOptions;
/* Test if a type is an object type. */
#define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) < RDB_TYPE_LAST))
@ -222,6 +211,5 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
ssize_t rdbSaveFunctions(rio *rdb);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
int saveSnapshotToConnectionSockets(rdbSnapshotOptions options);
#endif

View File

@ -40,6 +40,7 @@
#include "functions.h"
#include "connection.h"
#include "module.h"
#include "cluster_migrateslots.h"
#include <memory.h>
#include <sys/time.h>
@ -1719,11 +1720,7 @@ void rdbPipeWriteHandler(struct connection *conn) {
return;
} else {
replica->repl_data->repldboff += nwritten;
if (getClientType(replica) == CLIENT_TYPE_SLOT_EXPORT) {
server.stat_net_cluster_slot_export_bytes += nwritten;
} else {
server.stat_net_repl_output_bytes += nwritten;
}
server.stat_net_repl_output_bytes += nwritten;
if (replica->repl_data->repldboff < server.rdb_pipe_bufflen) {
replica->repl_data->repl_last_partial_write = server.unixtime;
return; /* more data to write.. */
@ -1797,11 +1794,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* Note: when use diskless replication, 'repldboff' is the offset
* of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
replica->repl_data->repldboff = nwritten;
if (getClientType(replica) == CLIENT_TYPE_SLOT_EXPORT) {
server.stat_net_cluster_slot_export_bytes += nwritten;
} else {
server.stat_net_repl_output_bytes += nwritten;
}
server.stat_net_repl_output_bytes += nwritten;
}
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
@ -1820,6 +1813,94 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
}
}
/* Write handler used on the slot migration source while data from the slot
 * migration pipe is being transferred: it is called when the target becomes
 * writable again and sends the part of server.slot_migration_pipe_buff that
 * has not been written yet (tracked by the target's repldboff). */
void slotMigrationPipeWriteHandler(struct connection *conn) {
    serverAssert(server.slot_migration_pipe_bufflen > 0);
    client *target = connGetPrivateData(conn);

    ssize_t nwritten = connWrite(conn, server.slot_migration_pipe_buff + target->repl_data->repldboff,
                                 server.slot_migration_pipe_bufflen - target->repl_data->repldboff);
    if (nwritten == -1) {
        /* A failed write on a still connected connection is equivalent to
         * EAGAIN: just wait for the next writable event. */
        if (connGetState(conn) != CONN_STATE_CONNECTED) {
            serverLog(LL_WARNING, "Write error sending slot migration snapshot to target: %s", connGetLastError(conn));
            freeClient(target);
        }
        return;
    }

    target->repl_data->repldboff += nwritten;
    server.stat_net_cluster_slot_export_bytes += nwritten;
    if (target->repl_data->repldboff < server.slot_migration_pipe_bufflen) {
        /* Part of the buffer is still pending, keep this handler installed. */
        target->repl_data->repl_last_partial_write = server.unixtime;
        return;
    }

    /* The whole buffer was sent: remove the write handler and go back to
     * reading from the pipe. */
    connSetWriteHandler(conn, NULL);
    target->repl_data->repl_last_partial_write = 0;
    if (aeCreateFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE, slotMigrationPipeReadHandler, NULL) == AE_ERR) {
        serverPanic("Unrecoverable error creating server.slot_migration_pipe_read file event.");
    }
}
/* File event handler called in the slot migration source when there is data to
 * read from the child's slot migration pipe.
 *
 * It repeatedly reads up to PROTO_IOBUF_LEN bytes from 'fd' into
 * server.slot_migration_pipe_buff and writes them to the target connection
 * (server.slot_migration_pipe_conn), until one of the following happens:
 *
 * - read() reports EAGAIN / EWOULDBLOCK: return.
 * - read() fails otherwise: the target client is freed and the slot migration
 *   child is killed.
 * - read() reports EOF: the pipe read event is removed and the child exit
 *   pipe is closed.
 * - the write to the target fails on a connection that is no longer
 *   connected: the target client is freed.
 * - the write to the target is partial (or fails while still connected):
 *   slotMigrationPipeWriteHandler takes over and the pipe read event is
 *   removed.
 *
 * The eventLoop, clientData and mask arguments are unused. */
void slotMigrationPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    UNUSED(eventLoop);
    UNUSED(clientData);
    UNUSED(mask);

    /* The chunk buffer is allocated the first time it is needed. */
    if (server.slot_migration_pipe_buff == NULL) {
        server.slot_migration_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
    }

    for (;;) {
        server.slot_migration_pipe_bufflen = read(fd, server.slot_migration_pipe_buff, PROTO_IOBUF_LEN);

        if (server.slot_migration_pipe_bufflen < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) return;
            serverLog(LL_WARNING, "Slot migration, read error sending snapshot to target: %s", strerror(errno));
            freeClient(connGetPrivateData(server.slot_migration_pipe_conn));
            server.slot_migration_pipe_conn = NULL;
            killSlotMigrationChild();
            return;
        }

        if (server.slot_migration_pipe_bufflen == 0) {
            /* EOF - write end was closed. */
            aeDeleteFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE);
            /* Now that the target has finished reading, notify the child that it's safe to exit.
             * When the server detects the child has exited, it can mark the target as online, and
             * start streaming the slot replication buffers. */
            close(server.slot_migration_child_exit_pipe);
            server.slot_migration_child_exit_pipe = -1;
            return;
        }

        connection *target_conn = server.slot_migration_pipe_conn;
        client *target = connGetPrivateData(target_conn);
        ssize_t sent = connWrite(target_conn, server.slot_migration_pipe_buff, server.slot_migration_pipe_bufflen);

        if (sent != -1) {
            /* Note: 'repldboff' is the offset of 'slot_migration_pipe_buff'
             * sent so far, rather than the offset of the entire snapshot. */
            target->repl_data->repldboff = sent;
            server.stat_net_cluster_slot_export_bytes += sent;
        } else if (connGetState(target_conn) == CONN_STATE_CONNECTED) {
            /* An error and still in connected state, is equivalent to EAGAIN */
            target->repl_data->repldboff = 0;
        } else {
            serverLog(LL_WARNING, "Slot migration transfer, write error sending DB to target: %s",
                      connGetLastError(target_conn));
            freeClient(target);
            server.slot_migration_pipe_conn = NULL;
            return;
        }

        /* The whole chunk went out: go read the next one. */
        if (sent == server.slot_migration_pipe_bufflen) continue;

        /* We were unable to write all the data to the target: set up the
         * write handler and disable the pipe read handler. */
        connSetWriteHandler(target_conn, slotMigrationPipeWriteHandler);
        aeDeleteFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE);
        return;
    }
}
/* This function is called at the end of every background saving.
*
* The argument bgsaveerr is C_OK if the background saving succeeded

View File

@ -824,6 +824,7 @@ const char *strChildType(int type) {
case CHILD_TYPE_AOF: return "AOF";
case CHILD_TYPE_LDB: return "LDB";
case CHILD_TYPE_MODULE: return "MODULE";
case CHILD_TYPE_SLOT_MIGRATION: return "SLOT_MIGRATION";
default: return "Unknown";
}
}
@ -850,7 +851,7 @@ void resetChildState(void) {
/* Return if child type is mutually exclusive with other fork children.
 *
 * Returns non-zero for CHILD_TYPE_RDB, CHILD_TYPE_AOF, CHILD_TYPE_MODULE and
 * CHILD_TYPE_SLOT_MIGRATION, and zero for every other type. The slot migration
 * child has to be included here: code elsewhere identifies it by comparing
 * server.child_type against CHILD_TYPE_SLOT_MIGRATION. */
int isMutuallyExclusiveChildType(int type) {
    return type == CHILD_TYPE_RDB || type == CHILD_TYPE_AOF || type == CHILD_TYPE_MODULE ||
           type == CHILD_TYPE_SLOT_MIGRATION;
}
/* Returns true when we're inside a long command that yielded to the event loop. */
@ -1390,17 +1391,19 @@ void checkChildrenDone(void) {
"child_type: %s, child_pid = %d",
strerror(errno), strChildType(server.child_type), (int)server.child_pid);
} else if (pid == server.child_pid) {
if (!bysignal && exitcode == 0) receiveChildInfo();
if (server.child_type == CHILD_TYPE_RDB) {
backgroundSaveDoneHandler(exitcode, bysignal);
} else if (server.child_type == CHILD_TYPE_AOF) {
backgroundRewriteDoneHandler(exitcode, bysignal);
} else if (server.child_type == CHILD_TYPE_MODULE) {
ModuleForkDoneHandler(exitcode, bysignal);
} else if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) {
backgroundSlotMigrationDoneHandler(exitcode, bysignal);
} else {
serverPanic("Unknown child type %d for child pid %d", server.child_type, server.child_pid);
exit(1);
}
if (!bysignal && exitcode == 0) receiveChildInfo();
resetChildState();
} else {
if (!ldbRemoveChild(pid)) {
@ -2830,6 +2833,8 @@ void initServer(void) {
server.in_fork_child = CHILD_TYPE_NONE;
server.rdb_pipe_read = -1;
server.rdb_child_exit_pipe = -1;
server.slot_migration_pipe_read = -1;
server.slot_migration_child_exit_pipe = -1;
server.main_thread_id = pthread_self();
server.current_client = NULL;
server.errors = raxNew();
@ -2938,6 +2943,7 @@ void initServer(void) {
server.stat_rdb_cow_bytes = 0;
server.stat_aof_cow_bytes = 0;
server.stat_module_cow_bytes = 0;
server.stat_slot_migration_cow_bytes = 0;
server.stat_module_progress = 0;
for (int j = 0; j < CLIENT_TYPE_COUNT; j++) server.stat_clients_type_memory[j] = 0;
server.stat_cluster_links_memory = 0;
@ -4740,6 +4746,11 @@ int finishShutdown(void) {
}
}
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) {
serverLog(LL_WARNING, "There is a slot migration child. Killing it!");
killSlotMigrationChild();
}
/* Create a new RDB file before exiting. */
if ((server.saveparamslen > 0 && !nosave) || save) {
serverLog(LL_NOTICE, "Saving the final RDB snapshot before exiting.");
@ -6048,7 +6059,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"aof_last_write_status:%s\r\n", (server.aof_last_write_status == C_OK && aof_bio_fsync_status == C_OK) ? "ok" : "err",
"aof_last_cow_size:%zu\r\n", server.stat_aof_cow_bytes,
"module_fork_in_progress:%d\r\n", server.child_type == CHILD_TYPE_MODULE,
"module_fork_last_cow_size:%zu\r\n", server.stat_module_cow_bytes));
"module_fork_last_cow_size:%zu\r\n", server.stat_module_cow_bytes,
"slot_migration_fork_in_progress:%d\r\n", server.child_type == CHILD_TYPE_SLOT_MIGRATION));
if (server.aof_enabled) {
info = sdscatprintf(
@ -6762,6 +6774,8 @@ int serverFork(int purpose) {
latencyTraceIfNeeded(rdb, fork, server.stat_fork_time);
} else if (purpose == CHILD_TYPE_AOF) {
latencyTraceIfNeeded(aof, fork, server.stat_fork_time);
} else if (purpose == CHILD_TYPE_SLOT_MIGRATION) {
latencyTraceIfNeeded(cluster, fork, server.stat_fork_time);
}
/* The child_pid and child_type are only for mutually exclusive children.

View File

@ -1652,12 +1652,14 @@ typedef struct {
#define CHILD_TYPE_AOF 2
#define CHILD_TYPE_LDB 3
#define CHILD_TYPE_MODULE 4
#define CHILD_TYPE_SLOT_MIGRATION 5
/* Tags identifying a kind of child info. There is one *_COW_SIZE entry for
 * each of the AOF, RDB, module and slot migration fork types.
 * NOTE(review): presumably these tag the data consumed by receiveChildInfo()
 * in the parent; confirm against the child info code. */
typedef enum childInfoType {
CHILD_INFO_TYPE_CURRENT_INFO,
CHILD_INFO_TYPE_AOF_COW_SIZE,
CHILD_INFO_TYPE_RDB_COW_SIZE,
CHILD_INFO_TYPE_MODULE_COW_SIZE,
CHILD_INFO_TYPE_SLOT_MIGRATION_COW_SIZE,
CHILD_INFO_TYPE_REPL_OUTPUT_BYTES
} childInfoType;
@ -1834,6 +1836,7 @@ struct valkeyServer {
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
size_t stat_slot_migration_cow_bytes; /* Copy on write bytes during slot migration fork. */
double stat_module_progress; /* Module save progress. */
size_t stat_clients_type_memory[CLIENT_TYPE_COUNT]; /* Mem usage by type */
size_t stat_cluster_links_memory; /* Mem usage by cluster links */
@ -2218,6 +2221,11 @@ struct valkeyServer {
* migrations. */
ssize_t slot_migration_max_failover_repl_bytes; /* Maximum amount of in flight bytes for a slot migration
* failover to be attempted. */
int slot_migration_pipe_read; /* Slot migration pipe used to transfer the slots data */
int slot_migration_child_exit_pipe; /* Used by the slot migration parent to allow the child to exit. */
connection *slot_migration_pipe_conn; /* Connection to the slot migration target that the snapshot is written to. */
char *slot_migration_pipe_buff; /* In slot migration, this buffer holds slot snapshot data. */
ssize_t slot_migration_pipe_bufflen; /* Length of the data in slot_migration_pipe_buff that was read from the slot migration pipe. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
/* Debug config to control the random ping. When set, we will disable the random ping in clusterCron. */
@ -2261,10 +2269,11 @@ struct valkeyServer {
serverUnixContextConfig unix_ctx_config;
serverRdmaContextConfig rdma_ctx_config;
/* cpu affinity */
char *server_cpulist; /* cpu affinity list of server main/io thread. */
char *bio_cpulist; /* cpu affinity list of bio thread. */
char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
char *bgsave_cpulist; /* cpu affinity list of bgsave process. */
char *server_cpulist; /* cpu affinity list of server main/io thread. */
char *bio_cpulist; /* cpu affinity list of bio thread. */
char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
char *bgsave_cpulist; /* cpu affinity list of bgsave process. */
char *slot_migration_cpulist; /* cpu affinity list of slot migration process. */
/* Sentinel config */
struct sentinelConfig *sentinel_config; /* sentinel config to load at startup time. */
/* Coordinate failover info */
@ -3099,6 +3108,7 @@ void showLatestBacklog(void);
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
int rdbRegisterAuxField(char *auxfield, rdbAuxFieldEncoder encoder, rdbAuxFieldDecoder decoder);
void slotMigrationPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void clearFailoverState(void);
void updateFailoverStatus(void);
void abortFailover(const char *err);

View File

@ -100,6 +100,7 @@ Generally valkey-server would not run in full utilization, the overhead is accep
| expire_cycle | valkey_db |
| expire_cycle_fields | valkey_db |
| expire_cycle_keys | valkey_db |
| fork | valkey_cluster |
| cluster_config_open | valkey_cluster |
| cluster_config_write | valkey_cluster |
| cluster_config_fsync | valkey_cluster |

View File

@ -115,6 +115,16 @@ LTTNG_UST_TRACEPOINT_EVENT_INSTANCE(
)
)
/* Declares the 'fork' tracepoint event of the valkey_cluster provider as an
 * instance of the valkey_cluster_class tracepoint class. */
LTTNG_UST_TRACEPOINT_EVENT_INSTANCE(
/* Tracepoint class provider, tracepoint class name, instance provider, event name */
valkey_cluster, valkey_cluster_class, valkey_cluster, fork,
/* List of tracepoint arguments (input) */
LTTNG_UST_TP_ARGS(
uint64_t, duration
)
)
#define valkey_cluster_trace(...) lttng_ust_tracepoint(__VA_ARGS__)
#endif /* __VALKEY_TRACE_CLUSTER_H__ */

View File

@ -2605,9 +2605,9 @@ jemalloc-bg-thread yes
# Normally you can do this using the "taskset" command, however it is also
# possible to do this via the server configuration directly, both in Linux and FreeBSD.
#
# You can pin the server/IO threads, bio threads, aof rewrite child process, and
# the bgsave child process. The syntax to specify the cpu list is the same as
# the taskset command:
# You can pin the server/IO threads, bio threads, aof rewrite child process,
# bgsave child process and the slot migration process.
# The syntax to specify the cpu list is the same as the taskset command:
#
# Set server/io threads to cpu affinity 0,2,4,6:
# server-cpulist 0-7:2
@ -2618,7 +2618,7 @@ jemalloc-bg-thread yes
# Set aof rewrite child process to cpu affinity 8,9,10,11:
# aof-rewrite-cpulist 8-11
#
# Set bgsave child process to cpu affinity 1,10,11
# Set bgsave (or slot migration) child process to cpu affinity 1,10,11:
# bgsave-cpulist 1,10-11
# In some cases the server will emit warnings and even refuse to start if it detects