diff --git a/src/aof.c b/src/aof.c index 1ec77dc0e..c7c0731b9 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1459,7 +1459,7 @@ int loadSingleAppendOnlyFile(char *filename) { if (server.repl_backlog == NULL) createReplicationBacklog(); rdb_flags |= RDBFLAGS_FEED_REPL; } - if (rdbLoadRio(&rdb, rdb_flags, &rsi) != C_OK) { + if (rdbLoadRio(&rdb, rdb_flags, &rsi) != RDB_OK) { if (old_style) serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted", filename); diff --git a/src/debug.c b/src/debug.c index ee0720f04..cdd76e4a1 100644 --- a/src/debug.c +++ b/src/debug.c @@ -586,7 +586,7 @@ void debugCommand(client *c) { /* The default behavior is to remove the current dataset from * memory before loading the RDB file, however when MERGE is * used together with NOFLUSH, we are able to merge two datasets. */ - if (flush) emptyData(-1, EMPTYDB_NO_FLAGS, NULL); + if (flush) flags |= RDBFLAGS_EMPTY_DATA; protectClient(c); int ret = rdbLoad(server.rdb_filename, NULL, flags); diff --git a/src/module.c b/src/module.c index ab9361153..ae5f62278 100644 --- a/src/module.c +++ b/src/module.c @@ -13469,8 +13469,6 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) { * will prevent COW memory issue. */ if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild(); - emptyData(-1, EMPTYDB_NO_FLAGS, NULL); - /* rdbLoad() can go back to the networking and process network events. If * VM_RdbLoad() is called inside a command callback, we don't want to * process the current client. Otherwise, we may free the client or try to @@ -13478,7 +13476,7 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) { if (server.current_client) protectClient(server.current_client); serverAssert(stream->type == VALKEYMODULE_RDB_STREAM_FILE); - int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_NONE); + int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_EMPTY_DATA); if (server.current_client) unprotectClient(server.current_client); diff --git a/src/rdb.c b/src/rdb.c index 9b07d4d36..98b8a3d43 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -75,6 +75,7 @@ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); +void replicationEmptyDbCallback(hashtable *ht); /* Returns true if the RDB version is valid and accepted, false otherwise. This * function takes configuration into account. The parameter `is_valkey_magic` @@ -3082,8 +3083,10 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, return retval; } -/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, - * otherwise C_ERR is returned. +/* Load an RDB file from the rio stream 'rdb'. We return one of the following: + * - RDB_OK On success + * - RDB_INCOMPATIBLE If the RDB has an invalid signature or version + * - RDB_FAILED in all other failure cases * The rdb_loading_ctx argument holds objects to which the rdb will be loaded to, * currently it only allow to set db object and functionLibCtx to which the data * will be loaded (in the future it might contains more such objects). */ @@ -3092,10 +3095,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int type, rdbver; uint64_t db_size = 0, expires_size = 0; int should_expand_db = 0; - if (rdb_loading_ctx->dbarray[0] == NULL) { - rdb_loading_ctx->dbarray[0] = createDatabase(0); - } - serverDb *db = rdb_loading_ctx->dbarray[0]; char buf[1024]; int error; long long empty_keys_skipped = 0; @@ -3111,14 +3110,30 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin is_valkey_magic = true; } else { serverLog(LL_WARNING, "Wrong signature trying to load DB from file: %.9s", buf); - return C_ERR; + /* Signal to terminate the rdbLoad without clearing existing data */ + return RDB_INCOMPATIBLE; } rdbver = atoi(buf + 6); if (!rdbIsVersionAccepted(rdbver, is_valkey_magic, is_redis_magic)) { serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver); - return C_ERR; + return RDB_INCOMPATIBLE; } + /* Only empty data if RDBFLAGS_EMPTY_DATA is set */ + if (rdbflags & RDBFLAGS_EMPTY_DATA) { + int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + serverLog(LL_NOTICE, "RDB signature and version check passed. Flushing old data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + + /* functionsLibCtx is cleared when we call emptyData, reinitialize here. */ + rdb_loading_ctx->functions_lib_ctx = functionsLibCtxGetCurrent(); + } + + if (rdb_loading_ctx->dbarray[0] == NULL) { + rdb_loading_ctx->dbarray[0] = createDatabase(0); + } + serverDb *db = rdb_loading_ctx->dbarray[0]; + /* Key-specific attributes, set by opcodes before the key type. */ long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_clock = LRU_CLOCK(); @@ -3134,7 +3149,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (is_redis_magic && type >= RDB_FOREIGN_TYPE_MIN && type <= RDB_FOREIGN_TYPE_MAX) { serverLog(LL_WARNING, "Can't handle foreign type or opcode %d in RDB with version %d", type, rdbver); - return C_ERR; + return RDB_FAILED; } /* Handle special types. */ @@ -3418,7 +3433,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin } else if (error == RDB_LOAD_ERR_UNKNOWN_TYPE) { sdsfree(key); serverLog(LL_WARNING, "Unknown type or opcode when loading DB. Unrecoverable error, aborting now."); - return C_ERR; + return RDB_FAILED; } else { sdsfree(key); goto eoferr; @@ -3502,7 +3517,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin "got (%llx). Aborting now.", (unsigned long long)expected, (unsigned long long)cksum); rdbReportCorruptRDB("RDB CRC error"); - return C_ERR; + return RDB_FAILED; } } } @@ -3514,7 +3529,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin serverLog(LL_NOTICE, "Done loading RDB, keys loaded: %lld, keys expired: %lld.", server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired); } - return C_OK; + return RDB_OK; /* Unexpected end of file is handled here calling rdbReportReadError(): * this will in turn either abort the server in most cases, or if we are loading @@ -3523,7 +3538,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin eoferr: serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now."); rdbReportReadError("Unexpected EOF reading RDB file"); - return C_ERR; + return RDB_FAILED; } /* Like rdbLoadRio() but takes a filename instead of a rio stream. The @@ -3556,14 +3571,15 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { retval = rdbLoadRio(&rdb, rdbflags, rsi); fclose(fp); - stopLoading(retval == C_OK); + stopLoading(retval == RDB_OK); /* Reclaim the cache backed by rdb */ - if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) { + if (retval == RDB_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) { /* TODO: maybe we could combine the fopen and open into one in the future */ rdb_fd = open(filename, O_RDONLY); if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1); } - return (retval == C_OK) ? RDB_OK : RDB_FAILED; + + return retval; } /* A background saving child (BGSAVE) terminated its work. Handle this. diff --git a/src/rdb.h b/src/rdb.h index 7ed8973f0..2aed096cc 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -166,6 +166,7 @@ enum RdbType { #define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/ #define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/ #define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */ +#define RDBFLAGS_EMPTY_DATA (1 << 5) /* Flush the database after validating magic and rdb version*/ /* When rdbLoadObject() returns NULL, the err flag is * set to hold the type of error that occurred */ @@ -213,5 +214,6 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); ssize_t rdbSaveFunctions(rio *rdb); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble); +void replicationEmptyDbCallback(hashtable *ht); #endif diff --git a/src/replication.c b/src/replication.c index 6b215f4b9..ea6aa729a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2069,8 +2069,8 @@ void replicationSendNewlineToPrimary(void) { /* Callback used by emptyData() while flushing away old data to load * the new dataset received by the primary and by discardTempDb() * after loading succeeded or failed. */ -void replicationEmptyDbCallback(hashtable *d) { - UNUSED(d); +void replicationEmptyDbCallback(hashtable *ht) { + UNUSED(ht); if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary(); } @@ -2366,11 +2366,6 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, /* We will soon start loading the RDB from socket, the replication history is changed, * we must discard the cached primary structure and force resync of sub-replicas. */ replicationAttachToNewPrimary(); - - /* Even though we are on-empty-db and the database is empty, we still call emptyData. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - dbarray = server.db; functions_lib_ctx = functionsLibCtxGetCurrent(); } @@ -2387,7 +2382,11 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; int loadingFailed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; - if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, rsi, &loadingCtx) != C_OK) { + /* If we aren't using the swapdb method, then we want to empty the data before loading the rdb */ + int flags = RDBFLAGS_REPLICATION; + if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) flags |= RDBFLAGS_EMPTY_DATA; + int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, flags, rsi, &loadingCtx); + if (retval != RDB_OK) { /* RDB loading failed. */ serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " "from socket, check server logs."); @@ -2413,9 +2412,14 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx); serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background"); } else { - /* Remove the half-loaded data in case we started with an empty replica. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + /* If we received RDB_INCOMPATIBLE, the old data was preserved */ + if (retval == RDB_INCOMPATIBLE) { + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB version or signature incompatible, old data preserved"); + } else { + /* Remove the half-loaded data in case the load failed for other reasons. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + } } /* Note that there's no point in restarting the AOF on SYNC @@ -2496,14 +2500,12 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) { * we must discard the cached primary structure and force resync of sub-replicas. */ replicationAttachToNewPrimary(); - /* Empty the databases only after the RDB file is ok, that is, before the RDB file - * is actually loaded, in case we encounter an error and drop the replication stream - * and leave an empty database. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - + /* We pass RDBFLAGS_EMPTY_DATA to call emptyData() after validating rdb compatibility + * and before loading the data from the RDB */ serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); - if (rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION) != RDB_OK) { + int retval = rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION | RDBFLAGS_EMPTY_DATA); + + if (retval != RDB_OK) { serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " "DB from disk, check server logs."); if (server.rdb_del_sync_files && allPersistenceDisabled()) { @@ -2513,9 +2515,15 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) { bg_unlink(server.rdb_filename); } - /* If disk-based RDB loading fails, remove the half-loaded dataset. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + /* If RDB failed compatibility check, we did not load the new data set or flush our old data. */ + if (retval == RDB_INCOMPATIBLE) { + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Skipping flush, no new data was loaded."); + } else { + /* If disk-based RDB loading fails, remove the half-loaded dataset. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + } + /* Note that there's no point in restarting the AOF on sync failure, * it'll be restarted when sync succeeds or replica promoted. */ diff --git a/src/server.h b/src/server.h index b930b7511..1413712b0 100644 --- a/src/server.h +++ b/src/server.h @@ -347,8 +347,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; /* RDB return values for rdbLoad. */ #define RDB_OK 0 -#define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */ -#define RDB_FAILED 2 /* Failed to load the RDB file. */ +#define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */ +#define RDB_INCOMPATIBLE 2 /* RDB version or signature is not compatible */ +#define RDB_FAILED 3 /* Failed to load the RDB file. */ /* Command doc flags */ #define CMD_DOC_NONE 0 diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index efc304f28..d4e979d7a 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -567,4 +567,32 @@ start_server {} { } {OK} } +start_server {} { + test {RDB Load from incompatible version preserves data} { + # Set test keys + r set testkey1 "value1" + r set testkey2 "value2" + + # Use RDB with version 987. + # This emulates a full sync from a server with a future version + set server_dir [lindex [r config get dir] 1] + set rdb_filename [lindex [r config get dbfilename] 1] + set rdb_path "$server_dir/$rdb_filename" + exec cp tests/assets/encodings-rdb987.rdb $rdb_path + + # Reload will trigger the rdbLoad code path with the RDBFLAGS_EMPTY_DATA flag + catch {r debug reload nosave} + + # Check that version error appears in logs + verify_log_message 0 "*Can't handle RDB format version*" 0 + + # Verify we don't enter the flushing code path + verify_no_log_message 0 "*RDB signature and version check passed*" 0 + + # Verify our original data is not flushed + assert_equal [r get testkey1] "value1" + assert_equal [r get testkey2] "value2" + } +} + } ;# tags