Perform data cleanup during RDB load on successful version/signature validation (#2600)

Addresses: https://github.com/valkey-io/valkey/issues/2588

## Overview
Previously we call `emptyData()` during a fullSync before validating the
RDB version is compatible.

This change adds an rdb flag that allows us to flush the database from
within `rdbLoadRioWithLoadingCtx`. THhis provides the option to only
flush the data if the rdb has a valid version and signature. In the case
where we do have an invalid version and signature, we don't emptyData,
so if a full sync fails for that reason a replica can still serve stale
data instead of clients experiencing cache misses.

## Changes
- Added a new flag `RDBFLAGS_EMPTY_DATA` that signals to flush the
database after rdb validation
- Added logic to call `emptyData` in `rdbLoadRioWithLoadingCtx` in
`rdb.c`
- Added logic to not clear data if the RDB validation fails in
`replication.c` using new return type `RDB_INCOMPATIBLE`
- Modified the signature of `rdbLoadRioWithLoadingCtx` to return RDB
success codes and updated all calling sites.

## Testing
Added a tcl test that uses the debug command `reload nosave` to load
from an RDB that has a future version number. This triggers the same
code path that full sync's will use, and verifies that we don't flush
the data until after the validation is complete.

A test already exists that checks that the data is flushed:
https://github.com/valkey-io/valkey/blob/unstable/tests/integration/replication.tcl#L1504

---------

Signed-off-by: Venkat Pamulapati <pamuvenk@amazon.com>
Signed-off-by: Venkat Pamulapati <33398322+ChiliPaneer@users.noreply.github.com>
Co-authored-by: Venkat Pamulapati <pamuvenk@amazon.com>
Co-authored-by: Harkrishn Patro <bunty.hari@gmail.com>
This commit is contained in:
Venkat Pamulapati 2025-11-18 17:08:10 -08:00 committed by GitHub
parent 57892663be
commit 3c3a1966ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 97 additions and 44 deletions

View File

@ -1459,7 +1459,7 @@ int loadSingleAppendOnlyFile(char *filename) {
if (server.repl_backlog == NULL) createReplicationBacklog(); if (server.repl_backlog == NULL) createReplicationBacklog();
rdb_flags |= RDBFLAGS_FEED_REPL; rdb_flags |= RDBFLAGS_FEED_REPL;
} }
if (rdbLoadRio(&rdb, rdb_flags, &rsi) != C_OK) { if (rdbLoadRio(&rdb, rdb_flags, &rsi) != RDB_OK) {
if (old_style) if (old_style)
serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted", serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted",
filename); filename);

View File

