diff --git a/src/aof.c b/src/aof.c index 38a9f86eb..50edf6cc8 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2401,7 +2401,7 @@ int rewriteAppendOnlyFile(char *filename) { if (server.aof_use_rdb_preamble) { int error; - if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) { + if (rdbSaveRio(REPLICA_REQ_NONE, RDB_VERSION, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) { errno = error; goto werr; } diff --git a/src/cluster.c b/src/cluster.c index 47917dac8..1aceabfbd 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -126,8 +126,10 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) { /* Serialize the object in an RDB-like format. It consist of an object type * byte followed by the serialized object. This is understood by RESTORE. */ rioInitWithBuffer(payload, sdsempty()); - serverAssert(rdbSaveObjectType(payload, o)); - serverAssert(rdbSaveObject(payload, o, key, dbid)); + int rdbtype = rdbGetObjectType(o, RDB_VERSION); + serverAssert(rdbtype >= 0); + serverAssert(rdbSaveType(payload, rdbtype)); + serverAssert(rdbSaveObject(payload, o, key, dbid, rdbtype)); /* Write the footer, this is how it looks like: * ----------------+---------------------+---------------+ diff --git a/src/cluster_migrateslots.c b/src/cluster_migrateslots.c index ba5fab687..2ae7c1f15 100644 --- a/src/cluster_migrateslots.c +++ b/src/cluster_migrateslots.c @@ -359,7 +359,7 @@ void fireModuleSlotMigrationEvent(slotMigrationJob *job, int subevent) { /* Save the active slot imports to the RDB file. The import job name and the * slot ranges are saved. */ -int clusterRDBSaveSlotImports(rio *rdb) { +int clusterRDBSaveSlotImports(rio *rdb, int rdbver) { if (!server.cluster_enabled) return C_OK; if (listLength(server.cluster->slot_migration_jobs) == 0) return C_OK; listNode *ln; @@ -371,6 +371,10 @@ int clusterRDBSaveSlotImports(rio *rdb) { slotMigrationJob *job = ln->value; if (isSlotMigrationJobFinished(job)) continue; if (job->type == SLOT_MIGRATION_EXPORT) continue; + if (rdbver < 80) { + serverLog(LL_WARNING, "Can't store slot migrations in RDB version %d", rdbver); + return C_ERR; + } if (rdbSaveType(rdb, RDB_OPCODE_SLOT_IMPORT) < 0) return C_ERR; if (rdbSaveRawString(rdb, (unsigned char *)job->name, CLUSTER_NAMELEN) < 0) return C_ERR; if (rdbSaveLen(rdb, listLength(job->slot_ranges)) < 0) return C_ERR; diff --git a/src/cluster_migrateslots.h b/src/cluster_migrateslots.h index bb46fd884..ddde101ad 100644 --- a/src/cluster_migrateslots.h +++ b/src/cluster_migrateslots.h @@ -37,7 +37,7 @@ void clusterCleanSlotImportsOnFullSync(void); void clusterCleanSlotImportsOnPromotion(void); void clusterCleanSlotImportsBeforeLoad(void); void clusterCleanSlotImportsAfterLoad(void); -int clusterRDBSaveSlotImports(rio *rdb); +int clusterRDBSaveSlotImports(rio *rdb, int rdbver); int clusterRDBLoadSlotImport(rio *rdb); #endif /* __CLUSTER_MIGRATESLOTS_H */ diff --git a/src/rdb.c b/src/rdb.c index a192c5305..086b5b290 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -708,43 +708,47 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val) { return 0; } -/* Save the object type of object "o". */ -int rdbSaveObjectType(rio *rdb, robj *o) { +/* Return the RDB object type to use for saving object "o", or -1 if the object + * can't be represented in the given RDB version (only for older RDB). */ +int rdbGetObjectType(robj *o, int rdbver) { switch (o->type) { - case OBJ_STRING: return rdbSaveType(rdb, RDB_TYPE_STRING); + case OBJ_STRING: return RDB_TYPE_STRING; case OBJ_LIST: if (o->encoding == OBJ_ENCODING_QUICKLIST || o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb, RDB_TYPE_LIST_QUICKLIST_2); + return RDB_TYPE_LIST_QUICKLIST_2; else serverPanic("Unknown list encoding"); case OBJ_SET: if (o->encoding == OBJ_ENCODING_INTSET) - return rdbSaveType(rdb, RDB_TYPE_SET_INTSET); + return RDB_TYPE_SET_INTSET; else if (o->encoding == OBJ_ENCODING_HASHTABLE) - return rdbSaveType(rdb, RDB_TYPE_SET); + return RDB_TYPE_SET; else if (o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb, RDB_TYPE_SET_LISTPACK); + return RDB_TYPE_SET_LISTPACK; else serverPanic("Unknown set encoding"); case OBJ_ZSET: if (o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb, RDB_TYPE_ZSET_LISTPACK); + return RDB_TYPE_ZSET_LISTPACK; else if (o->encoding == OBJ_ENCODING_SKIPLIST) - return rdbSaveType(rdb, RDB_TYPE_ZSET_2); + return RDB_TYPE_ZSET_2; else serverPanic("Unknown sorted set encoding"); case OBJ_HASH: if (o->encoding == OBJ_ENCODING_LISTPACK) - return rdbSaveType(rdb, RDB_TYPE_HASH_LISTPACK); + return RDB_TYPE_HASH_LISTPACK; else if (o->encoding == OBJ_ENCODING_HASHTABLE) if (hashTypeHasVolatileFields(o)) - return rdbSaveType(rdb, RDB_TYPE_HASH_2); + if (rdbver >= 80) + return RDB_TYPE_HASH_2; + else + return -1; /* can't be stored in old RDB */ else - return rdbSaveType(rdb, RDB_TYPE_HASH); + return RDB_TYPE_HASH; else serverPanic("Unknown hash encoding"); - case OBJ_STREAM: return rdbSaveType(rdb, RDB_TYPE_STREAM_LISTPACKS_3); - case OBJ_MODULE: return rdbSaveType(rdb, RDB_TYPE_MODULE_2); + case OBJ_STREAM: return RDB_TYPE_STREAM_LISTPACKS_3; + case OBJ_MODULE: return RDB_TYPE_MODULE_2; default: serverPanic("Unknown object type"); } return -1; /* avoid warning */ @@ -861,7 +865,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { /* Save an Object. * Returns -1 on error, number of bytes written on success. */ -ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { +ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char rdbtype) { ssize_t n = 0, nwritten = 0; if (o->type == OBJ_STRING) { /* Save a string value */ @@ -980,6 +984,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1) return -1; nwritten += n; } else if (o->encoding == OBJ_ENCODING_HASHTABLE) { + serverAssert(rdbtype == RDB_TYPE_HASH || rdbtype == RDB_TYPE_HASH_2); hashtable *ht = o->ptr; if ((n = rdbSaveLen(rdb, hashtableSize(ht))) == -1) { @@ -987,7 +992,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { } nwritten += n; /* check if need to add expired time for the hash fields */ - bool add_expiry = hashTypeHasVolatileFields(o); + bool add_expiry = (rdbtype == RDB_TYPE_HASH_2); hashtableIterator iter; hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SKIP_VALIDATION); void *next; @@ -1166,7 +1171,9 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { * this length with very little changes to the code. In the future * we could switch to a faster solution. */ size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) { - ssize_t len = rdbSaveObject(NULL, o, key, dbid); + int rdbtype = rdbGetObjectType(o, RDB_VERSION); + serverAssert(rdbtype != -1); + ssize_t len = rdbSaveObject(NULL, o, key, dbid, rdbtype); serverAssertWithInfo(NULL, o, len != -1); return len; } @@ -1174,7 +1181,7 @@ size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) { /* Save a key-value pair, with expire time, type, key, value. * On error -1 is returned. * On success if the key was actually saved 1 is returned. */ -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) { +int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver) { int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU; int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU; @@ -1203,9 +1210,15 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, in } /* Save type, key, value */ - if (rdbSaveObjectType(rdb, val) == -1) return -1; + int rdbtype = rdbGetObjectType(val, rdbver); + if (rdbtype == -1) { + serverLog(LL_WARNING, "Can't store key '%s' (db %d) in RDB version %d", + (char *)key->ptr, dbid, rdbver); + return -1; + } + if (rdbSaveType(rdb, rdbtype) == -1) return -1; if (rdbSaveStringObject(rdb, key) == -1) return -1; - if (rdbSaveObject(rdb, val, key, dbid) == -1) return -1; + if (rdbSaveObject(rdb, val, key, dbid, rdbtype) == -1) return -1; /* Delay return if required (for testing) */ if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay); @@ -1364,7 +1377,7 @@ werr: return -1; } -ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { +ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, int rdbver, long *key_counter) { ssize_t written = 0; ssize_t res; kvstoreIterator *kvs_it = NULL; @@ -1419,7 +1432,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) { initStaticStringObject(key, keystr); expire = objectGetExpire(o); - if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr; + if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid, rdbver)) < 0) goto werr; written += res; /* In fork child process, we can try to release memory back to the @@ -1455,14 +1468,16 @@ werr: * When the function returns C_ERR and if 'error' is not NULL, the * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ -int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { +int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { char magic[10]; uint64_t cksum; long key_counter = 0; int j; if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; - snprintf(magic, sizeof(magic), "VALKEY%03d", RDB_VERSION); + const char *magic_prefix = rdbUseValkeyMagic(rdbver) ? "VALKEY" : "REDIS0"; + serverAssert(rdbver >= 0 && rdbver <= RDB_VERSION); + snprintf(magic, sizeof(magic), "%s%03d", magic_prefix, rdbver); if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr; if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr; if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, VALKEYMODULE_AUX_BEFORE_RDB) == -1) goto werr; @@ -1474,9 +1489,9 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA)) { /* RDB slot import info is encoded in a required opcode since exposing * importing slots is a consistency problem. */ - if (clusterRDBSaveSlotImports(rdb) == C_ERR) goto werr; + if (clusterRDBSaveSlotImports(rdb, rdbver) == C_ERR) goto werr; for (j = 0; j < server.dbnum; j++) { - if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr; + if (rdbSaveDb(rdb, j, rdbflags, rdbver, &key_counter) == -1) goto werr; } } @@ -1506,7 +1521,7 @@ werr: * While the suffix is the 40 bytes hex string we announced in the prefix. * This way processes receiving the payload can understand when it ends * without doing any processing of the content. */ -int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) { +int rdbSaveRioWithEOFMark(int req, int rdbver, rio *rdb, int *error, rdbSaveInfo *rsi) { char eofmark[RDB_EOF_MARK_SIZE]; startSaving(RDBFLAGS_REPLICATION); @@ -1515,7 +1530,7 @@ int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) { if (rioWrite(rdb, "$EOF:", 5) == 0) goto werr; if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr; if (rioWrite(rdb, "\r\n", 2) == 0) goto werr; - if (rdbSaveRio(req, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr; + if (rdbSaveRio(req, rdbver, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr; if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr; stopSaving(1); return C_OK; @@ -1554,7 +1569,7 @@ static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb, 1); } - if (rdbSaveRio(req, &rdb, &error, rdbflags, rsi) == C_ERR) { + if (rdbSaveRio(req, RDB_VERSION, &rdb, &error, rdbflags, rsi) == C_ERR) { errno = error; err_op = "rdbSaveRio"; goto werr; @@ -3670,7 +3685,7 @@ void killRDBChild(void) { /* Spawn an RDB child that writes the RDB to the sockets of the replicas * that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. */ -int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { +int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi) { listNode *ln; listIter li; pid_t childpid; @@ -3726,6 +3741,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { /* Check replica has the exact requirements */ if (replica->repl_data->replica_req != req) continue; + if (replicaRdbVersion(replica) != rdbver) continue; conns[connsnum++] = replica->conn; if (dual_channel) { @@ -3771,7 +3787,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { if (skip_rdb_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; - retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi); + retval = rdbSaveRioWithEOFMark(req, rdbver, &rdb, NULL, rsi); if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR; if (retval == C_OK) { diff --git a/src/rdb.h b/src/rdb.h index 2fed56ca3..1d4abe0a7 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -51,6 +51,13 @@ * string. */ #define RDB_VERSION 80 +/* Mapping between RDB version and the Valkey version where it was added. */ +static const int RDB_VERSION_MAP[][2] = { + /* {RDB version, added in Valkey version} from oldest to newest. */ + {11, 0x070200}, + {80, 0x090000}, +}; + /* Reserved range for foreign (unsupported, non-OSS) RDB format. */ #define RDB_FOREIGN_VERSION_MIN 12 #define RDB_FOREIGN_VERSION_MAX 79 @@ -61,6 +68,10 @@ static inline bool rdbIsForeignVersion(int rdbver) { return rdbver >= RDB_FOREIGN_VERSION_MIN && rdbver <= RDB_FOREIGN_VERSION_MAX; } +static inline bool rdbUseValkeyMagic(int rdbver) { + return rdbver > RDB_FOREIGN_VERSION_MAX; +} + /* Defines related to the dump file format. To store 32 bits lengths for short * keys requires a lot of space, so we check the most significant 2 bits of * the first byte to interpreter the length: @@ -110,13 +121,13 @@ enum RdbType { RDB_TYPE_HASH_ZIPLIST = 13, RDB_TYPE_LIST_QUICKLIST = 14, RDB_TYPE_STREAM_LISTPACKS = 15, - RDB_TYPE_HASH_LISTPACK = 16, + RDB_TYPE_HASH_LISTPACK = 16, /* Added in RDB 10 (7.0) */ RDB_TYPE_ZSET_LISTPACK = 17, RDB_TYPE_LIST_QUICKLIST_2 = 18, RDB_TYPE_STREAM_LISTPACKS_2 = 19, - RDB_TYPE_SET_LISTPACK = 20, + RDB_TYPE_SET_LISTPACK = 20, /* Added in RDB 11 (7.2) */ RDB_TYPE_STREAM_LISTPACKS_3 = 21, - RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration (Valkey 9.0) */ + RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration, RDB 80 (9.0) */ RDB_TYPE_LAST }; /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdb_type_string[] */ @@ -131,7 +142,7 @@ enum RdbType { /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). * These are special RDB types, but they start from 255 and grow down. */ -#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state. */ +#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state (9.0). */ #define RDB_OPCODE_SLOT_INFO 244 /* Foreign slot info, safe to ignore. */ #define RDB_OPCODE_FUNCTION2 245 /* function library data */ #define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */ @@ -184,19 +195,19 @@ ssize_t rdbSaveMillisecondTime(rio *rdb, long long t); long long rdbLoadMillisecondTime(rio *rdb, int rdbver); uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); -int rdbSaveObjectType(rio *rdb, robj *o); +int rdbGetObjectType(robj *o, int rdbver); int rdbLoadObjectType(rio *rdb); int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags); int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags); -int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi); +int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid, int from_signal); int rdbSaveToFile(const char *filename); int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags); -ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid); +ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char type); size_t rdbSavedObjectLen(robj *o, robj *key, int dbid); robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error); void backgroundSaveDoneHandler(int exitcode, int bysignal); -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid); +int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver); ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt); robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename); robj *rdbLoadStringObject(rio *rdb); @@ -210,7 +221,7 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val); int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err); -int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); +int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); ssize_t rdbSaveFunctions(rio *rdb); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); void replicationEmptyDbCallback(hashtable *ht); diff --git a/src/replication.c b/src/replication.c index 403662522..1205545cb 100644 --- a/src/replication.c +++ b/src/replication.c @@ -215,6 +215,31 @@ static inline client *lookupRdbClientByID(uint64_t id) { return c; } +/* Decide which RDB version to send to a replica. */ +int replicaRdbVersion(client *replica) { + if (!(replica->repl_data->replica_capa & REPLICA_CAPA_EOF)) { + /* The replica doesn't have CAPA EOF, so we need to use disk-based + * replication, but we don't want to write an old RDB version to disk, + * because it can be reused for disk-based full sync to other replicas, + * so we force the latest RDB version. */ + return RDB_VERSION; + } + /* Search the version map backwards to select the highest RDB version the + * replica understands. */ + const int n = sizeof(RDB_VERSION_MAP) / sizeof(RDB_VERSION_MAP[0]); + for (int i = n - 1; i >= 0; i--) { + if (replica->repl_data->replica_version >= RDB_VERSION_MAP[i][1]) { + return RDB_VERSION_MAP[i][0]; + } + } + /* Fallback to RDB 11, which was introduced in 7.2. + * + * 7.2 and older don't report their version. If no version was provided, we + * assume it's 7.2. We don't currently produce RDB 10 (7.0) and RDB 9 + * (5.0--6.2). */ + return 11; +} + /* Replication: Primary side - connections association. * During dual channel sync, association is used to keep replication data * in the backlog until the replica requests PSYNC. @@ -945,6 +970,9 @@ need_full_resync: * of the replicas waiting for this BGSAVE, so represents the replica capabilities * all the replicas support. Can be tested via REPLICA_CAPA_* macros. * + * The rdbver argument is the RDB version to use. It should be calculated based + * on what the replicas reported using REPLCONF VERSION. + * * Side effects, other than starting a BGSAVE: * * 1) Handle the replicas in WAIT_START state, by preparing them for a full @@ -955,16 +983,23 @@ need_full_resync: * started. * * Returns C_OK on success or C_ERR otherwise. */ -int startBgsaveForReplication(int mincapa, int req) { +int startBgsaveForReplication(int mincapa, int req, int rdbver) { int retval; int socket_target = 0; listIter li; listNode *ln; - /* We use a socket target if replica can handle the EOF marker and we're configured to do diskless syncs. - * Note that in case we're creating a "filtered" RDB (functions-only, for example) we also force socket replication - * to avoid overwriting the snapshot RDB file with filtered data. */ - socket_target = (server.repl_diskless_sync || req & REPLICA_REQ_RDB_MASK) && (mincapa & REPLICA_CAPA_EOF); + /* We use a socket target if replica can handle the EOF marker and we're + * configured to do diskless syncs. + * + * Note that in case we're creating a "filtered" RDB (functions-only, for + * example) or an older RDB version, we also force socket replication to + * avoid overwriting the snapshot RDB file, which needs to be usable by + * other replicas (not using filtered RDB or older versions) in disk-based + * full sync. */ + socket_target = (mincapa & REPLICA_CAPA_EOF) && (server.repl_diskless_sync || + (req & REPLICA_REQ_RDB_MASK) || + rdbver != RDB_VERSION); /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */ serverAssert(socket_target || !(req & REPLICA_REQ_RDB_MASK)); @@ -978,7 +1013,7 @@ int startBgsaveForReplication(int mincapa, int req) { * otherwise replica will miss repl-stream-db. */ if (rsiptr) { if (socket_target) - retval = rdbSaveToReplicasSockets(req, rsiptr); + retval = rdbSaveToReplicasSockets(req, rdbver, rsiptr); else { /* Keep the page cache since it'll get used soon */ retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE); @@ -1027,6 +1062,7 @@ int startBgsaveForReplication(int mincapa, int req) { if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) { /* Check replica has the exact requirements */ if (replica->repl_data->replica_req != req) continue; + if (replicaRdbVersion(replica) != rdbver) continue; replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset()); } } @@ -1227,7 +1263,9 @@ void syncCommand(client *c) { /* We don't have a BGSAVE in progress, let's start one. Diskless * or disk-based mode is determined by replica's capacity. */ if (!hasActiveChildProcess()) { - startBgsaveForReplication(c->repl_data->replica_capa, c->repl_data->replica_req); + startBgsaveForReplication(c->repl_data->replica_capa, + c->repl_data->replica_req, + replicaRdbVersion(c)); } else { serverLog(LL_NOTICE, "No BGSAVE in progress, but another BG operation is active. " "BGSAVE for replication delayed"); @@ -5325,7 +5363,7 @@ void replicationCron(void) { replication_cron_loops++; /* Incremented with frequency 1 HZ. */ } -int shouldStartChildReplication(int *mincapa_out, int *req_out) { +int shouldStartChildReplication(int *mincapa_out, int *req_out, int *rdbver_out) { /* We should start a BGSAVE good for replication if we have replicas in * WAIT_BGSAVE_START state. * @@ -5336,6 +5374,7 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) { time_t idle, max_idle = 0; int replicas_waiting = 0; int mincapa; + int rdbver; int req; int first = 1; listNode *ln; @@ -5348,7 +5387,9 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) { if (first) { /* Get first replica's requirements */ req = replica->repl_data->replica_req; - } else if (req != replica->repl_data->replica_req) { + rdbver = replicaRdbVersion(replica); + } else if (req != replica->repl_data->replica_req || + rdbver != replicaRdbVersion(replica)) { /* Skip replicas that don't match */ continue; } @@ -5366,6 +5407,7 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) { max_idle >= server.repl_diskless_sync_delay)) { if (mincapa_out) *mincapa_out = mincapa; if (req_out) *req_out = req; + if (rdbver_out) *rdbver_out = rdbver; return 1; } } @@ -5376,12 +5418,13 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) { void replicationStartPendingFork(void) { int mincapa = -1; int req = -1; + int rdbver = -1; - if (shouldStartChildReplication(&mincapa, &req)) { + if (shouldStartChildReplication(&mincapa, &req, &rdbver)) { /* Start the BGSAVE. The called function may start a * BGSAVE with socket target or disk target depending on the * configuration and replicas capabilities and requirements. */ - startBgsaveForReplication(mincapa, req); + startBgsaveForReplication(mincapa, req, rdbver); } } diff --git a/src/server.h b/src/server.h index aba34428d..c98c3080b 100644 --- a/src/server.h +++ b/src/server.h @@ -3116,6 +3116,7 @@ void abortFailover(const char *err); const char *getFailoverStateString(void); sds getReplicaPortString(void); int sendCurrentOffsetToReplica(client *replica); +int replicaRdbVersion(client *replica); void addRdbReplicaToPsyncWait(client *replica); void initClientReplicationData(client *c); void freeClientReplicationData(client *c); diff --git a/tests/integration/cross-version-replication.tcl b/tests/integration/cross-version-replication.tcl index 47329ee5a..0e3de03de 100644 --- a/tests/integration/cross-version-replication.tcl +++ b/tests/integration/cross-version-replication.tcl @@ -1,18 +1,9 @@ # Test replication from an older version primary. # # Use minimal.conf to make sure we don't use any configs not supported on the old version. - -proc server_name_and_version {} { - set server_name [s server_name] - if {$server_name eq {}} { - set server_name redis - } - set server_version [s "${server_name}_version"] - return "$server_name $server_version" -} - start_server {tags {"repl needs:other-server external:skip compatible-redis"} start-other-server 1 config "minimal.conf"} { - set primary_name_and_version [server_name_and_version] + set hello [r hello] + set primary_name_and_version "[dict get $hello server] [dict get $hello version]" r set foo bar start_server {} { @@ -32,3 +23,68 @@ start_server {tags {"repl needs:other-server external:skip compatible-redis"} st } } } + +# Test replication from the current version to an older version replica. +start_server {tags {"repl needs:other-server external:skip"}} { + set primary [srv 0 client] + set primary_host [srv 0 host] + set primary_port [srv 0 port] + $primary config set repl-diskless-sync yes + $primary config set repl-diskless-sync-delay 1 + + # As a side-effect, this first start_server block initializes old_replica_version which + # is used in the tests below. + start_server {start-other-server 1 config "minimal.conf"} { + set hello [r hello] + set old_replica_version [dict get $hello version] + # set replica_name_and_version "[dict get $hello server] $replica_version" + set old_replica [srv 0 client] + start_server {} { + set new_replica [srv 0 client] + test {Keys can be sync'ed by old and new replicas} { + $primary set foo bar + $old_replica replicaof $primary_host $primary_port + $new_replica replicaof $primary_host $primary_port + wait_for_sync $old_replica 500 100 + wait_for_sync $new_replica 500 100 + assert_equal bar [$old_replica get foo] + assert_equal bar [$new_replica get foo] + } + } + } + + test "Old pre-HFE replica can't sync but doesn't prevent new replica from sync" { + if {[version_greater_or_equal $old_replica_version 9.0.0]} { + skip "Replica $old_replica_version does support HFE" + } + r flushall + r hsetex hfe ex 1000 fields 1 field1 value1 + start_server {start-other-server 1 config "minimal.conf"} { + set old_replica [srv 0 client] + start_server {} { + set new_replica [srv 0 client] + $old_replica replicaof $primary_host $primary_port + $new_replica replicaof $primary_host $primary_port + wait_for_sync $new_replica 500 100 + wait_for_log_messages -2 [list {*Can't store key 'hfe'*}] 0 50 100 + assert_equal value1 [$new_replica hget hfe field1] + assert_match {*master_link_status:up*} [$new_replica info replication] + assert_match {*master_link_status:down*} [$old_replica info replication] + } + } + } + + test "Replica with HFE support can full sync" { + if {![version_greater_or_equal $old_replica_version 9.0.0]} { + skip "Replica $old_replica_version doesn't support HFE" + } + r flushall + r hsetex hfe ex 1000 fields 1 field1 value1 + start_server {start-other-server 1 config "minimal.conf"} { + set old_replica [srv 0 client] + $old_replica replicaof $primary_host $primary_port + wait_for_sync $old_replica 500 100 + assert_equal value1 [$old_replica hget hfe field1] + } + } +} diff --git a/tests/support/util.tcl b/tests/support/util.tcl index a0d3e0655..b42befe41 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -1280,3 +1280,22 @@ proc bp {{s {}}} { puts $res } } + +# Compares two version strings. Returns 1 if a >= b, 0 otherwise. +proc version_greater_or_equal {a b} { + regexp {^([0-9]+)\.([0-9]+)\.([0-9]+)$} $a -> a_major a_minor a_patch + regexp {^([0-9]+)\.([0-9]+)\.([0-9]+)$} $b -> b_major b_minor b_patch + if {$a_major < $b_major} { + return 0 + } elseif {$a_major > $b_major} { + return 1 + } elseif {$a_minor < $b_minor} { + return 0 + } elseif {$a_minor > $b_minor} { + return 1 + } elseif {$a_patch < $b_patch} { + return 0 + } else { + return 1 + } +} diff --git a/tests/unit/cluster/cross-version-cluster.tcl b/tests/unit/cluster/cross-version-cluster.tcl index bfc57e10c..10543ef0d 100644 --- a/tests/unit/cluster/cross-version-cluster.tcl +++ b/tests/unit/cluster/cross-version-cluster.tcl @@ -17,9 +17,8 @@ tags {external:skip needs:other-server cluster singledb} { set primary_id [$primary cluster myid] start_server {config "minimal-cluster.conf" start-other-server 1 overrides {cluster-ping-interval 1000}} { - set res [dict get [r hello] version] - assert [regexp {([0-9]+)\.([0-9]+)\.[0-9]+} $res -> major minor] - if {($major < 8) || ($major == 8 && $minor < 1)} { + set version [dict get [r hello] version] + if {![version_greater_or_equal $version 8.1.0]} { skip "Requires Valkey 8.1 or above" } r config set rdb-version-check relaxed