This commit is contained in:
Viktor Söderqvist 2025-12-17 11:49:07 +08:00 committed by GitHub
commit 10b99190f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 221 additions and 70 deletions

View File

@ -2401,7 +2401,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
if (rdbSaveRio(REPLICA_REQ_NONE, RDB_VERSION, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
errno = error;
goto werr;
}

View File

@ -126,8 +126,10 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
/* Serialize the object in an RDB-like format. It consists of an object type
* byte followed by the serialized object. This is understood by RESTORE. */
rioInitWithBuffer(payload, sdsempty());
serverAssert(rdbSaveObjectType(payload, o));
serverAssert(rdbSaveObject(payload, o, key, dbid));
int rdbtype = rdbGetObjectType(o, RDB_VERSION);
serverAssert(rdbtype >= 0);
serverAssert(rdbSaveType(payload, rdbtype));
serverAssert(rdbSaveObject(payload, o, key, dbid, rdbtype));
/* Write the footer, this is what it looks like:
* ----------------+---------------------+---------------+

View File

@ -359,7 +359,7 @@ void fireModuleSlotMigrationEvent(slotMigrationJob *job, int subevent) {
/* Save the active slot imports to the RDB file. The import job name and the
* slot ranges are saved. */
int clusterRDBSaveSlotImports(rio *rdb) {
int clusterRDBSaveSlotImports(rio *rdb, int rdbver) {
if (!server.cluster_enabled) return C_OK;
if (listLength(server.cluster->slot_migration_jobs) == 0) return C_OK;
listNode *ln;
@ -371,6 +371,10 @@ int clusterRDBSaveSlotImports(rio *rdb) {
slotMigrationJob *job = ln->value;
if (isSlotMigrationJobFinished(job)) continue;
if (job->type == SLOT_MIGRATION_EXPORT) continue;
if (rdbver < 80) {
serverLog(LL_WARNING, "Can't store slot migrations in RDB version %d", rdbver);
return C_ERR;
}
if (rdbSaveType(rdb, RDB_OPCODE_SLOT_IMPORT) < 0) return C_ERR;
if (rdbSaveRawString(rdb, (unsigned char *)job->name, CLUSTER_NAMELEN) < 0) return C_ERR;
if (rdbSaveLen(rdb, listLength(job->slot_ranges)) < 0) return C_ERR;

View File

@ -37,7 +37,7 @@ void clusterCleanSlotImportsOnFullSync(void);
void clusterCleanSlotImportsOnPromotion(void);
void clusterCleanSlotImportsBeforeLoad(void);
void clusterCleanSlotImportsAfterLoad(void);
int clusterRDBSaveSlotImports(rio *rdb);
int clusterRDBSaveSlotImports(rio *rdb, int rdbver);
int clusterRDBLoadSlotImport(rio *rdb);
#endif /* __CLUSTER_MIGRATESLOTS_H */

View File

@ -708,43 +708,47 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val) {
return 0;
}
/* Save the object type of object "o". */
int rdbSaveObjectType(rio *rdb, robj *o) {
/* Return the RDB object type to use for saving object "o", or -1 if the object
* can't be represented in the given RDB version (only for older RDB). */
int rdbGetObjectType(robj *o, int rdbver) {
switch (o->type) {
case OBJ_STRING: return rdbSaveType(rdb, RDB_TYPE_STRING);
case OBJ_STRING: return RDB_TYPE_STRING;
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST || o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb, RDB_TYPE_LIST_QUICKLIST_2);
return RDB_TYPE_LIST_QUICKLIST_2;
else
serverPanic("Unknown list encoding");
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
return rdbSaveType(rdb, RDB_TYPE_SET_INTSET);
return RDB_TYPE_SET_INTSET;
else if (o->encoding == OBJ_ENCODING_HASHTABLE)
return rdbSaveType(rdb, RDB_TYPE_SET);
return RDB_TYPE_SET;
else if (o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb, RDB_TYPE_SET_LISTPACK);
return RDB_TYPE_SET_LISTPACK;
else
serverPanic("Unknown set encoding");
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb, RDB_TYPE_ZSET_LISTPACK);
return RDB_TYPE_ZSET_LISTPACK;
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return rdbSaveType(rdb, RDB_TYPE_ZSET_2);
return RDB_TYPE_ZSET_2;
else
serverPanic("Unknown sorted set encoding");
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb, RDB_TYPE_HASH_LISTPACK);
return RDB_TYPE_HASH_LISTPACK;
else if (o->encoding == OBJ_ENCODING_HASHTABLE)
if (hashTypeHasVolatileFields(o))
return rdbSaveType(rdb, RDB_TYPE_HASH_2);
if (rdbver >= 80)
return RDB_TYPE_HASH_2;
else
return -1; /* can't be stored in old RDB */
else
return rdbSaveType(rdb, RDB_TYPE_HASH);
return RDB_TYPE_HASH;
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM: return rdbSaveType(rdb, RDB_TYPE_STREAM_LISTPACKS_3);
case OBJ_MODULE: return rdbSaveType(rdb, RDB_TYPE_MODULE_2);
case OBJ_STREAM: return RDB_TYPE_STREAM_LISTPACKS_3;
case OBJ_MODULE: return RDB_TYPE_MODULE_2;
default: serverPanic("Unknown object type");
}
return -1; /* avoid warning */
@ -861,7 +865,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
/* Save an Object.
* Returns -1 on error, number of bytes written on success. */
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char rdbtype) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
/* Save a string value */
@ -980,6 +984,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_HASHTABLE) {
serverAssert(rdbtype == RDB_TYPE_HASH || rdbtype == RDB_TYPE_HASH_2);
hashtable *ht = o->ptr;
if ((n = rdbSaveLen(rdb, hashtableSize(ht))) == -1) {
@ -987,7 +992,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
}
nwritten += n;
/* check if need to add expired time for the hash fields */
bool add_expiry = hashTypeHasVolatileFields(o);
bool add_expiry = (rdbtype == RDB_TYPE_HASH_2);
hashtableIterator iter;
hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SKIP_VALIDATION);
void *next;
@ -1166,7 +1171,9 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
* this length with very little changes to the code. In the future
* we could switch to a faster solution. */
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) {
ssize_t len = rdbSaveObject(NULL, o, key, dbid);
int rdbtype = rdbGetObjectType(o, RDB_VERSION);
serverAssert(rdbtype != -1);
ssize_t len = rdbSaveObject(NULL, o, key, dbid, rdbtype);
serverAssertWithInfo(NULL, o, len != -1);
return len;
}
@ -1174,7 +1181,7 @@ size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) {
/* Save a key-value pair, with expire time, type, key, value.
* On error -1 is returned.
* On success if the key was actually saved 1 is returned. */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) {
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
@ -1203,9 +1210,15 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, in
}
/* Save type, key, value */
if (rdbSaveObjectType(rdb, val) == -1) return -1;
int rdbtype = rdbGetObjectType(val, rdbver);
if (rdbtype == -1) {
serverLog(LL_WARNING, "Can't store key '%s' (db %d) in RDB version %d",
(char *)key->ptr, dbid, rdbver);
return -1;
}
if (rdbSaveType(rdb, rdbtype) == -1) return -1;
if (rdbSaveStringObject(rdb, key) == -1) return -1;
if (rdbSaveObject(rdb, val, key, dbid) == -1) return -1;
if (rdbSaveObject(rdb, val, key, dbid, rdbtype) == -1) return -1;
/* Delay return if required (for testing) */
if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay);
@ -1364,7 +1377,7 @@ werr:
return -1;
}
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, int rdbver, long *key_counter) {
ssize_t written = 0;
ssize_t res;
kvstoreIterator *kvs_it = NULL;
@ -1419,7 +1432,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
initStaticStringObject(key, keystr);
expire = objectGetExpire(o);
if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid, rdbver)) < 0) goto werr;
written += res;
/* In fork child process, we can try to release memory back to the
@ -1455,14 +1468,16 @@ werr:
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
char magic[10];
uint64_t cksum;
long key_counter = 0;
int j;
if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic, sizeof(magic), "VALKEY%03d", RDB_VERSION);
const char *magic_prefix = rdbUseValkeyMagic(rdbver) ? "VALKEY" : "REDIS0";
serverAssert(rdbver >= 0 && rdbver <= RDB_VERSION);
snprintf(magic, sizeof(magic), "%s%03d", magic_prefix, rdbver);
if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr;
if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, VALKEYMODULE_AUX_BEFORE_RDB) == -1) goto werr;
@ -1474,9 +1489,9 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA)) {
/* RDB slot import info is encoded in a required opcode since exposing
* importing slots is a consistency problem. */
if (clusterRDBSaveSlotImports(rdb) == C_ERR) goto werr;
if (clusterRDBSaveSlotImports(rdb, rdbver) == C_ERR) goto werr;
for (j = 0; j < server.dbnum; j++) {
if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
if (rdbSaveDb(rdb, j, rdbflags, rdbver, &key_counter) == -1) goto werr;
}
}
@ -1506,7 +1521,7 @@ werr:
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
int rdbSaveRioWithEOFMark(int req, int rdbver, rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
startSaving(RDBFLAGS_REPLICATION);
@ -1515,7 +1530,7 @@ int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
if (rioWrite(rdb, "$EOF:", 5) == 0) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb, "\r\n", 2) == 0) goto werr;
if (rdbSaveRio(req, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr;
if (rdbSaveRio(req, rdbver, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
return C_OK;
@ -1554,7 +1569,7 @@ static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int
if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb, 1);
}
if (rdbSaveRio(req, &rdb, &error, rdbflags, rsi) == C_ERR) {
if (rdbSaveRio(req, RDB_VERSION, &rdb, &error, rdbflags, rsi) == C_ERR) {
errno = error;
err_op = "rdbSaveRio";
goto werr;
@ -3670,7 +3685,7 @@ void killRDBChild(void) {
/* Spawn an RDB child that writes the RDB to the sockets of the replicas
* that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
pid_t childpid;
@ -3726,6 +3741,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) {
/* Check replica has the exact requirements */
if (replica->repl_data->replica_req != req) continue;
if (replicaRdbVersion(replica) != rdbver) continue;
conns[connsnum++] = replica->conn;
if (dual_channel) {
@ -3771,7 +3787,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
if (skip_rdb_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
retval = rdbSaveRioWithEOFMark(req, rdbver, &rdb, NULL, rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;
if (retval == C_OK) {

View File

@ -51,6 +51,13 @@
* string. */
#define RDB_VERSION 80
/* Mapping between RDB version and the Valkey version where it was added.
 *
 * Each row is {RDB version, first server version that understands it}. The
 * server version looks like it is packed as 0xMMmmpp (0x070200 for 7.2.0,
 * 0x090000 for 9.0.0) -- NOTE(review): confirm this matches the encoding of
 * the replica version it is compared against.
 *
 * replicaRdbVersion() scans this table from the last row to the first and
 * picks the first row whose server version is not newer than the replica, so
 * rows must stay sorted from oldest to newest. */
static const int RDB_VERSION_MAP[][2] = {
/* {RDB version, added in Valkey version} from oldest to newest. */
{11, 0x070200},
{80, 0x090000},
};
/* Reserved range for foreign (unsupported, non-OSS) RDB format. */
#define RDB_FOREIGN_VERSION_MIN 12
#define RDB_FOREIGN_VERSION_MAX 79
@ -61,6 +68,10 @@ static inline bool rdbIsForeignVersion(int rdbver) {
return rdbver >= RDB_FOREIGN_VERSION_MIN && rdbver <= RDB_FOREIGN_VERSION_MAX;
}
/* Tells whether an RDB of version 'rdbver' lies above the reserved foreign
 * version range. rdbSaveRio() writes the "VALKEY" magic prefix when this
 * returns true and the "REDIS0" prefix otherwise. */
static inline bool rdbUseValkeyMagic(int rdbver) {
    const bool above_foreign_range = RDB_FOREIGN_VERSION_MAX < rdbver;
    return above_foreign_range;
}
/* Defines related to the dump file format. To store 32 bits lengths for short
* keys requires a lot of space, so we check the most significant 2 bits of
* the first byte to interpret the length:
@ -110,13 +121,13 @@ enum RdbType {
RDB_TYPE_HASH_ZIPLIST = 13,
RDB_TYPE_LIST_QUICKLIST = 14,
RDB_TYPE_STREAM_LISTPACKS = 15,
RDB_TYPE_HASH_LISTPACK = 16,
RDB_TYPE_HASH_LISTPACK = 16, /* Added in RDB 10 (7.0) */
RDB_TYPE_ZSET_LISTPACK = 17,
RDB_TYPE_LIST_QUICKLIST_2 = 18,
RDB_TYPE_STREAM_LISTPACKS_2 = 19,
RDB_TYPE_SET_LISTPACK = 20,
RDB_TYPE_SET_LISTPACK = 20, /* Added in RDB 11 (7.2) */
RDB_TYPE_STREAM_LISTPACKS_3 = 21,
RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration (Valkey 9.0) */
RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration, RDB 80 (9.0) */
RDB_TYPE_LAST
};
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdb_type_string[] */
@ -131,7 +142,7 @@ enum RdbType {
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType).
* These are special RDB types, but they start from 255 and grow down. */
#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state. */
#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state (9.0). */
#define RDB_OPCODE_SLOT_INFO 244 /* Foreign slot info, safe to ignore. */
#define RDB_OPCODE_FUNCTION2 245 /* function library data */
#define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */
@ -184,19 +195,19 @@ ssize_t rdbSaveMillisecondTime(rio *rdb, long long t);
long long rdbLoadMillisecondTime(rio *rdb, int rdbver);
uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbGetObjectType(robj *o, int rdbver);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi);
int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid, int from_signal);
int rdbSaveToFile(const char *filename);
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char type);
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver);
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename);
robj *rdbLoadStringObject(rio *rdb);
@ -210,7 +221,7 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err);
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
ssize_t rdbSaveFunctions(rio *rdb);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
void replicationEmptyDbCallback(hashtable *ht);

View File

@ -215,6 +215,31 @@ static inline client *lookupRdbClientByID(uint64_t id) {
return c;
}
/* Select the RDB version to produce for a full sync towards 'replica'.
 *
 * Returns RDB_VERSION for replicas lacking REPLICA_CAPA_EOF, otherwise the
 * newest entry of RDB_VERSION_MAP whose server version is not greater than
 * the version reported by the replica, or 11 when no entry qualifies. */
int replicaRdbVersion(client *replica) {
    const int has_eof_capa = (replica->repl_data->replica_capa & REPLICA_CAPA_EOF) != 0;
    if (!has_eof_capa) {
        /* Without CAPA EOF the replica must be served by disk-based
         * replication. The file written to disk can be reused for disk-based
         * full syncs to other replicas, so an old RDB version must never end
         * up there: always pick the latest one. */
        return RDB_VERSION;
    }

    /* Walk the version map from its newest entry down to its oldest one and
     * stop at the first RDB version the replica is able to read. */
    const int entries = sizeof(RDB_VERSION_MAP) / sizeof(RDB_VERSION_MAP[0]);
    int idx = entries;
    while (idx-- > 0) {
        if (RDB_VERSION_MAP[idx][1] <= replica->repl_data->replica_version) {
            return RDB_VERSION_MAP[idx][0];
        }
    }

    /* Nothing matched: fall back to RDB 11, which was introduced in 7.2.
     *
     * 7.2 and older don't report their version, so a replica that provided
     * no version is assumed to be 7.2. RDB 10 (7.0) and RDB 9 (5.0--6.2) are
     * not produced currently. */
    return 11;
}
/* Replication: Primary side - connections association.
* During dual channel sync, association is used to keep replication data
* in the backlog until the replica requests PSYNC.
@ -945,6 +970,9 @@ need_full_resync:
* of the replicas waiting for this BGSAVE, so represents the replica capabilities
* all the replicas support. Can be tested via REPLICA_CAPA_* macros.
*
* The rdbver argument is the RDB version to use. It should be calculated based
* on what the replicas reported using REPLCONF VERSION.
*
* Side effects, other than starting a BGSAVE:
*
* 1) Handle the replicas in WAIT_START state, by preparing them for a full
@ -955,16 +983,23 @@ need_full_resync:
* started.
*
* Returns C_OK on success or C_ERR otherwise. */
int startBgsaveForReplication(int mincapa, int req) {
int startBgsaveForReplication(int mincapa, int req, int rdbver) {
int retval;
int socket_target = 0;
listIter li;
listNode *ln;
/* We use a socket target if replica can handle the EOF marker and we're configured to do diskless syncs.
* Note that in case we're creating a "filtered" RDB (functions-only, for example) we also force socket replication
* to avoid overwriting the snapshot RDB file with filtered data. */
socket_target = (server.repl_diskless_sync || req & REPLICA_REQ_RDB_MASK) && (mincapa & REPLICA_CAPA_EOF);
/* We use a socket target if replica can handle the EOF marker and we're
* configured to do diskless syncs.
*
* Note that in case we're creating a "filtered" RDB (functions-only, for
* example) or an older RDB version, we also force socket replication to
* avoid overwriting the snapshot RDB file, which needs to be usable by
* other replicas (not using filtered RDB or older versions) in disk-based
* full sync. */
socket_target = (mincapa & REPLICA_CAPA_EOF) && (server.repl_diskless_sync ||
(req & REPLICA_REQ_RDB_MASK) ||
rdbver != RDB_VERSION);
/* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */
serverAssert(socket_target || !(req & REPLICA_REQ_RDB_MASK));
@ -978,7 +1013,7 @@ int startBgsaveForReplication(int mincapa, int req) {
* otherwise replica will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
retval = rdbSaveToReplicasSockets(req, rsiptr);
retval = rdbSaveToReplicasSockets(req, rdbver, rsiptr);
else {
/* Keep the page cache since it'll get used soon */
retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE);
@ -1027,6 +1062,7 @@ int startBgsaveForReplication(int mincapa, int req) {
if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) {
/* Check replica has the exact requirements */
if (replica->repl_data->replica_req != req) continue;
if (replicaRdbVersion(replica) != rdbver) continue;
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}
}
@ -1227,7 +1263,9 @@ void syncCommand(client *c) {
/* We don't have a BGSAVE in progress, let's start one. Diskless
* or disk-based mode is determined by replica's capacity. */
if (!hasActiveChildProcess()) {
startBgsaveForReplication(c->repl_data->replica_capa, c->repl_data->replica_req);
startBgsaveForReplication(c->repl_data->replica_capa,
c->repl_data->replica_req,
replicaRdbVersion(c));
} else {
serverLog(LL_NOTICE, "No BGSAVE in progress, but another BG operation is active. "
"BGSAVE for replication delayed");
@ -5325,7 +5363,7 @@ void replicationCron(void) {
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
int shouldStartChildReplication(int *mincapa_out, int *req_out) {
int shouldStartChildReplication(int *mincapa_out, int *req_out, int *rdbver_out) {
/* We should start a BGSAVE good for replication if we have replicas in
* WAIT_BGSAVE_START state.
*
@ -5336,6 +5374,7 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) {
time_t idle, max_idle = 0;
int replicas_waiting = 0;
int mincapa;
int rdbver;
int req;
int first = 1;
listNode *ln;
@ -5348,7 +5387,9 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) {
if (first) {
/* Get first replica's requirements */
req = replica->repl_data->replica_req;
} else if (req != replica->repl_data->replica_req) {
rdbver = replicaRdbVersion(replica);
} else if (req != replica->repl_data->replica_req ||
rdbver != replicaRdbVersion(replica)) {
/* Skip replicas that don't match */
continue;
}
@ -5366,6 +5407,7 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) {
max_idle >= server.repl_diskless_sync_delay)) {
if (mincapa_out) *mincapa_out = mincapa;
if (req_out) *req_out = req;
if (rdbver_out) *rdbver_out = rdbver;
return 1;
}
}
@ -5376,12 +5418,13 @@ int shouldStartChildReplication(int *mincapa_out, int *req_out) {
void replicationStartPendingFork(void) {
int mincapa = -1;
int req = -1;
int rdbver = -1;
if (shouldStartChildReplication(&mincapa, &req)) {
if (shouldStartChildReplication(&mincapa, &req, &rdbver)) {
/* Start the BGSAVE. The called function may start a
* BGSAVE with socket target or disk target depending on the
* configuration and replicas capabilities and requirements. */
startBgsaveForReplication(mincapa, req);
startBgsaveForReplication(mincapa, req, rdbver);
}
}

View File

@ -3116,6 +3116,7 @@ void abortFailover(const char *err);
const char *getFailoverStateString(void);
sds getReplicaPortString(void);
int sendCurrentOffsetToReplica(client *replica);
int replicaRdbVersion(client *replica);
void addRdbReplicaToPsyncWait(client *replica);
void initClientReplicationData(client *c);
void freeClientReplicationData(client *c);

View File

@ -1,18 +1,9 @@
# Test replication from an older version primary.
#
# Use minimal.conf to make sure we don't use any configs not supported on the old version.
proc server_name_and_version {} {
set server_name [s server_name]
if {$server_name eq {}} {
set server_name redis
}
set server_version [s "${server_name}_version"]
return "$server_name $server_version"
}
start_server {tags {"repl needs:other-server external:skip compatible-redis"} start-other-server 1 config "minimal.conf"} {
set primary_name_and_version [server_name_and_version]
set hello [r hello]
set primary_name_and_version "[dict get $hello server] [dict get $hello version]"
r set foo bar
start_server {} {
@ -32,3 +23,68 @@ start_server {tags {"repl needs:other-server external:skip compatible-redis"} st
}
}
}
# Test replication from the current version to an older version replica.
#
# The outermost server is the primary, running the current version. Servers
# started with "start-other-server 1" are the older-version replicas, and
# servers started with an empty option list are current-version replicas.
start_server {tags {"repl needs:other-server external:skip"}} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
# Use diskless sync with a one second delay before the transfer starts.
# NOTE(review): presumably the delay lets both replicas attach before the
# primary forks -- confirm.
$primary config set repl-diskless-sync yes
$primary config set repl-diskless-sync-delay 1
# As a side-effect, this first start_server block initializes old_replica_version which
# is used in the tests below.
start_server {start-other-server 1 config "minimal.conf"} {
set hello [r hello]
set old_replica_version [dict get $hello version]
set old_replica [srv 0 client]
start_server {} {
set new_replica [srv 0 client]
# Baseline: a plain string key must reach both the old and the new replica.
test {Keys can be sync'ed by old and new replicas} {
$primary set foo bar
$old_replica replicaof $primary_host $primary_port
$new_replica replicaof $primary_host $primary_port
wait_for_sync $old_replica 500 100
wait_for_sync $new_replica 500 100
assert_equal bar [$old_replica get foo]
assert_equal bar [$new_replica get foo]
}
}
}
# Only meaningful when the other server predates 9.0.0: a hash with a field
# expiry is expected to make the sync to the old replica fail, while the
# current-version replica still completes its own sync.
test "Old pre-HFE replica can't sync but doesn't prevent new replica from sync" {
if {[version_greater_or_equal $old_replica_version 9.0.0]} {
skip "Replica $old_replica_version does support HFE"
}
r flushall
r hsetex hfe ex 1000 fields 1 field1 value1
start_server {start-other-server 1 config "minimal.conf"} {
set old_replica [srv 0 client]
start_server {} {
set new_replica [srv 0 client]
$old_replica replicaof $primary_host $primary_port
$new_replica replicaof $primary_host $primary_port
wait_for_sync $new_replica 500 100
# Level -2 is two servers up the start_server stack, presumably the primary,
# which is where the warning about the unstorable key is expected.
wait_for_log_messages -2 [list {*Can't store key 'hfe'*}] 0 50 100
assert_equal value1 [$new_replica hget hfe field1]
assert_match {*master_link_status:up*} [$new_replica info replication]
assert_match {*master_link_status:down*} [$old_replica info replication]
}
}
}
# Counterpart of the previous test: it runs only when the other server is
# 9.0.0 or newer, in which case the hash with a field expiry must sync.
test "Replica with HFE support can full sync" {
if {![version_greater_or_equal $old_replica_version 9.0.0]} {
skip "Replica $old_replica_version doesn't support HFE"
}
r flushall
r hsetex hfe ex 1000 fields 1 field1 value1
start_server {start-other-server 1 config "minimal.conf"} {
set old_replica [srv 0 client]
$old_replica replicaof $primary_host $primary_port
wait_for_sync $old_replica 500 100
assert_equal value1 [$old_replica hget hfe field1]
}
}
}

View File

@ -1280,3 +1280,22 @@ proc bp {{s {}}} {
puts $res
}
}
# Compares two version strings of the form "major.minor.patch".
#
# Returns 1 if a >= b, 0 otherwise. Components are compared numerically, most
# significant first. Raises an error naming the offending argument when either
# string is not made of exactly three dot-separated decimal numbers, instead
# of failing later on an unset variable.
proc version_greater_or_equal {a b} {
    set pattern {^([0-9]+)\.([0-9]+)\.([0-9]+)$}
    if {![regexp $pattern $a -> a_major a_minor a_patch]} {
        error "version_greater_or_equal: invalid version string '$a'"
    }
    if {![regexp $pattern $b -> b_major b_minor b_patch]} {
        error "version_greater_or_equal: invalid version string '$b'"
    }
    foreach x [list $a_major $a_minor $a_patch] y [list $b_major $b_minor $b_patch] {
        # Force decimal interpretation so that a component written with a
        # leading zero is not treated as octal or compared as a string.
        set x [scan $x %d]
        set y [scan $y %d]
        if {$x < $y} {
            return 0
        }
        if {$x > $y} {
            return 1
        }
    }
    # All three components are equal.
    return 1
}

View File

@ -17,9 +17,8 @@ tags {external:skip needs:other-server cluster singledb} {
set primary_id [$primary cluster myid]
start_server {config "minimal-cluster.conf" start-other-server 1 overrides {cluster-ping-interval 1000}} {
set res [dict get [r hello] version]
assert [regexp {([0-9]+)\.([0-9]+)\.[0-9]+} $res -> major minor]
if {($major < 8) || ($major == 8 && $minor < 1)} {
set version [dict get [r hello] version]
if {![version_greater_or_equal $version 8.1.0]} {
skip "Requires Valkey 8.1 or above"
}
r config set rdb-version-check relaxed