@ -586,7 +586,7 @@ void debugCommand(client *c) {
/* The default behavior is to remove the current dataset from /* The default behavior is to remove the current dataset from
* memory before loading the RDB file, however when MERGE is * memory before loading the RDB file, however when MERGE is
* used together with NOFLUSH, we are able to merge two datasets. */ * used together with NOFLUSH, we are able to merge two datasets. */
if (flush) emptyData(-1, EMPTYDB_NO_FLAGS, NULL); if (flush) flags |= RDBFLAGS_EMPTY_DATA;
protectClient(c); protectClient(c);
int ret = rdbLoad(server.rdb_filename, NULL, flags); int ret = rdbLoad(server.rdb_filename, NULL, flags);

View File

@ -13469,8 +13469,6 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
* will prevent COW memory issue. */ * will prevent COW memory issue. */
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild(); if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();
emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
/* rdbLoad() can go back to the networking and process network events. If /* rdbLoad() can go back to the networking and process network events. If
* VM_RdbLoad() is called inside a command callback, we don't want to * VM_RdbLoad() is called inside a command callback, we don't want to
* process the current client. Otherwise, we may free the client or try to * process the current client. Otherwise, we may free the client or try to
@ -13478,7 +13476,7 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
if (server.current_client) protectClient(server.current_client); if (server.current_client) protectClient(server.current_client);
serverAssert(stream->type == VALKEYMODULE_RDB_STREAM_FILE); serverAssert(stream->type == VALKEYMODULE_RDB_STREAM_FILE);
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_NONE); int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_EMPTY_DATA);
if (server.current_client) unprotectClient(server.current_client); if (server.current_client) unprotectClient(server.current_client);

View File

@ -75,6 +75,7 @@ extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...); void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
void replicationEmptyDbCallback(hashtable *ht);
/* Returns true if the RDB version is valid and accepted, false otherwise. This /* Returns true if the RDB version is valid and accepted, false otherwise. This
* function takes configuration into account. The parameter `is_valkey_magic` * function takes configuration into account. The parameter `is_valkey_magic`
@ -3082,8 +3083,10 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi,
return retval; return retval;
} }
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, /* Load an RDB file from the rio stream 'rdb'. We return one of the following:
* otherwise C_ERR is returned. * - RDB_OK On success
* - RDB_INCOMPATIBLE If the RDB has an invalid signature or version
* - RDB_FAILED in all other failure cases
* The rdb_loading_ctx argument holds objects to which the rdb will be loaded to, * The rdb_loading_ctx argument holds objects to which the rdb will be loaded to,
* currently it only allow to set db object and functionLibCtx to which the data * currently it only allow to set db object and functionLibCtx to which the data
* will be loaded (in the future it might contains more such objects). */ * will be loaded (in the future it might contains more such objects). */
@ -3092,10 +3095,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
int type, rdbver; int type, rdbver;
uint64_t db_size = 0, expires_size = 0; uint64_t db_size = 0, expires_size = 0;
int should_expand_db = 0; int should_expand_db = 0;
if (rdb_loading_ctx->dbarray[0] == NULL) {
rdb_loading_ctx->dbarray[0] = createDatabase(0);
}
serverDb *db = rdb_loading_ctx->dbarray[0];
char buf[1024]; char buf[1024];
int error; int error;
long long empty_keys_skipped = 0; long long empty_keys_skipped = 0;
@ -3111,14 +3110,30 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
is_valkey_magic = true; is_valkey_magic = true;
} else { } else {
serverLog(LL_WARNING, "Wrong signature trying to load DB from file: %.9s", buf); serverLog(LL_WARNING, "Wrong signature trying to load DB from file: %.9s", buf);
return C_ERR; /* Signal to terminate the rdbLoad without clearing existing data */
return RDB_INCOMPATIBLE;
} }
rdbver = atoi(buf + 6); rdbver = atoi(buf + 6);
if (!rdbIsVersionAccepted(rdbver, is_valkey_magic, is_redis_magic)) { if (!rdbIsVersionAccepted(rdbver, is_valkey_magic, is_redis_magic)) {
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver); serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
return C_ERR; return RDB_INCOMPATIBLE;
} }
/* Only empty data if RDBFLAGS_EMPTY_DATA is set */
if (rdbflags & RDBFLAGS_EMPTY_DATA) {
int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
serverLog(LL_NOTICE, "RDB signature and version check passed. Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
/* functionsLibCtx is cleared when we call emptyData, reinitialize here. */
rdb_loading_ctx->functions_lib_ctx = functionsLibCtxGetCurrent();
}
if (rdb_loading_ctx->dbarray[0] == NULL) {
rdb_loading_ctx->dbarray[0] = createDatabase(0);
}
serverDb *db = rdb_loading_ctx->dbarray[0];
/* Key-specific attributes, set by opcodes before the key type. */ /* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
long long lru_clock = LRU_CLOCK(); long long lru_clock = LRU_CLOCK();
@ -3134,7 +3149,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (is_redis_magic && type >= RDB_FOREIGN_TYPE_MIN && type <= RDB_FOREIGN_TYPE_MAX) { if (is_redis_magic && type >= RDB_FOREIGN_TYPE_MIN && type <= RDB_FOREIGN_TYPE_MAX) {
serverLog(LL_WARNING, "Can't handle foreign type or opcode %d in RDB with version %d", serverLog(LL_WARNING, "Can't handle foreign type or opcode %d in RDB with version %d",
type, rdbver); type, rdbver);
return C_ERR; return RDB_FAILED;
} }
/* Handle special types. */ /* Handle special types. */
@ -3418,7 +3433,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
} else if (error == RDB_LOAD_ERR_UNKNOWN_TYPE) { } else if (error == RDB_LOAD_ERR_UNKNOWN_TYPE) {
sdsfree(key); sdsfree(key);
serverLog(LL_WARNING, "Unknown type or opcode when loading DB. Unrecoverable error, aborting now."); serverLog(LL_WARNING, "Unknown type or opcode when loading DB. Unrecoverable error, aborting now.");
return C_ERR; return RDB_FAILED;
} else { } else {
sdsfree(key); sdsfree(key);
goto eoferr; goto eoferr;
@ -3502,7 +3517,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
"got (%llx). Aborting now.", "got (%llx). Aborting now.",
(unsigned long long)expected, (unsigned long long)cksum); (unsigned long long)expected, (unsigned long long)cksum);
rdbReportCorruptRDB("RDB CRC error"); rdbReportCorruptRDB("RDB CRC error");
return C_ERR; return RDB_FAILED;
} }
} }
} }
@ -3514,7 +3529,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
serverLog(LL_NOTICE, "Done loading RDB, keys loaded: %lld, keys expired: %lld.", serverLog(LL_NOTICE, "Done loading RDB, keys loaded: %lld, keys expired: %lld.",
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired); server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
} }
return C_OK; return RDB_OK;
/* Unexpected end of file is handled here calling rdbReportReadError(): /* Unexpected end of file is handled here calling rdbReportReadError():
* this will in turn either abort the server in most cases, or if we are loading * this will in turn either abort the server in most cases, or if we are loading
@ -3523,7 +3538,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
eoferr: eoferr:
serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now."); serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
rdbReportReadError("Unexpected EOF reading RDB file"); rdbReportReadError("Unexpected EOF reading RDB file");
return C_ERR; return RDB_FAILED;
} }
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The /* Like rdbLoadRio() but takes a filename instead of a rio stream. The
@ -3556,14 +3571,15 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
retval = rdbLoadRio(&rdb, rdbflags, rsi); retval = rdbLoadRio(&rdb, rdbflags, rsi);
fclose(fp); fclose(fp);
stopLoading(retval == C_OK); stopLoading(retval == RDB_OK);
/* Reclaim the cache backed by rdb */ /* Reclaim the cache backed by rdb */
if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) { if (retval == RDB_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
/* TODO: maybe we could combine the fopen and open into one in the future */ /* TODO: maybe we could combine the fopen and open into one in the future */
rdb_fd = open(filename, O_RDONLY); rdb_fd = open(filename, O_RDONLY);
if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1); if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1);
} }
return (retval == C_OK) ? RDB_OK : RDB_FAILED;
return retval;
} }
/* A background saving child (BGSAVE) terminated its work. Handle this. /* A background saving child (BGSAVE) terminated its work. Handle this.

View File

@ -166,6 +166,7 @@ enum RdbType {
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/ #define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/ #define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */ #define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
#define RDBFLAGS_EMPTY_DATA (1 << 5) /* Flush the database after validating magic and rdb version*/
/* When rdbLoadObject() returns NULL, the err flag is /* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */ * set to hold the type of error that occurred */
@ -213,5 +214,6 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
ssize_t rdbSaveFunctions(rio *rdb); ssize_t rdbSaveFunctions(rio *rdb);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble); int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble);
void replicationEmptyDbCallback(hashtable *ht);
#endif #endif

