mirror of https://github.com/valkey-io/valkey
Perform data cleanup during RDB load on successful version/signature validation (#2600)
Addresses: https://github.com/valkey-io/valkey/issues/2588 ## Overview Previously we call `emptyData()` during a fullSync before validating the RDB version is compatible. This change adds an rdb flag that allows us to flush the database from within `rdbLoadRioWithLoadingCtx`. THhis provides the option to only flush the data if the rdb has a valid version and signature. In the case where we do have an invalid version and signature, we don't emptyData, so if a full sync fails for that reason a replica can still serve stale data instead of clients experiencing cache misses. ## Changes - Added a new flag `RDBFLAGS_EMPTY_DATA` that signals to flush the database after rdb validation - Added logic to call `emptyData` in `rdbLoadRioWithLoadingCtx` in `rdb.c` - Added logic to not clear data if the RDB validation fails in `replication.c` using new return type `RDB_INCOMPATIBLE` - Modified the signature of `rdbLoadRioWithLoadingCtx` to return RDB success codes and updated all calling sites. ## Testing Added a tcl test that uses the debug command `reload nosave` to load from an RDB that has a future version number. This triggers the same code path that full sync's will use, and verifies that we don't flush the data until after the validation is complete. A test already exists that checks that the data is flushed: https://github.com/valkey-io/valkey/blob/unstable/tests/integration/replication.tcl#L1504 --------- Signed-off-by: Venkat Pamulapati <pamuvenk@amazon.com> Signed-off-by: Venkat Pamulapati <33398322+ChiliPaneer@users.noreply.github.com> Co-authored-by: Venkat Pamulapati <pamuvenk@amazon.com> Co-authored-by: Harkrishn Patro <bunty.hari@gmail.com>
This commit is contained in:
parent
57892663be
commit
3c3a1966ec
|
|
@ -1459,7 +1459,7 @@ int loadSingleAppendOnlyFile(char *filename) {
|
||||||
if (server.repl_backlog == NULL) createReplicationBacklog();
|
if (server.repl_backlog == NULL) createReplicationBacklog();
|
||||||
rdb_flags |= RDBFLAGS_FEED_REPL;
|
rdb_flags |= RDBFLAGS_FEED_REPL;
|
||||||
}
|
}
|
||||||
if (rdbLoadRio(&rdb, rdb_flags, &rsi) != C_OK) {
|
if (rdbLoadRio(&rdb, rdb_flags, &rsi) != RDB_OK) {
|
||||||
if (old_style)
|
if (old_style)
|
||||||
serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted",
|
serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted",
|
||||||
filename);
|
filename);
|
||||||
|
|
|
||||||
|
|
@ -586,7 +586,7 @@ void debugCommand(client *c) {
|
||||||
/* The default behavior is to remove the current dataset from
|
/* The default behavior is to remove the current dataset from
|
||||||
* memory before loading the RDB file, however when MERGE is
|
* memory before loading the RDB file, however when MERGE is
|
||||||
* used together with NOFLUSH, we are able to merge two datasets. */
|
* used together with NOFLUSH, we are able to merge two datasets. */
|
||||||
if (flush) emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
|
if (flush) flags |= RDBFLAGS_EMPTY_DATA;
|
||||||
|
|
||||||
protectClient(c);
|
protectClient(c);
|
||||||
int ret = rdbLoad(server.rdb_filename, NULL, flags);
|
int ret = rdbLoad(server.rdb_filename, NULL, flags);
|
||||||
|
|
|
||||||
|
|
@ -13469,8 +13469,6 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
|
||||||
* will prevent COW memory issue. */
|
* will prevent COW memory issue. */
|
||||||
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();
|
if (server.child_type == CHILD_TYPE_SLOT_MIGRATION) killSlotMigrationChild();
|
||||||
|
|
||||||
emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
|
|
||||||
|
|
||||||
/* rdbLoad() can go back to the networking and process network events. If
|
/* rdbLoad() can go back to the networking and process network events. If
|
||||||
* VM_RdbLoad() is called inside a command callback, we don't want to
|
* VM_RdbLoad() is called inside a command callback, we don't want to
|
||||||
* process the current client. Otherwise, we may free the client or try to
|
* process the current client. Otherwise, we may free the client or try to
|
||||||
|
|
@ -13478,7 +13476,7 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
|
||||||
if (server.current_client) protectClient(server.current_client);
|
if (server.current_client) protectClient(server.current_client);
|
||||||
|
|
||||||
serverAssert(stream->type == VALKEYMODULE_RDB_STREAM_FILE);
|
serverAssert(stream->type == VALKEYMODULE_RDB_STREAM_FILE);
|
||||||
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_NONE);
|
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_EMPTY_DATA);
|
||||||
|
|
||||||
if (server.current_client) unprotectClient(server.current_client);
|
if (server.current_client) unprotectClient(server.current_client);
|
||||||
|
|
||||||
|
|
|
||||||
48
src/rdb.c
48
src/rdb.c
|
|
@ -75,6 +75,7 @@ extern int rdbCheckMode;
|
||||||
void rdbCheckError(const char *fmt, ...);
|
void rdbCheckError(const char *fmt, ...);
|
||||||
void rdbCheckSetError(const char *fmt, ...);
|
void rdbCheckSetError(const char *fmt, ...);
|
||||||
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
|
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
|
||||||
|
void replicationEmptyDbCallback(hashtable *ht);
|
||||||
|
|
||||||
/* Returns true if the RDB version is valid and accepted, false otherwise. This
|
/* Returns true if the RDB version is valid and accepted, false otherwise. This
|
||||||
* function takes configuration into account. The parameter `is_valkey_magic`
|
* function takes configuration into account. The parameter `is_valkey_magic`
|
||||||
|
|
@ -3082,8 +3083,10 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi,
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
|
/* Load an RDB file from the rio stream 'rdb'. We return one of the following:
|
||||||
* otherwise C_ERR is returned.
|
* - RDB_OK On success
|
||||||
|
* - RDB_INCOMPATIBLE If the RDB has an invalid signature or version
|
||||||
|
* - RDB_FAILED in all other failure cases
|
||||||
* The rdb_loading_ctx argument holds objects to which the rdb will be loaded to,
|
* The rdb_loading_ctx argument holds objects to which the rdb will be loaded to,
|
||||||
* currently it only allow to set db object and functionLibCtx to which the data
|
* currently it only allow to set db object and functionLibCtx to which the data
|
||||||
* will be loaded (in the future it might contains more such objects). */
|
* will be loaded (in the future it might contains more such objects). */
|
||||||
|
|
@ -3092,10 +3095,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||||
int type, rdbver;
|
int type, rdbver;
|
||||||
uint64_t db_size = 0, expires_size = 0;
|
uint64_t db_size = 0, expires_size = 0;
|
||||||
int should_expand_db = 0;
|
int should_expand_db = 0;
|
||||||
if (rdb_loading_ctx->dbarray[0] == NULL) {
|
|
||||||
rdb_loading_ctx->dbarray[0] = createDatabase(0);
|
|
||||||
}
|
|
||||||
serverDb *db = rdb_loading_ctx->dbarray[0];
|
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
int error;
|
int error;
|
||||||
long long empty_keys_skipped = 0;
|
long long empty_keys_skipped = 0;
|
||||||
|
|
@ -3111,14 +3110,30 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||||
is_valkey_magic = true;
|
is_valkey_magic = true;
|
||||||
} else {
|
} else {
|
||||||
serverLog(LL_WARNING, "Wrong signature trying to load DB from file: %.9s", buf);
|
serverLog(LL_WARNING, "Wrong signature trying to load DB from file: %.9s", buf);
|
||||||
return C_ERR;
|
/* Signal to terminate the rdbLoad without clearing existing data */
|
||||||
|
return RDB_INCOMPATIBLE;
|
||||||
}
|
}
|
||||||
rdbver = atoi(buf + 6);
|
rdbver = atoi(buf + 6);
|
||||||
if (!rdbIsVersionAccepted(rdbver, is_valkey_magic, is_redis_magic)) {
|
if (!rdbIsVersionAccepted(rdbver, is_valkey_magic, is_redis_magic)) {
|
||||||
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
|
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
|
||||||
return C_ERR;
|
return RDB_INCOMPATIBLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Only empty data if RDBFLAGS_EMPTY_DATA is set */
|
||||||
|
if (rdbflags & RDBFLAGS_EMPTY_DATA) {
|
||||||
|
int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
|
||||||
|
serverLog(LL_NOTICE, "RDB signature and version check passed. Flushing old data");
|
||||||
|
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||||
|
|
||||||
|
/* functionsLibCtx is cleared when we call emptyData, reinitialize here. */
|
||||||
|
rdb_loading_ctx->functions_lib_ctx = functionsLibCtxGetCurrent();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rdb_loading_ctx->dbarray[0] == NULL) {
|
||||||
|
rdb_loading_ctx->dbarray[0] = createDatabase(0);
|
||||||
|
}
|
||||||
|
serverDb *db = rdb_loading_ctx->dbarray[0];
|
||||||
|
|
||||||
/* Key-specific attributes, set by opcodes before the key type. */
|
/* Key-specific attributes, set by opcodes before the key type. */
|
||||||
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
|
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
|
||||||
long long lru_clock = LRU_CLOCK();
|
long long lru_clock = LRU_CLOCK();
|
||||||
|
|
@ -3134,7 +3149,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||||
if (is_redis_magic && type >= RDB_FOREIGN_TYPE_MIN && type <= RDB_FOREIGN_TYPE_MAX) {
|
if (is_redis_magic && type >= RDB_FOREIGN_TYPE_MIN && type <= RDB_FOREIGN_TYPE_MAX) {
|
||||||
serverLog(LL_WARNING, "Can't handle foreign type or opcode %d in RDB with version %d",
|
serverLog(LL_WARNING, "Can't handle foreign type or opcode %d in RDB with version %d",
|
||||||
type, rdbver);
|
type, rdbver);
|
||||||
return C_ERR;
|
return RDB_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Handle special types. */
|
/* Handle special types. */
|
||||||
|
|
@ -3418,7 +3433,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||||
} else if (error == RDB_LOAD_ERR_UNKNOWN_TYPE) {
|
} else if (error == RDB_LOAD_ERR_UNKNOWN_TYPE) {
|
||||||
sdsfree(key);
|
sdsfree(key);
|
||||||
serverLog(LL_WARNING, "Unknown type or opcode when loading DB. Unrecoverable error, aborting now.");
|
serverLog(LL_WARNING, "Unknown type or opcode when loading DB. Unrecoverable error, aborting now.");
|
||||||
return C_ERR;
|
return RDB_FAILED;
|
||||||
} else {
|
} else {
|
||||||
sdsfree(key);
|
sdsfree(key);
|
||||||
goto eoferr;
|
goto eoferr;
|
||||||
|
|
@ -3502,7 +3517,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||||
"got (%llx). Aborting now.",
|
"got (%llx). Aborting now.",
|
||||||
(unsigned long long)expected, (unsigned long long)cksum);
|
(unsigned long long)expected, (unsigned long long)cksum);
|
||||||
rdbReportCorruptRDB("RDB CRC error");
|
rdbReportCorruptRDB("RDB CRC error");
|
||||||
return C_ERR;
|
return RDB_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -3514,7 +3529,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||||
serverLog(LL_NOTICE, "Done loading RDB, keys loaded: %lld, keys expired: %lld.",
|
serverLog(LL_NOTICE, "Done loading RDB, keys loaded: %lld, keys expired: %lld.",
|
||||||
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
|
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
|
||||||
}
|
}
|
||||||
return C_OK;
|
return RDB_OK;
|
||||||
|
|
||||||
/* Unexpected end of file is handled here calling rdbReportReadError():
|
/* Unexpected end of file is handled here calling rdbReportReadError():
|
||||||
* this will in turn either abort the server in most cases, or if we are loading
|
* this will in turn either abort the server in most cases, or if we are loading
|
||||||
|
|
@ -3523,7 +3538,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
|
||||||
eoferr:
|
eoferr:
|
||||||
serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
|
serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
|
||||||
rdbReportReadError("Unexpected EOF reading RDB file");
|
rdbReportReadError("Unexpected EOF reading RDB file");
|
||||||
return C_ERR;
|
return RDB_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
|
/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
|
||||||
|
|
@ -3556,14 +3571,15 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
|
||||||
retval = rdbLoadRio(&rdb, rdbflags, rsi);
|
retval = rdbLoadRio(&rdb, rdbflags, rsi);
|
||||||
|
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
stopLoading(retval == C_OK);
|
stopLoading(retval == RDB_OK);
|
||||||
/* Reclaim the cache backed by rdb */
|
/* Reclaim the cache backed by rdb */
|
||||||
if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
|
if (retval == RDB_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
|
||||||
/* TODO: maybe we could combine the fopen and open into one in the future */
|
/* TODO: maybe we could combine the fopen and open into one in the future */
|
||||||
rdb_fd = open(filename, O_RDONLY);
|
rdb_fd = open(filename, O_RDONLY);
|
||||||
if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1);
|
if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1);
|
||||||
}
|
}
|
||||||
return (retval == C_OK) ? RDB_OK : RDB_FAILED;
|
|
||||||
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* A background saving child (BGSAVE) terminated its work. Handle this.
|
/* A background saving child (BGSAVE) terminated its work. Handle this.
|
||||||
|
|
|
||||||
|
|
@ -166,6 +166,7 @@ enum RdbType {
|
||||||
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
|
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
|
||||||
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
|
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
|
||||||
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
|
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
|
||||||
|
#define RDBFLAGS_EMPTY_DATA (1 << 5) /* Flush the database after validating magic and rdb version*/
|
||||||
|
|
||||||
/* When rdbLoadObject() returns NULL, the err flag is
|
/* When rdbLoadObject() returns NULL, the err flag is
|
||||||
* set to hold the type of error that occurred */
|
* set to hold the type of error that occurred */
|
||||||
|
|
@ -213,5 +214,6 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
|
||||||
ssize_t rdbSaveFunctions(rio *rdb);
|
ssize_t rdbSaveFunctions(rio *rdb);
|
||||||
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
|
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
|
||||||
int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble);
|
int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble);
|
||||||
|
void replicationEmptyDbCallback(hashtable *ht);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -2069,8 +2069,8 @@ void replicationSendNewlineToPrimary(void) {
|
||||||
/* Callback used by emptyData() while flushing away old data to load
|
/* Callback used by emptyData() while flushing away old data to load
|
||||||
* the new dataset received by the primary and by discardTempDb()
|
* the new dataset received by the primary and by discardTempDb()
|
||||||
* after loading succeeded or failed. */
|
* after loading succeeded or failed. */
|
||||||
void replicationEmptyDbCallback(hashtable *d) {
|
void replicationEmptyDbCallback(hashtable *ht) {
|
||||||
UNUSED(d);
|
UNUSED(ht);
|
||||||
if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary();
|
if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2366,11 +2366,6 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
|
||||||
/* We will soon start loading the RDB from socket, the replication history is changed,
|
/* We will soon start loading the RDB from socket, the replication history is changed,
|
||||||
* we must discard the cached primary structure and force resync of sub-replicas. */
|
* we must discard the cached primary structure and force resync of sub-replicas. */
|
||||||
replicationAttachToNewPrimary();
|
replicationAttachToNewPrimary();
|
||||||
|
|
||||||
/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
|
|
||||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
|
|
||||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
|
||||||
|
|
||||||
dbarray = server.db;
|
dbarray = server.db;
|
||||||
functions_lib_ctx = functionsLibCtxGetCurrent();
|
functions_lib_ctx = functionsLibCtxGetCurrent();
|
||||||
}
|
}
|
||||||
|
|
@ -2387,7 +2382,11 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
|
||||||
if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
|
if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
|
||||||
int loadingFailed = 0;
|
int loadingFailed = 0;
|
||||||
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
|
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
|
||||||
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, rsi, &loadingCtx) != C_OK) {
|
/* If we aren't using the swapdb method, then we want to empty the data before loading the rdb */
|
||||||
|
int flags = RDBFLAGS_REPLICATION;
|
||||||
|
if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) flags |= RDBFLAGS_EMPTY_DATA;
|
||||||
|
int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, flags, rsi, &loadingCtx);
|
||||||
|
if (retval != RDB_OK) {
|
||||||
/* RDB loading failed. */
|
/* RDB loading failed. */
|
||||||
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
|
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
|
||||||
"from socket, check server logs.");
|
"from socket, check server logs.");
|
||||||
|
|
@ -2413,9 +2412,14 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
|
||||||
disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx);
|
disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx);
|
||||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
|
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
|
||||||
} else {
|
} else {
|
||||||
/* Remove the half-loaded data in case we started with an empty replica. */
|
/* If we received RDB_INCOMPATIBLE, the old data was preserved */
|
||||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
|
if (retval == RDB_INCOMPATIBLE) {
|
||||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB version or signature incompatible, old data preserved");
|
||||||
|
} else {
|
||||||
|
/* Remove the half-loaded data in case the load failed for other reasons. */
|
||||||
|
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
|
||||||
|
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Note that there's no point in restarting the AOF on SYNC
|
/* Note that there's no point in restarting the AOF on SYNC
|
||||||
|
|
@ -2496,14 +2500,12 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
|
||||||
* we must discard the cached primary structure and force resync of sub-replicas. */
|
* we must discard the cached primary structure and force resync of sub-replicas. */
|
||||||
replicationAttachToNewPrimary();
|
replicationAttachToNewPrimary();
|
||||||
|
|
||||||
/* Empty the databases only after the RDB file is ok, that is, before the RDB file
|
/* We pass RDBFLAGS_EMPTY_DATA to call emptyData() after validating rdb compatibility
|
||||||
* is actually loaded, in case we encounter an error and drop the replication stream
|
* and before loading the data from the RDB */
|
||||||
* and leave an empty database. */
|
|
||||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
|
|
||||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
|
||||||
|
|
||||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
|
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
|
||||||
if (rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
|
int retval = rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION | RDBFLAGS_EMPTY_DATA);
|
||||||
|
|
||||||
|
if (retval != RDB_OK) {
|
||||||
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
|
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
|
||||||
"DB from disk, check server logs.");
|
"DB from disk, check server logs.");
|
||||||
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
|
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
|
||||||
|
|
@ -2513,9 +2515,15 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
|
||||||
bg_unlink(server.rdb_filename);
|
bg_unlink(server.rdb_filename);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If disk-based RDB loading fails, remove the half-loaded dataset. */
|
/* If RDB failed compatibility check, we did not load the new data set or flush our old data. */
|
||||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
|
if (retval == RDB_INCOMPATIBLE) {
|
||||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Skipping flush, no new data was loaded.");
|
||||||
|
} else {
|
||||||
|
/* If disk-based RDB loading fails, remove the half-loaded dataset. */
|
||||||
|
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
|
||||||
|
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Note that there's no point in restarting the AOF on sync failure,
|
/* Note that there's no point in restarting the AOF on sync failure,
|
||||||
* it'll be restarted when sync succeeds or replica promoted. */
|
* it'll be restarted when sync succeeds or replica promoted. */
|
||||||
|
|
|
||||||
|
|
@ -347,8 +347,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
||||||
|
|
||||||
/* RDB return values for rdbLoad. */
|
/* RDB return values for rdbLoad. */
|
||||||
#define RDB_OK 0
|
#define RDB_OK 0
|
||||||
#define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */
|
#define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */
|
||||||
#define RDB_FAILED 2 /* Failed to load the RDB file. */
|
#define RDB_INCOMPATIBLE 2 /* RDB version or signature is not compatible */
|
||||||
|
#define RDB_FAILED 3 /* Failed to load the RDB file. */
|
||||||
|
|
||||||
/* Command doc flags */
|
/* Command doc flags */
|
||||||
#define CMD_DOC_NONE 0
|
#define CMD_DOC_NONE 0
|
||||||
|
|
|
||||||
|
|
@ -567,4 +567,32 @@ start_server {} {
|
||||||
} {OK}
|
} {OK}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start_server {} {
|
||||||
|
test {RDB Load from incompatible version preserves data} {
|
||||||
|
# Set test keys
|
||||||
|
r set testkey1 "value1"
|
||||||
|
r set testkey2 "value2"
|
||||||
|
|
||||||
|
# Use RDB with version 987.
|
||||||
|
# This emulates a full sync from a server with a future version
|
||||||
|
set server_dir [lindex [r config get dir] 1]
|
||||||
|
set rdb_filename [lindex [r config get dbfilename] 1]
|
||||||
|
set rdb_path "$server_dir/$rdb_filename"
|
||||||
|
exec cp tests/assets/encodings-rdb987.rdb $rdb_path
|
||||||
|
|
||||||
|
# Reload will trigger the rdbLoad code path with the RDBFLAGS_EMPTY_DATA flag
|
||||||
|
catch {r debug reload nosave}
|
||||||
|
|
||||||
|
# Check that version error appears in logs
|
||||||
|
verify_log_message 0 "*Can't handle RDB format version*" 0
|
||||||
|
|
||||||
|
# Verify we don't enter the flushing code path
|
||||||
|
verify_no_log_message 0 "*RDB signature and version check passed*" 0
|
||||||
|
|
||||||
|
# Verify our original data is not flushed
|
||||||
|
assert_equal [r get testkey1] "value1"
|
||||||
|
assert_equal [r get testkey2] "value2"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} ;# tags
|
} ;# tags
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue