From d04cf783b9e56192b2b13b54a403ef2329840561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Fri, 10 Oct 2025 15:24:10 +0200 Subject: [PATCH 1/6] Produce RDB 11 for replicas older than 9.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Only in full sync. Use the replica's REPLCONF VERSION to select RDB version. Abort if any data that can't be represented in the older RDB version is found. Signed-off-by: Viktor Söderqvist --- src/aof.c | 2 +- src/cluster.c | 6 +- src/cluster_migrateslots.c | 6 +- src/cluster_migrateslots.h | 2 +- src/rdb.c | 78 +++++++++++-------- src/rdb.h | 22 +++--- src/replication.c | 56 ++++++++++--- src/server.h | 1 + .../integration/cross-version-replication.tcl | 77 +++++++++++++++--- tests/support/util.tcl | 19 +++++ tests/unit/cluster/cross-version-cluster.tcl | 5 +- 11 files changed, 204 insertions(+), 70 deletions(-) diff --git a/src/aof.c b/src/aof.c index 62c0ab46b..73cc8ce49 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2401,7 +2401,7 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) { + if (rdbSaveRio(REPLICA_REQ_NONE, RDB_VERSION, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) { errno = error; goto werr; } diff --git a/src/cluster.c b/src/cluster.c index a188bef33..8382e0093 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -126,8 +126,10 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) { /* Serialize the object in an RDB-like format. It consist of an object type * byte followed by the serialized object. This is understood by RESTORE. 
*/ rioInitWithBuffer(payload, sdsempty()); - serverAssert(rdbSaveObjectType(payload, o)); - serverAssert(rdbSaveObject(payload, o, key, dbid)); + int rdbtype = rdbGetObjectType(o, RDB_VERSION); + serverAssert(rdbtype >= 0); + serverAssert(rdbSaveType(payload, rdbtype)); + serverAssert(rdbSaveObject(payload, o, key, dbid, rdbtype)); /* Write the footer, this is how it looks like: * ----------------+---------------------+---------------+ diff --git a/src/cluster_migrateslots.c b/src/cluster_migrateslots.c index 6042ee43c..26ebfacff 100644 --- a/src/cluster_migrateslots.c +++ b/src/cluster_migrateslots.c @@ -359,7 +359,7 @@ void fireModuleSlotMigrationEvent(slotMigrationJob *job, int subevent) { /* Save the active slot imports to the RDB file. The import job name and the * slot ranges are saved. */ -int clusterRDBSaveSlotImports(rio *rdb) { +int clusterRDBSaveSlotImports(rio *rdb, int rdbver) { if (!server.cluster_enabled) return C_OK; if (listLength(server.cluster->slot_migration_jobs) == 0) return C_OK; listNode *ln; @@ -371,6 +371,10 @@ int clusterRDBSaveSlotImports(rio *rdb) { slotMigrationJob *job = ln->value; if (isSlotMigrationJobFinished(job)) continue; if (job->type == SLOT_MIGRATION_EXPORT) continue; + if (rdbver < 80) { + serverLog(LL_WARNING, "Can't store slot migrations in RDB version %d", rdbver); + return C_ERR; + } if (rdbSaveType(rdb, RDB_OPCODE_SLOT_IMPORT) < 0) return C_ERR; if (rdbSaveRawString(rdb, (unsigned char *)job->name, CLUSTER_NAMELEN) < 0) return C_ERR; if (rdbSaveLen(rdb, listLength(job->slot_ranges)) < 0) return C_ERR; diff --git a/src/cluster_migrateslots.h b/src/cluster_migrateslots.h index bb46fd884..ddde101ad 100644 --- a/src/cluster_migrateslots.h +++ b/src/cluster_migrateslots.h @@ -37,7 +37,7 @@ void clusterCleanSlotImportsOnFullSync(void); void clusterCleanSlotImportsOnPromotion(void); void clusterCleanSlotImportsBeforeLoad(void); void clusterCleanSlotImportsAfterLoad(void); -int clusterRDBSaveSlotImports(rio *rdb); +int 
clusterRDBSaveSlotImports(rio *rdb, int rdbver); int clusterRDBLoadSlotImport(rio *rdb); #endif /* __CLUSTER_MIGRATESLOTS_H */ diff --git a/src/rdb.c b/src/rdb.c index bec340cb0..648dddde5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -707,43 +707,47 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val) { return 0; } -/* Save the object type of object "o". */ -int rdbSaveObjectType(rio *rdb, robj *o) { +/* Return the RDB object type to use for saving object "o", or -1 if the object + * can't be represented in the given RDB version (only for older RDB). */ +int rdbGetObjectType(robj *o, int rdbver) { switch (o->type) { - case OBJ_STRING: return rdbSaveType(rdb, RDB_TYPE_STRING); + case OBJ_STRING: return RDB_TYPE_STRING; case OBJ_LIST: if (o->encoding == OBJ_ENCODING_QUICKLIST || o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb, RDB_TYPE_LIST_QUICKLIST_2); + return RDB_TYPE_LIST_QUICKLIST_2; else serverPanic("Unknown list encoding"); case OBJ_SET: if (o->encoding == OBJ_ENCODING_INTSET) - return rdbSaveType(rdb, RDB_TYPE_SET_INTSET); + return RDB_TYPE_SET_INTSET; else if (o->encoding == OBJ_ENCODING_HASHTABLE) - return rdbSaveType(rdb, RDB_TYPE_SET); + return RDB_TYPE_SET; else if (o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb, RDB_TYPE_SET_LISTPACK); + return RDB_TYPE_SET_LISTPACK; else serverPanic("Unknown set encoding"); case OBJ_ZSET: if (o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb, RDB_TYPE_ZSET_LISTPACK); + return RDB_TYPE_ZSET_LISTPACK; else if (o->encoding == OBJ_ENCODING_SKIPLIST) - return rdbSaveType(rdb, RDB_TYPE_ZSET_2); + return RDB_TYPE_ZSET_2; else serverPanic("Unknown sorted set encoding"); case OBJ_HASH: if (o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb, RDB_TYPE_HASH_LISTPACK); + return RDB_TYPE_HASH_LISTPACK; else if (o->encoding == OBJ_ENCODING_HASHTABLE) if (hashTypeHasVolatileFields(o)) - return rdbSaveType(rdb, RDB_TYPE_HASH_2); + if (rdbver >= 80) + return RDB_TYPE_HASH_2; + else + 
return -1; /* skip the key; can't be sent over old RDB */ else - return rdbSaveType(rdb, RDB_TYPE_HASH); + return RDB_TYPE_HASH; else serverPanic("Unknown hash encoding"); - case OBJ_STREAM: return rdbSaveType(rdb, RDB_TYPE_STREAM_LISTPACKS_3); - case OBJ_MODULE: return rdbSaveType(rdb, RDB_TYPE_MODULE_2); + case OBJ_STREAM: return RDB_TYPE_STREAM_LISTPACKS_3; + case OBJ_MODULE: return RDB_TYPE_MODULE_2; default: serverPanic("Unknown object type"); } return -1; /* avoid warning */ @@ -860,7 +864,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { /* Save an Object. * Returns -1 on error, number of bytes written on success. */ -ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { +ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char rdbtype) { ssize_t n = 0, nwritten = 0; if (o->type == OBJ_STRING) { /* Save a string value */ @@ -978,6 +982,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1) return -1; nwritten += n; } else if (o->encoding == OBJ_ENCODING_HASHTABLE) { + serverAssert(rdbtype == RDB_TYPE_HASH || rdbtype == RDB_TYPE_HASH_2); hashtable *ht = o->ptr; if ((n = rdbSaveLen(rdb, hashtableSize(ht))) == -1) { @@ -985,7 +990,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { } nwritten += n; /* check if need to add expired time for the hash fields */ - bool add_expiry = hashTypeHasVolatileFields(o); + bool add_expiry = (rdbtype == RDB_TYPE_HASH_2); hashtableIterator iter; hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SKIP_VALIDATION); void *next; @@ -1164,7 +1169,9 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { * this length with very little changes to the code. In the future * we could switch to a faster solution. 
*/ size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) { - ssize_t len = rdbSaveObject(NULL, o, key, dbid); + int rdbtype = rdbGetObjectType(o, RDB_VERSION); + serverAssert(rdbtype != -1); + ssize_t len = rdbSaveObject(NULL, o, key, dbid, rdbtype); serverAssertWithInfo(NULL, o, len != -1); return len; } @@ -1172,7 +1179,7 @@ size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) { /* Save a key-value pair, with expire time, type, key, value. * On error -1 is returned. * On success if the key was actually saved 1 is returned. */ -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) { +int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver) { int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU; int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU; @@ -1203,9 +1210,15 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, in } /* Save type, key, value */ - if (rdbSaveObjectType(rdb, val) == -1) return -1; + int rdbtype = rdbGetObjectType(val, rdbver); + if (rdbtype == -1) { + serverLog(LL_WARNING, "Can't store key '%s' (db %d) in RDB version %d", + (char *)key->ptr, dbid, rdbver); + return -1; + } + if (rdbSaveType(rdb, rdbtype) == -1) return -1; if (rdbSaveStringObject(rdb, key) == -1) return -1; - if (rdbSaveObject(rdb, val, key, dbid) == -1) return -1; + if (rdbSaveObject(rdb, val, key, dbid, rdbtype) == -1) return -1; /* Delay return if required (for testing) */ if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay); @@ -1364,7 +1377,7 @@ werr: return -1; } -ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { +ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, int rdbver, long *key_counter) { ssize_t written = 0; ssize_t res; kvstoreIterator *kvs_it = NULL; @@ -1419,7 +1432,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { initStaticStringObject(key, keystr); expire = 
objectGetExpire(o); - if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr; + if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid, rdbver)) < 0) goto werr; written += res; /* In fork child process, we can try to release memory back to the @@ -1455,14 +1468,16 @@ werr: * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { +int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { char magic[10]; uint64_t cksum; long key_counter = 0; int j; if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; - snprintf(magic, sizeof(magic), "VALKEY%03d", RDB_VERSION); + const char *magic_prefix = rdbUseValkeyMagic(rdbver) ? "VALKEY" : "REDIS0"; + serverAssert(rdbver >= 0 && rdbver <= RDB_VERSION); + snprintf(magic, sizeof(magic), "%s%03d", magic_prefix, rdbver); if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr; if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, VALKEYMODULE_AUX_BEFORE_RDB) == -1) goto werr; @@ -1474,9 +1489,9 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA)) { /* RDB slot import info is encoded in a required opcode since exposing * importing slots is a consistency problem. */ - if (clusterRDBSaveSlotImports(rdb) == C_ERR) goto werr; + if (clusterRDBSaveSlotImports(rdb, rdbver) == C_ERR) goto werr; for (j = 0; j < server.dbnum; j++) { - if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr; + if (rdbSaveDb(rdb, j, rdbflags, rdbver, &key_counter) == -1) goto werr; } } @@ -1506,7 +1521,7 @@ werr: * While the suffix is the 40 bytes hex string we announced in the prefix. 
* This way processes receiving the payload can understand when it ends * without doing any processing of the content. */ -int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) { +int rdbSaveRioWithEOFMark(int req, int rdbver, rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; startSaving(RDBFLAGS_REPLICATION); @@ -1515,7 +1530,7 @@ int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) { if (rioWrite(rdb, "$EOF:", 5) == 0) goto werr; if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb, "\r\n", 2) == 0) goto werr; - if (rdbSaveRio(req, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr; + if (rdbSaveRio(req, rdbver, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr; if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr; stopSaving(1); return C_OK; @@ -1554,7 +1569,7 @@ static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb, 1); } - if (rdbSaveRio(req, &rdb, &error, rdbflags, rsi) == C_ERR) { + if (rdbSaveRio(req, RDB_VERSION, &rdb, &error, rdbflags, rsi) == C_ERR) { errno = error; err_op = "rdbSaveRio"; goto werr; @@ -3655,7 +3670,7 @@ void killRDBChild(void) { /* Spawn an RDB child that writes the RDB to the sockets of the replicas * that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. 
*/ -int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { +int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi) { listNode *ln; listIter li; pid_t childpid; @@ -3711,6 +3726,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { /* Check replica has the exact requirements */ if (replica->repl_data->replica_req != req) continue; + if (replicaRdbVersion(replica) != rdbver) continue; conns[connsnum++] = replica->conn; if (dual_channel) { @@ -3756,7 +3772,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { if (skip_rdb_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; - retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi); + retval = rdbSaveRioWithEOFMark(req, rdbver, &rdb, NULL, rsi); if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR; if (retval == C_OK) { diff --git a/src/rdb.h b/src/rdb.h index 2ab0ef84f..1cdbf978d 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -61,6 +61,10 @@ static inline bool rdbIsForeignVersion(int rdbver) { return rdbver >= RDB_FOREIGN_VERSION_MIN && rdbver <= RDB_FOREIGN_VERSION_MAX; } +static inline bool rdbUseValkeyMagic(int rdbver) { + return rdbver > RDB_FOREIGN_VERSION_MAX; +} + /* Defines related to the dump file format. 
To store 32 bits lengths for short * keys requires a lot of space, so we check the most significant 2 bits of * the first byte to interpreter the length: @@ -110,13 +114,13 @@ enum RdbType { RDB_TYPE_HASH_ZIPLIST = 13, RDB_TYPE_LIST_QUICKLIST = 14, RDB_TYPE_STREAM_LISTPACKS = 15, - RDB_TYPE_HASH_LISTPACK = 16, + RDB_TYPE_HASH_LISTPACK = 16, /* Added in RDB 10 (7.0) */ RDB_TYPE_ZSET_LISTPACK = 17, RDB_TYPE_LIST_QUICKLIST_2 = 18, RDB_TYPE_STREAM_LISTPACKS_2 = 19, - RDB_TYPE_SET_LISTPACK = 20, + RDB_TYPE_SET_LISTPACK = 20, /* Added in RDB 11 (7.2) */ RDB_TYPE_STREAM_LISTPACKS_3 = 21, - RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration (Valkey 9.0) */ + RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration, RDB 80 (9.0) */ RDB_TYPE_LAST }; /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdb_type_string[] */ @@ -131,7 +135,7 @@ enum RdbType { /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). * These are special RDB types, but they start from 255 and grow down. */ -#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state. */ +#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state (9.0). */ #define RDB_OPCODE_SLOT_INFO 244 /* Foreign slot info, safe to ignore. 
*/ #define RDB_OPCODE_FUNCTION2 245 /* function library data */ #define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */ @@ -183,19 +187,19 @@ ssize_t rdbSaveMillisecondTime(rio *rdb, long long t); long long rdbLoadMillisecondTime(rio *rdb, int rdbver); uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); -int rdbSaveObjectType(rio *rdb, robj *o); +int rdbGetObjectType(robj *o, int rdbver); int rdbLoadObjectType(rio *rdb); int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags); int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags); -int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi); +int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid, int from_signal); int rdbSaveToFile(const char *filename); int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags); -ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid); +ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char type); size_t rdbSavedObjectLen(robj *o, robj *key, int dbid); robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error); void backgroundSaveDoneHandler(int exitcode, int bysignal); -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid); +int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver); ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt); robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename); robj *rdbLoadStringObject(rio *rdb); @@ -209,7 +213,7 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val); int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err); -int 
rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); +int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); ssize_t rdbSaveFunctions(rio *rdb); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); diff --git a/src/replication.c b/src/replication.c index 64f5d256e..b5ddd3696 100644 --- a/src/replication.c +++ b/src/replication.c @@ -215,6 +215,23 @@ static inline client *lookupRdbClientByID(uint64_t id) { return c; } +/* Decide which RDB version to send to a replica. */ +int replicaRdbVersion(client *replica) { + if (!(replica->repl_data->replica_capa & REPLICA_CAPA_EOF)) { + /* The replica doesn't have CAPA EOF, so we need to use disk-based + * replication, but we don't want to write an old RDB version to disk, + * so we force the latest RDB version. */ + return RDB_VERSION; + } else if (replica->repl_data->replica_version >= 0x090000) { + return 80; + } else { + /* RDB 11 is used in 7.2 and 8.x. 7.2 and older don't report their + * version. If no version was provided, we assume it's 7.2. We don't + * currently produce RDB 10 (7.0) and RDB 9 (5.0--6.2). */ + return 11; + } +} + /* Replication: Primary side - connections association. * During dual channel sync, association is used to keep replication data * in the backlog until the replica requests PSYNC. @@ -937,6 +954,9 @@ need_full_resync: * of the replicas waiting for this BGSAVE, so represents the replica capabilities * all the replicas support. Can be tested via REPLICA_CAPA_* macros. * + * The rdbver argument is the RDB version to use. It should be calculated based + * on what the replicas reported using REPLCONF VERSION. + * * Side effects, other than starting a BGSAVE: * * 1) Handle the replicas in WAIT_START state, by preparing them for a full @@ -947,16 +967,22 @@ need_full_resync: * started. * * Returns C_OK on success or C_ERR otherwise. 
*/ -int startBgsaveForReplication(int mincapa, int req) { +int startBgsaveForReplication(int mincapa, int req, int rdbver) { int retval; int socket_target = 0; listIter li; listNode *ln; - /* We use a socket target if replica can handle the EOF marker and we're configured to do diskless syncs. - * Note that in case we're creating a "filtered" RDB (functions-only, for example) we also force socket replication - * to avoid overwriting the snapshot RDB file with filtered data. */ - socket_target = (server.repl_diskless_sync || req & REPLICA_REQ_RDB_MASK) && (mincapa & REPLICA_CAPA_EOF); + /* We use a socket target if replica can handle the EOF marker and we're + * configured to do diskless syncs. + * + * Note that in case we're creating a "filtered" RDB (functions-only, for + * example) or an older RDB version, we also force socket replication to + * avoid overwriting the snapshot RDB file with filtered data. */ + socket_target = ((server.repl_diskless_sync || + (req & REPLICA_REQ_RDB_MASK) || + rdbver != RDB_VERSION) && + (mincapa & REPLICA_CAPA_EOF)); /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */ serverAssert(socket_target || !(req & REPLICA_REQ_RDB_MASK)); @@ -970,7 +996,7 @@ int startBgsaveForReplication(int mincapa, int req) { * otherwise replica will miss repl-stream-db. 
*/ if (rsiptr) { if (socket_target) - retval = rdbSaveToReplicasSockets(req, rsiptr); + retval = rdbSaveToReplicasSockets(req, rdbver, rsiptr); else { /* Keep the page cache since it'll get used soon */ retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE); @@ -1019,6 +1045,7 @@ int startBgsaveForReplication(int mincapa, int req) { if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { /* Check replica has the exact requirements */ if (replica->repl_data->replica_req != req) continue; + if (replicaRdbVersion(replica) != rdbver) continue; replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset()); } } @@ -1219,7 +1246,9 @@ void syncCommand(client *c) { /* We don't have a BGSAVE in progress, let's start one. Diskless * or disk-based mode is determined by replica's capacity. */ if (!hasActiveChildProcess()) { - startBgsaveForReplication(c->repl_data->replica_capa, c->repl_data->replica_req); + startBgsaveForReplication(c->repl_data->replica_capa, + c->repl_data->replica_req, + replicaRdbVersion(c)); } else { serverLog(LL_NOTICE, "No BGSAVE in progress, but another BG operation is active. " "BGSAVE for replication delayed"); @@ -5303,7 +5332,7 @@ void replicationCron(void) { replication_cron_loops++; /* Incremented with frequency 1 HZ. */ } -int shouldStartChildReplication(int *mincapa_out, int *req_out) { +int shouldStartChildReplication(int *mincapa_out, int *req_out, int *rdbver_out) { /* We should start a BGSAVE good for replication if we have replicas in * WAIT_BGSAVE_START state. 
* @@ -5314,6 +5343,7 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) { time_t idle, max_idle = 0; int replicas_waiting = 0; int mincapa; + int rdbver; int req; int first = 1; listNode *ln; @@ -5326,7 +5356,9 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) { if (first) { /* Get first replica's requirements */ req = replica->repl_data->replica_req; - } else if (req != replica->repl_data->replica_req) { + rdbver = replicaRdbVersion(replica); + } else if (req != replica->repl_data->replica_req || + rdbver != replicaRdbVersion(replica)) { /* Skip replicas that don't match */ continue; } @@ -5344,6 +5376,7 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) { max_idle >= server.repl_diskless_sync_delay)) { if (mincapa_out) *mincapa_out = mincapa; if (req_out) *req_out = req; + if (rdbver_out) *rdbver_out = rdbver; return 1; } } @@ -5354,12 +5387,13 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) { void replicationStartPendingFork(void) { int mincapa = -1; int req = -1; + int rdbver = -1; - if (shouldStartChildReplication(&mincapa, &req)) { + if (shouldStartChildReplication(&mincapa, &req, &rdbver)) { /* Start the BGSAVE. The called function may start a * BGSAVE with socket target or disk target depending on the * configuration and replicas capabilities and requirements. 
*/ - startBgsaveForReplication(mincapa, req); + startBgsaveForReplication(mincapa, req, rdbver); } } diff --git a/src/server.h b/src/server.h index 24ca1ba82..22f0dd0cb 100644 --- a/src/server.h +++ b/src/server.h @@ -3117,6 +3117,7 @@ void abortFailover(const char *err); const char *getFailoverStateString(void); sds getReplicaPortString(void); int sendCurrentOffsetToReplica(client *replica); +int replicaRdbVersion(client *replica); void addRdbReplicaToPsyncWait(client *replica); void initClientReplicationData(client *c); void freeClientReplicationData(client *c); diff --git a/tests/integration/cross-version-replication.tcl b/tests/integration/cross-version-replication.tcl index 47329ee5a..9954de852 100644 --- a/tests/integration/cross-version-replication.tcl +++ b/tests/integration/cross-version-replication.tcl @@ -1,18 +1,9 @@ # Test replication from an older version primary. # # Use minimal.conf to make sure we don't use any configs not supported on the old version. - -proc server_name_and_version {} { - set server_name [s server_name] - if {$server_name eq {}} { - set server_name redis - } - set server_version [s "${server_name}_version"] - return "$server_name $server_version" -} - start_server {tags {"repl needs:other-server external:skip compatible-redis"} start-other-server 1 config "minimal.conf"} { - set primary_name_and_version [server_name_and_version] + set hello [r hello] + set primary_name_and_version "[dict get $hello server] [dict get $hello version]" r set foo bar start_server {} { @@ -32,3 +23,67 @@ start_server {tags {"repl needs:other-server external:skip compatible-redis"} st } } } + +# Test replication from the current version to an older version replica. 
+start_server {tags {"repl needs:other-server external:skip"}} { + set primary [srv 0 client] + set primary_host [srv 0 host] + set primary_port [srv 0 port] + $primary config set repl-diskless-sync yes + $primary config set repl-diskless-sync-delay 1 + + # As a side-effect, this first start_server block initializes old_replica_version which + # is used in the tests below. + start_server {start-other-server 1 config "minimal.conf"} { + set hello [r hello] + set old_replica_version [dict get $hello version] + # set replica_name_and_version "[dict get $hello server] $replica_version" + set old_replica [srv 0 client] + start_server {} { + set new_replica [srv 0 client] + test {Keys can be sync'ed by old and new replicas} { + $primary set foo bar + $old_replica replicaof $primary_host $primary_port + $new_replica replicaof $primary_host $primary_port + wait_for_sync $old_replica 500 100 + wait_for_sync $new_replica 500 100 + assert_equal bar [$old_replica get foo] + assert_equal bar [$new_replica get foo] + } + } + } + + test "Old pre-HFE replica can't sync but doesn't prevent new replica from sync" { + if {[version_greater_or_equal $old_replica_version 9.0.0]} { + skip "Replica $old_replica_version does support HFE" + } + r flushall + r hsetex hfe ex 1000 fields 1 field1 value1 + start_server {start-other-server 1 config "minimal.conf"} { + set old_replica [srv 0 client] + start_server {} { + set new_replica [srv 0 client] + $old_replica replicaof $primary_host $primary_port + $new_replica replicaof $primary_host $primary_port + wait_for_sync $new_replica 500 100 + wait_for_log_messages -2 [list {*Can't store key 'hfe'*}] 0 50 100 + assert_equal value1 [$new_replica hget hfe field1] + assert_match {*master_link_status:up*} [$new_replica info replication] + assert_match {*master_link_status:down*} [$old_replica info replication] + } + } + } + + test "Replica with HFE support can full sync" { + if {![version_greater_or_equal $old_replica_version 9.0.0]} { + skip 
"Replica $old_replica_version doesn't support HFE" + } + r hsetex hfe ex 1000 fields 1 field1 value1 + start_server {start-other-server 1 config "minimal.conf"} { + set old_replica [srv 0 client] + $old_replica replicaof $primary_host $primary_port + wait_for_sync $old_replica 500 100 + assert_equal value1 [$old_replica hget hfe field1] + } + } +} diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 5a001c04a..893b81f22 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -1280,3 +1280,22 @@ proc bp {{s {}}} { puts $res } } + +# Compares two version strings. Returns 1 if a >= b, 0 otherwise. +proc version_greater_or_equal {a b} { + regexp {^([0-9]+).([0-9]+).([0-9]+)$} $a -> a_major a_minor a_patch + regexp {^([0-9]+).([0-9]+).([0-9]+)$} $b -> b_major b_minor b_patch + if {$a_major < $b_major} { + return 0 + } elseif {$a_major > $b_major} { + return 1 + } elseif {$a_minor < $b_minor} { + return 0 + } elseif {$a_minor > $b_minor} { + return 1 + } elseif {$a_patch < $b_patch} { + return 0 + } else { + return 1 + } +} diff --git a/tests/unit/cluster/cross-version-cluster.tcl b/tests/unit/cluster/cross-version-cluster.tcl index bfc57e10c..10543ef0d 100644 --- a/tests/unit/cluster/cross-version-cluster.tcl +++ b/tests/unit/cluster/cross-version-cluster.tcl @@ -17,9 +17,8 @@ tags {external:skip needs:other-server cluster singledb} { set primary_id [$primary cluster myid] start_server {config "minimal-cluster.conf" start-other-server 1 overrides {cluster-ping-interval 1000}} { - set res [dict get [r hello] version] - assert [regexp {([0-9]+)\.([0-9]+)\.[0-9]+} $res -> major minor] - if {($major < 8) || ($major == 8 && $minor < 1)} { + set version [dict get [r hello] version] + if {![version_greater_or_equal $version 8.1.0]} { skip "Requires Valkey 8.1 or above" } r config set rdb-version-check relaxed From ae6567d09751aedf81eb05b9d4a650c33b95e784 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 27 Oct 2025 
14:13:36 +0100 Subject: [PATCH 2/6] Minor changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- src/replication.c | 7 +++---- tests/integration/cross-version-replication.tcl | 1 + tests/support/util.tcl | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/replication.c b/src/replication.c index b5ddd3696..5c741ec39 100644 --- a/src/replication.c +++ b/src/replication.c @@ -979,10 +979,9 @@ int startBgsaveForReplication(int mincapa, int req, int rdbver) { * Note that in case we're creating a "filtered" RDB (functions-only, for * example) or an older RDB version, we also force socket replication to * avoid overwriting the snapshot RDB file with filtered data. */ - socket_target = ((server.repl_diskless_sync || - (req & REPLICA_REQ_RDB_MASK) || - rdbver != RDB_VERSION) && - (mincapa & REPLICA_CAPA_EOF)); + socket_target = (mincapa & REPLICA_CAPA_EOF) && (server.repl_diskless_sync || + (req & REPLICA_REQ_RDB_MASK) || + rdbver != RDB_VERSION); /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */ serverAssert(socket_target || !(req & REPLICA_REQ_RDB_MASK)); diff --git a/tests/integration/cross-version-replication.tcl b/tests/integration/cross-version-replication.tcl index 9954de852..0e3de03de 100644 --- a/tests/integration/cross-version-replication.tcl +++ b/tests/integration/cross-version-replication.tcl @@ -78,6 +78,7 @@ start_server {tags {"repl needs:other-server external:skip"}} { if {![version_greater_or_equal $old_replica_version 9.0.0]} { skip "Replica $old_replica_version doesn't support HFE" } + r flushall r hsetex hfe ex 1000 fields 1 field1 value1 start_server {start-other-server 1 config "minimal.conf"} { set old_replica [srv 0 client] diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 893b81f22..5867c3554 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -1283,8 
+1283,8 @@ proc bp {{s {}}} { # Compares two version strings. Returns 1 if a >= b, 0 otherwise. proc version_greater_or_equal {a b} { - regexp {^([0-9]+).([0-9]+).([0-9]+)$} $a -> a_major a_minor a_patch - regexp {^([0-9]+).([0-9]+).([0-9]+)$} $b -> b_major b_minor b_patch + regexp {^([0-9]+)\.([0-9]+)\.([0-9]+)$} $a -> a_major a_minor a_patch + regexp {^([0-9]+)\.([0-9]+)\.([0-9]+)$} $b -> b_major b_minor b_patch if {$a_major < $b_major} { return 0 } elseif {$a_major > $b_major} { From 12868be9b2c4871e7c00b3532598f61c191ffcd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 16 Dec 2025 15:41:18 +0100 Subject: [PATCH 3/6] Address review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- src/rdb.h | 10 ++++++++++ src/replication.c | 22 +++++++++++++++------- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/rdb.h b/src/rdb.h index 1cdbf978d..4cc76781a 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -51,6 +51,16 @@ * string. */ #define RDB_VERSION 80 +/* Mapping between RDB version and the Valkey version where it was added. */ +static const int RDB_VERSION_MAP[][2] = { + /* {RDB version, added in Valkey version} from oldest to newest. */ + {11, 0x070200}, + {80, 0x090000}, +}; + +static_assert(RDB_VERSION == RDB_VERSION_MAP[sizeof(RDB_VERSION_MAP) / sizeof(RDB_VERSION_MAP[0]) - 1][0], + "RDB_VERSION must be the last entry in RDB_VERSION_MAP"); + /* Reserved range for foreign (unsupported, non-OSS) RDB format. 
*/ #define RDB_FOREIGN_VERSION_MIN 12 #define RDB_FOREIGN_VERSION_MAX 79 diff --git a/src/replication.c b/src/replication.c index 5c741ec39..7ca42b329 100644 --- a/src/replication.c +++ b/src/replication.c @@ -220,16 +220,24 @@ int replicaRdbVersion(client *replica) { if (!(replica->repl_data->replica_capa & REPLICA_CAPA_EOF)) { /* The replica doesn't have CAPA EOF, so we need to use disk-based * replication, but we don't want to write an old RDB version to disk, + * because it can be reused for disk-based full sync to other replicas, * so we force the latest RDB version. */ return RDB_VERSION; - } else if (replica->repl_data->replica_version >= 0x090000) { - return 80; - } else { - /* RDB 11 is used in 7.2 and 8.x. 7.2 and older don't report their - * version. If no version was provided, we assume it's 7.2. We don't - * currently produce RDB 10 (7.0) and RDB 9 (5.0--6.2). */ - return 11; } + /* Search the version map backwards to select the highest RDB version the + * replica understands. */ + const int n = sizeof(RDB_VERSION_MAP) / sizeof(RDB_VERSION_MAP[0]); + for (int i = n - 1; i >= 0; i--) { + if (replica->repl_data->replica_version >= RDB_VERSION_MAP[i][1]) { + return RDB_VERSION_MAP[i][0]; + } + } + /* Fall back to RDB 11, which was introduced in 7.2. + * + * 7.2 and older don't report their version. If no version was provided, we + * assume it's 7.2. We don't currently produce RDB 10 (7.0) and RDB 9 + * (5.0--6.2). */ + return 11; } /* Replication: Primary side - connections association. 
From 607fa6ae0ef429dbb4ed2857b46d6c7b5410877d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 16 Dec 2025 15:59:08 +0100 Subject: [PATCH 4/6] Update a comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- src/rdb.c | 2 +- src/replication.c | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 648dddde5..e55681a09 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -741,7 +741,7 @@ int rdbGetObjectType(robj *o, int rdbver) { if (rdbver >= 80) return RDB_TYPE_HASH_2; else - return -1; /* skip the key; can't be sent over old RDB */ + return -1; /* can't be represeted in old RDB */ else return RDB_TYPE_HASH; else diff --git a/src/replication.c b/src/replication.c index 7ca42b329..849ae401a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -986,7 +986,9 @@ int startBgsaveForReplication(int mincapa, int req, int rdbver) { * * Note that in case we're creating a "filtered" RDB (functions-only, for * example) or an older RDB version, we also force socket replication to - * avoid overwriting the snapshot RDB file with filtered data. */ + * avoid overwriting the snapshot RDB file, which needs to be usable by + * other replicas (not using filtered RDB or older versions) in disk-based + * full sync. 
*/ socket_target = (mincapa & REPLICA_CAPA_EOF) && (server.repl_diskless_sync || (req & REPLICA_REQ_RDB_MASK) || rdbver != RDB_VERSION); From 21c2abc2f1cbe3700a99c57169495fda22e6af5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 16 Dec 2025 16:27:44 +0100 Subject: [PATCH 5/6] Remove broken static_assert MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- src/rdb.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/rdb.h b/src/rdb.h index 4cc76781a..080439b7c 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -58,9 +58,6 @@ static const int RDB_VERSION_MAP[][2] = { {80, 0x090000}, }; -static_assert(RDB_VERSION == RDB_VERSION_MAP[sizeof(RDB_VERSION_MAP) / sizeof(RDB_VERSION_MAP[0]) - 1][0], - "RDB_VERSION must be the last entry in RDB_VERSION_MAP"); - /* Reserved range for foreign (unsupported, non-OSS) RDB format. */ #define RDB_FOREIGN_VERSION_MIN 12 #define RDB_FOREIGN_VERSION_MAX 79 From c09c9e2188bd62366c45263bbdbcce24c00cf2eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 16 Dec 2025 16:29:36 +0100 Subject: [PATCH 6/6] spelling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Viktor Söderqvist --- src/rdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rdb.c b/src/rdb.c index e55681a09..daeb132f4 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -741,7 +741,7 @@ int rdbGetObjectType(robj *o, int rdbver) { if (rdbver >= 80) return RDB_TYPE_HASH_2; else - return -1; /* can't be represeted in old RDB */ + return -1; /* can't be stored in old RDB */ else return RDB_TYPE_HASH; else