View File

@ -2069,8 +2069,8 @@ void replicationSendNewlineToPrimary(void) {
/* Callback used by emptyData() while flushing away old data to load /* Callback used by emptyData() while flushing away old data to load
* the new dataset received by the primary and by discardTempDb() * the new dataset received by the primary and by discardTempDb()
* after loading succeeded or failed. */ * after loading succeeded or failed. */
void replicationEmptyDbCallback(hashtable *d) { void replicationEmptyDbCallback(hashtable *ht) {
UNUSED(d); UNUSED(ht);
if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary(); if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary();
} }
@ -2366,11 +2366,6 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
/* We will soon start loading the RDB from socket, the replication history is changed, /* We will soon start loading the RDB from socket, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */ * we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary(); replicationAttachToNewPrimary();
/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
dbarray = server.db; dbarray = server.db;
functions_lib_ctx = functionsLibCtxGetCurrent(); functions_lib_ctx = functionsLibCtxGetCurrent();
} }
@ -2387,7 +2382,11 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
int loadingFailed = 0; int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, rsi, &loadingCtx) != C_OK) { /* If we aren't using the swapdb method, then we want to empty the data before loading the rdb */
int flags = RDBFLAGS_REPLICATION;
if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) flags |= RDBFLAGS_EMPTY_DATA;
int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, flags, rsi, &loadingCtx);
if (retval != RDB_OK) {
/* RDB loading failed. */ /* RDB loading failed. */
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
"from socket, check server logs."); "from socket, check server logs.");
@ -2413,9 +2412,14 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx); disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx);
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background"); serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
} else { } else {
/* Remove the half-loaded data in case we started with an empty replica. */ /* If we received RDB_INCOMPATIBLE, the old data was preserved */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); if (retval == RDB_INCOMPATIBLE) {
emptyData(-1, empty_db_flags, replicationEmptyDbCallback); serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB version or signature incompatible, old data preserved");
} else {
/* Remove the half-loaded data in case the load failed for other reasons. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}
} }
/* Note that there's no point in restarting the AOF on SYNC /* Note that there's no point in restarting the AOF on SYNC
@ -2496,14 +2500,12 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
* we must discard the cached primary structure and force resync of sub-replicas. */ * we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary(); replicationAttachToNewPrimary();
/* Empty the databases only after the RDB file is ok, that is, before the RDB file /* We pass RDBFLAGS_EMPTY_DATA to call emptyData() after validating rdb compatibility
* is actually loaded, in case we encounter an error and drop the replication stream * and before loading the data from the RDB */
* and leave an empty database. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
if (rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION) != RDB_OK) { int retval = rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION | RDBFLAGS_EMPTY_DATA);
if (retval != RDB_OK) {
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
"DB from disk, check server logs."); "DB from disk, check server logs.");
if (server.rdb_del_sync_files && allPersistenceDisabled()) { if (server.rdb_del_sync_files && allPersistenceDisabled()) {
@ -2513,9 +2515,15 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
bg_unlink(server.rdb_filename); bg_unlink(server.rdb_filename);
} }
/* If disk-based RDB loading fails, remove the half-loaded dataset. */ /* If RDB failed compatibility check, we did not load the new data set or flush our old data. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); if (retval == RDB_INCOMPATIBLE) {
emptyData(-1, empty_db_flags, replicationEmptyDbCallback); serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Skipping flush, no new data was loaded.");
} else {
/* If disk-based RDB loading fails, remove the half-loaded dataset. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}
/* Note that there's no point in restarting the AOF on sync failure, /* Note that there's no point in restarting the AOF on sync failure,
* it'll be restarted when sync succeeds or replica promoted. */ * it'll be restarted when sync succeeds or replica promoted. */

