mirror of https://github.com/valkey-io/valkey
Perform data cleanup during RDB load on successful version/signature validation (#2600)
Addresses: https://github.com/valkey-io/valkey/issues/2588 ## Overview Previously we called `emptyData()` during a full sync before validating that the RDB version is compatible. This change adds an RDB flag that allows us to flush the database from within `rdbLoadRioWithLoadingCtx`. This provides the option to flush the data only if the RDB has a valid version and signature. In the case where the version or signature is invalid, we don't call `emptyData()`, so if a full sync fails for that reason a replica can still serve stale data instead of clients experiencing cache misses. ## Changes - Added a new flag `RDBFLAGS_EMPTY_DATA` that signals to flush the database after RDB validation - Added logic to call `emptyData` in `rdbLoadRioWithLoadingCtx` in `rdb.c` - Added logic to not clear data if the RDB validation fails in `replication.c`, using the new return code `RDB_INCOMPATIBLE` - Modified `rdbLoadRioWithLoadingCtx` to return RDB status codes (`RDB_OK`, `RDB_INCOMPATIBLE`, `RDB_FAILED`) and updated all call sites. ## Testing Added a Tcl test that uses the debug command `reload nosave` to load from an RDB that has a future version number. This triggers the same code path that full syncs will use, and verifies that we don't flush the data until after the validation is complete. A test already exists that checks that the data is flushed: https://github.com/valkey-io/valkey/blob/unstable/tests/integration/replication.tcl#L1504 --------- Signed-off-by: Venkat Pamulapati <pamuvenk@amazon.com> Signed-off-by: Venkat Pamulapati <33398322+ChiliPaneer@users.noreply.github.com> Co-authored-by: Venkat Pamulapati <pamuvenk@amazon.com> Co-authored-by: Harkrishn Patro <bunty.hari@gmail.com>
This commit is contained in:
parent
57892663be
commit
3c3a1966ec
|
|
@ -1459,7 +1459,7 @@ int loadSingleAppendOnlyFile(char *filename) {
|
|||
if (server.repl_backlog == NULL) createReplicationBacklog();
|
||||
rdb_flags |= RDBFLAGS_FEED_REPL;
|
||||
}
|
||||
if (rdbLoadRio(&rdb, rdb_flags, &rsi) != C_OK) {
|
||||
if (rdbLoadRio(&rdb, rdb_flags, &rsi) != RDB_OK) {
|
||||
if (old_style)
|
||||
serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted",
|
||||
filename);
|
||||
|
|
|
|||
|
|
@ -586,7 +586,7 @@ void debugCommand(client *c) {
|
|||
/* The default behavior is to remove the current dataset from
|
||||
* memory before loading the RDB file, however when MERGE is
|
||||
* used together with NOFLUSH, we are able to merge two datasets. */
|
||||
if (flush) emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
|
||||
if (flush) flags |= RDBFLAGS_EMPTY_DATA;
|
||||
|
||||
protectClient(c);
|
||||
int ret = rdbLoad(server.rdb_filename, NULL, flags);
|
||||
|
|
|
|||
|
|
@ -13469,8 +13469,6 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
|
|||
* will prevent COW memory issue. */
|
||||
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();
|
||||
|
||||
emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
|
||||
|
||||
/* rdbLoad() can go back to the networking and process network events. If
|
||||
* VM_RdbLoad() is called inside a command callback, we don't want to
|
||||
* process the current client. Otherwise, we may free the client or try to
|
||||
|
|
@ -13478,7 +13476,7 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
|
|||
if (server.current_client) protectClient(server.current_client);
|
||||
|
||||
serverAssert(stream->type == VALKEYMODULE_RDB_STREAM_FILE);
|
||||
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_NONE);
|
||||
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_EMPTY_DATA);
|
||||
|
||||
if (server.current_client) unprotectClient(server.current_client);
|
||||
|
||||
|
|
|
|||
48
src/rdb.c
48
src/rdb.c
|
|
@ -75,6 +75,7 @@ extern int rdbCheckMode;
|
|||
void rdbCheckError(const char *fmt, ...);
|
||||
void rdbCheckSetError(const char *fmt, ...);
|
||||
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
|
||||
void replicationEmptyDbCallback(hashtable *ht);
|
||||
|
||||
/* Returns true if the RDB version is valid and accepted, false otherwise. This
|
||||
* function takes configuration into account. The parameter `is_valkey_magic`
|
||||
|
|
@ -3082,8 +3083,10 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi,
|
|||
return retval;
|
||||
}
|
||||
|
||||
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
|
||||
* otherwise C_ERR is returned.
|
||||
/* Load an RDB file from the rio stream 'rdb'. We return one of the following:
|
||||
* - RDB_OK On success
|
||||
* - RDB_INCOMPATIBLE If the RDB has an invalid signature or version
|
||||
* - RDB_FAILED in all other failure cases
|
||||
* The rdb_loading_ctx argument holds objects to which the rdb will be loaded to,
|
||||
* currently it only allow to set db object and functionLibCtx to which the data
|
||||
* will be loaded (in the future it might contains more such objects). */
|
||||
|
|
@ -3092,10 +3095,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
|||
int type, rdbver;
|
||||
uint64_t db_size = 0, expires_size = 0;
|
||||
int should_expand_db = 0;
|
||||
if (rdb_loading_ctx->dbarray[0] == NULL) {
|
||||
rdb_loading_ctx->dbarray[0] = createDatabase(0);
|
||||
}
|
||||
serverDb *db = rdb_loading_ctx->dbarray[0];
|
||||
char buf[1024];
|
||||
int error;
|
||||
long long empty_keys_skipped = 0;
|
||||
|
|
@ -3111,14 +3110,30 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
|||
is_valkey_magic = true;
|
||||
} else {
|
||||
serverLog(LL_WARNING, "Wrong signature trying to load DB from file: %.9s", buf);
|
||||
return C_ERR;
|
||||
/* Signal to terminate the rdbLoad without clearing existing data */
|
||||
return RDB_INCOMPATIBLE;
|
||||
}
|
||||
rdbver = atoi(buf + 6);
|
||||
if (!rdbIsVersionAccepted(rdbver, is_valkey_magic, is_redis_magic)) {
|
||||
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
|
||||
return C_ERR;
|
||||
return RDB_INCOMPATIBLE;
|
||||
}
|
||||
|
||||
/* Only empty data if RDBFLAGS_EMPTY_DATA is set */
|
||||
if (rdbflags & RDBFLAGS_EMPTY_DATA) {
|
||||
int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
|
||||
serverLog(LL_NOTICE, "RDB signature and version check passed. Flushing old data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
|
||||
/* functionsLibCtx is cleared when we call emptyData, reinitialize here. */
|
||||
rdb_loading_ctx->functions_lib_ctx = functionsLibCtxGetCurrent();
|
||||
}
|
||||
|
||||
if (rdb_loading_ctx->dbarray[0] == NULL) {
|
||||
rdb_loading_ctx->dbarray[0] = createDatabase(0);
|
||||
}
|
||||
serverDb *db = rdb_loading_ctx->dbarray[0];
|
||||
|
||||
/* Key-specific attributes, set by opcodes before the key type. */
|
||||
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
|
||||
long long lru_clock = LRU_CLOCK();
|
||||
|
|
@ -3134,7 +3149,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
|||
if (is_redis_magic && type >= RDB_FOREIGN_TYPE_MIN && type <= RDB_FOREIGN_TYPE_MAX) {
|
||||
serverLog(LL_WARNING, "Can't handle foreign type or opcode %d in RDB with version %d",
|
||||
type, rdbver);
|
||||
return C_ERR;
|
||||
return RDB_FAILED;
|
||||
}
|
||||
|
||||
/* Handle special types. */
|
||||
|
|
@ -3418,7 +3433,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
|||
} else if (error == RDB_LOAD_ERR_UNKNOWN_TYPE) {
|
||||
sdsfree(key);
|
||||
serverLog(LL_WARNING, "Unknown type or opcode when loading DB. Unrecoverable error, aborting now.");
|
||||
return C_ERR;
|
||||
return RDB_FAILED;
|
||||
} else {
|
||||
sdsfree(key);
|
||||
goto eoferr;
|
||||
|
|
@ -3502,7 +3517,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
|||
"got (%llx). Aborting now.",
|
||||
(unsigned long long)expected, (unsigned long long)cksum);
|
||||
rdbReportCorruptRDB("RDB CRC error");
|
||||
return C_ERR;
|
||||
return RDB_FAILED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3514,7 +3529,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
|||
serverLog(LL_NOTICE, "Done loading RDB, keys loaded: %lld, keys expired: %lld.",
|
||||
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
|
||||
}
|
||||
return C_OK;
|
||||
return RDB_OK;
|
||||
|
||||
/* Unexpected end of file is handled here calling rdbReportReadError():
|
||||
* this will in turn either abort the server in most cases, or if we are loading
|
||||
|
|
@ -3523,7 +3538,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
|||
eoferr:
|
||||
serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
|
||||
rdbReportReadError("Unexpected EOF reading RDB file");
|
||||
return C_ERR;
|
||||
return RDB_FAILED;
|
||||
}
|
||||
|
||||
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
|
||||
|
|
@ -3556,14 +3571,15 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
|||
retval = rdbLoadRio(&rdb, rdbflags, rsi);
|
||||
|
||||
fclose(fp);
|
||||
stopLoading(retval == C_OK);
|
||||
stopLoading(retval == RDB_OK);
|
||||
/* Reclaim the cache backed by rdb */
|
||||
if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
|
||||
if (retval == RDB_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
|
||||
/* TODO: maybe we could combine the fopen and open into one in the future */
|
||||
rdb_fd = open(filename, O_RDONLY);
|
||||
if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1);
|
||||
}
|
||||
return (retval == C_OK) ? RDB_OK : RDB_FAILED;
|
||||
|
||||
return retval;
|
||||
}
|
||||
|
||||
/* A background saving child (BGSAVE) terminated its work. Handle this.
|
||||
|
|
|
|||
|
|
@ -166,6 +166,7 @@ enum RdbType {
|
|||
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
|
||||
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
|
||||
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
|
||||
#define RDBFLAGS_EMPTY_DATA (1 << 5) /* Flush the database after validating magic and rdb version*/
|
||||
|
||||
/* When rdbLoadObject() returns NULL, the err flag is
|
||||
* set to hold the type of error that occurred */
|
||||
|
|
@ -213,5 +214,6 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
|
|||
ssize_t rdbSaveFunctions(rio *rdb);
|
||||
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
|
||||
int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble);
|
||||
void replicationEmptyDbCallback(hashtable *ht);
|
||||
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -2069,8 +2069,8 @@ void replicationSendNewlineToPrimary(void) {
|
|||
/* Callback used by emptyData() while flushing away old data to load
|
||||
* the new dataset received by the primary and by discardTempDb()
|
||||
* after loading succeeded or failed. */
|
||||
void replicationEmptyDbCallback(hashtable *d) {
|
||||
UNUSED(d);
|
||||
void replicationEmptyDbCallback(hashtable *ht) {
|
||||
UNUSED(ht);
|
||||
if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary();
|
||||
}
|
||||
|
||||
|
|
@ -2366,11 +2366,6 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
|
|||
/* We will soon start loading the RDB from socket, the replication history is changed,
|
||||
* we must discard the cached primary structure and force resync of sub-replicas. */
|
||||
replicationAttachToNewPrimary();
|
||||
|
||||
/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
|
||||
dbarray = server.db;
|
||||
functions_lib_ctx = functionsLibCtxGetCurrent();
|
||||
}
|
||||
|
|
@ -2387,7 +2382,11 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
|
|||
if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
|
||||
int loadingFailed = 0;
|
||||
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
|
||||
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, rsi, &loadingCtx) != C_OK) {
|
||||
/* If we aren't using the swapdb method, then we want to empty the data before loading the rdb */
|
||||
int flags = RDBFLAGS_REPLICATION;
|
||||
if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) flags |= RDBFLAGS_EMPTY_DATA;
|
||||
int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, flags, rsi, &loadingCtx);
|
||||
if (retval != RDB_OK) {
|
||||
/* RDB loading failed. */
|
||||
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
|
||||
"from socket, check server logs.");
|
||||
|
|
@ -2413,10 +2412,15 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
|
|||
disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx);
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
|
||||
} else {
|
||||
/* Remove the half-loaded data in case we started with an empty replica. */
|
||||
/* If we received RDB_INCOMPATIBLE, the old data was preserved */
|
||||
if (retval == RDB_INCOMPATIBLE) {
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB version or signature incompatible, old data preserved");
|
||||
} else {
|
||||
/* Remove the half-loaded data in case the load failed for other reasons. */
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
}
|
||||
}
|
||||
|
||||
/* Note that there's no point in restarting the AOF on SYNC
|
||||
* failure, it'll be restarted when sync succeeds or the replica
|
||||
|
|
@ -2496,14 +2500,12 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
|
|||
* we must discard the cached primary structure and force resync of sub-replicas. */
|
||||
replicationAttachToNewPrimary();
|
||||
|
||||
/* Empty the databases only after the RDB file is ok, that is, before the RDB file
|
||||
* is actually loaded, in case we encounter an error and drop the replication stream
|
||||
* and leave an empty database. */
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
|
||||
/* We pass RDBFLAGS_EMPTY_DATA to call emptyData() after validating rdb compatibility
|
||||
* and before loading the data from the RDB */
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
|
||||
if (rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
|
||||
int retval = rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION | RDBFLAGS_EMPTY_DATA);
|
||||
|
||||
if (retval != RDB_OK) {
|
||||
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
|
||||
"DB from disk, check server logs.");
|
||||
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
|
||||
|
|
@ -2513,9 +2515,15 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
|
|||
bg_unlink(server.rdb_filename);
|
||||
}
|
||||
|
||||
/* If RDB failed compatibility check, we did not load the new data set or flush our old data. */
|
||||
if (retval == RDB_INCOMPATIBLE) {
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Skipping flush, no new data was loaded.");
|
||||
} else {
|
||||
/* If disk-based RDB loading fails, remove the half-loaded dataset. */
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
}
|
||||
|
||||
|
||||
/* Note that there's no point in restarting the AOF on sync failure,
|
||||
* it'll be restarted when sync succeeds or replica promoted. */
|
||||
|
|
|
|||
|
|
@ -348,7 +348,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
|||
/* RDB return values for rdbLoad. */
|
||||
#define RDB_OK 0
|
||||
#define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */
|
||||
#define RDB_FAILED 2 /* Failed to load the RDB file. */
|
||||
#define RDB_INCOMPATIBLE 2 /* RDB version or signature is not compatible */
|
||||
#define RDB_FAILED 3 /* Failed to load the RDB file. */
|
||||
|
||||
/* Command doc flags */
|
||||
#define CMD_DOC_NONE 0
|
||||
|
|
|
|||
|
|
@ -567,4 +567,32 @@ start_server {} {
|
|||
} {OK}
|
||||
}
|
||||
|
||||
start_server {} {
|
||||
test {RDB Load from incompatible version preserves data} {
|
||||
# Set test keys
|
||||
r set testkey1 "value1"
|
||||
r set testkey2 "value2"
|
||||
|
||||
# Use RDB with version 987.
|
||||
# This emulates a full sync from a server with a future version
|
||||
set server_dir [lindex [r config get dir] 1]
|
||||
set rdb_filename [lindex [r config get dbfilename] 1]
|
||||
set rdb_path "$server_dir/$rdb_filename"
|
||||
exec cp tests/assets/encodings-rdb987.rdb $rdb_path
|
||||
|
||||
# Reload will trigger the rdbLoad code path with the RDBFLAGS_EMPTY_DATA flag
|
||||
catch {r debug reload nosave}
|
||||
|
||||
# Check that version error appears in logs
|
||||
verify_log_message 0 "*Can't handle RDB format version*" 0
|
||||
|
||||
# Verify we don't enter the flushing code path
|
||||
verify_no_log_message 0 "*RDB signature and version check passed*" 0
|
||||
|
||||
# Verify our original data is not flushed
|
||||
assert_equal [r get testkey1] "value1"
|
||||
assert_equal [r get testkey2] "value2"
|
||||
}
|
||||
}
|
||||
|
||||
} ;# tags
|
||||
|
|
|
|||
Loading…
Reference in New Issue