View File

@ -347,8 +347,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
/* RDB return values for rdbLoad. */ /* RDB return values for rdbLoad. */
#define RDB_OK 0 #define RDB_OK 0
#define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */ #define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */
#define RDB_FAILED 2 /* Failed to load the RDB file. */ #define RDB_INCOMPATIBLE 2 /* RDB version or signature is not compatible */
#define RDB_FAILED 3 /* Failed to load the RDB file. */
/* Command doc flags */ /* Command doc flags */
#define CMD_DOC_NONE 0 #define CMD_DOC_NONE 0

View File

@ -567,4 +567,32 @@ start_server {} {
} {OK} } {OK}
} }
start_server {} {
test {RDB Load from incompatible version preserves data} {
# Set test keys
r set testkey1 "value1"
r set testkey2 "value2"
# Use RDB with version 987.
# This emulates a full sync from a server with a future version
set server_dir [lindex [r config get dir] 1]
set rdb_filename [lindex [r config get dbfilename] 1]
set rdb_path "$server_dir/$rdb_filename"
exec cp tests/assets/encodings-rdb987.rdb $rdb_path
# Reload will trigger the rdbLoad code path with the RDBFLAGS_EMPTY_DATA flag
catch {r debug reload nosave}
# Check that version error appears in logs
verify_log_message 0 "*Can't handle RDB format version*" 0
# Verify we don't enter the flushing code path
verify_no_log_message 0 "*RDB signature and version check passed*" 0
# Verify our original data is not flushed
assert_equal [r get testkey1] "value1"
assert_equal [r get testkey2] "value2"
}
}
} ;# tags } ;# tags