Allow partial sync after loading AOF with preamble (#2366)

The AOF preamble mechanism replaces the traditional AOF base file with
an RDB snapshot during rewrite operations, which reduces I/O overhead
and improves loading performance.
However, when Valkey loads the RDB-formatted preamble from the base AOF
file, it does not process the replication ID (replid) information within
the RDB AUX fields. This omission has two consequences:

* On a primary, it prevents the primary from accepting PSYNC continue
  requests after restarting with a preamble-enabled AOF file.
* On a replica, it prevents the replica from resuming replication with a
  partial sync after restarting with a preamble-enabled AOF file, forcing
  a full sync instead.

To resolve this, this commit aligns the AOF preamble handling with the
logic used for standalone RDB files, by storing the replication ID and
replication offset in the AOF preamble and restoring them when loading
the AOF file.

Resolves #2677

---------

Signed-off-by: arthur.lee <liziang.arthur@bytedance.com>
Signed-off-by: Arthur Lee <arthurkiller@users.noreply.github.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
This commit is contained in:
Arthur Lee 2025-11-11 19:41:27 +08:00 committed by GitHub
parent 7fbd4cb260
commit 2da21d9def
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 130 additions and 35 deletions

View File

@ -1452,7 +1452,14 @@ int loadSingleAppendOnlyFile(char *filename) {
if (fseek(fp, 0, SEEK_SET) == -1) goto readerr; if (fseek(fp, 0, SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb, fp); rioInitWithFile(&rdb, fp);
if (rdbLoadRio(&rdb, RDBFLAGS_AOF_PREAMBLE, NULL) != C_OK) { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
int rdb_flags = RDBFLAGS_AOF_PREAMBLE;
int rsi_is_valid = 0;
if (iAmPrimary()) {
if (server.repl_backlog == NULL) createReplicationBacklog();
rdb_flags |= RDBFLAGS_FEED_REPL;
}
if (rdbLoadRio(&rdb, rdb_flags, &rsi) != C_OK) {
if (old_style) if (old_style)
serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted", serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted",
filename); filename);
@ -1462,10 +1469,15 @@ int loadSingleAppendOnlyFile(char *filename) {
ret = AOF_FAILED; ret = AOF_FAILED;
goto cleanup; goto cleanup;
} else { } else {
/* Restore the replication ID / offset from the RDB file. */
rsi_is_valid = rdbRestoreOffsetFromSaveInfo(&rsi, true);
loadingAbsProgress(ftello(fp)); loadingAbsProgress(ftello(fp));
last_progress_report_size = ftello(fp); last_progress_report_size = ftello(fp);
if (old_style) serverLog(LL_NOTICE, "Reading the remaining AOF tail..."); if (old_style) serverLog(LL_NOTICE, "Reading the remaining AOF tail...");
} }
/* If the AOF didn't contain replication info, it's not possible to
* support partial resync, so we can free the backlog to save memory. */
if (!rsi_is_valid && server.repl_backlog && listLength(server.replicas) == 0) freeReplicationBacklog();
} }
/* Read the actual AOF file, in REPL format, command by command. */ /* Read the actual AOF file, in REPL format, command by command. */
@ -2400,8 +2412,10 @@ int rewriteAppendOnlyFile(char *filename) {
startSaving(RDBFLAGS_AOF_PREAMBLE); startSaving(RDBFLAGS_AOF_PREAMBLE);
if (server.aof_use_rdb_preamble) { if (server.aof_use_rdb_preamble) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
int error; int error;
if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) { if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, rsiptr) == C_ERR) {
errno = error; errno = error;
goto werr; goto werr;
} }

View File

@ -3952,3 +3952,41 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
} }
return NULL; return NULL;
} }
/* Restore the replication ID / offset carried by 'rsi' into the server state.
 *
 * 'rsi' must not be NULL (asserted). 'is_aof_preamble' is true when 'rsi' was
 * filled while loading the RDB preamble of an AOF file, and false when it was
 * filled while loading a standalone RDB file.
 *
 * Returns 1 if the replication ID and offset were restored from 'rsi', or 0
 * if 'rsi' did not contain complete replication info, in which case the
 * server state is left untouched. */
int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble) {
    serverAssert(rsi != NULL);

    /* Note that older implementations may save a repl_stream_db
     * of -1 inside the RDB file in a wrong way, see more
     * information in function rdbPopulateSaveInfo. */
    if (!rsi->repl_id_is_set || rsi->repl_offset == -1 || rsi->repl_stream_db == -1) {
        return 0;
    }

    if (!iAmPrimary()) {
        memcpy(server.replid, rsi->repl_id, sizeof(server.replid));
        server.primary_repl_offset = rsi->repl_offset;
        if (!is_aof_preamble || (!server.primary && !server.cached_primary)) {
            /* If this is a replica, create a cached primary from this
             * information, in order to allow partial resynchronizations
             * with primaries. For AOF, only cache the primary if replica
             * has not synced to its primary node yet. */
            replicationCachePrimaryUsingMyself();
            selectDb(server.cached_primary, rsi->repl_stream_db);
        }
    } else {
        /* If this is a primary, we can save the replication info
         * as secondary ID and offset, in order to allow replicas
         * to partial resynchronizations with primaries.
         * The copy is sized by the destination buffer (replid2). */
        memcpy(server.replid2, rsi->repl_id, sizeof(server.replid2));
        server.second_replid_offset = rsi->repl_offset + 1;
        /* Rebase primary_repl_offset from rsi.repl_offset. This must happen
         * before the backlog offset below is derived from it. */
        server.primary_repl_offset += rsi->repl_offset;
        serverAssert(server.repl_backlog);
        server.repl_backlog->offset = server.primary_repl_offset - server.repl_backlog->histlen + 1;
        rebaseReplicationBuffer(rsi->repl_offset);
        server.repl_no_replicas_since = time(NULL);
    }
    return 1;
}

View File

@ -212,5 +212,6 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
ssize_t rdbSaveFunctions(rio *rdb); ssize_t rdbSaveFunctions(rio *rdb);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble);
#endif #endif

View File

@ -6989,36 +6989,7 @@ void loadDataFromDisk(void) {
int rdb_load_ret = rdbLoad(server.rdb_filename, &rsi, rdb_flags); int rdb_load_ret = rdbLoad(server.rdb_filename, &rsi, rdb_flags);
if (rdb_load_ret == RDB_OK) { if (rdb_load_ret == RDB_OK) {
serverLog(LL_NOTICE, "DB loaded from disk: %.3f seconds", (float)(ustime() - start) / 1000000); serverLog(LL_NOTICE, "DB loaded from disk: %.3f seconds", (float)(ustime() - start) / 1000000);
rsi_is_valid = rdbRestoreOffsetFromSaveInfo(&rsi, false);
/* Restore the replication ID / offset from the RDB file. */
if (rsi.repl_id_is_set && rsi.repl_offset != -1 &&
/* Note that older implementations may save a repl_stream_db
* of -1 inside the RDB file in a wrong way, see more
* information in function rdbPopulateSaveInfo. */
rsi.repl_stream_db != -1) {
rsi_is_valid = 1;
if (!iAmPrimary()) {
memcpy(server.replid, rsi.repl_id, sizeof(server.replid));
server.primary_repl_offset = rsi.repl_offset;
/* If this is a replica, create a cached primary from this
* information, in order to allow partial resynchronizations
* with primaries. */
replicationCachePrimaryUsingMyself();
selectDb(server.cached_primary, rsi.repl_stream_db);
} else {
/* If this is a primary, we can save the replication info
* as secondary ID and offset, in order to allow replicas
* to partial resynchronizations with primaries. */
memcpy(server.replid2, rsi.repl_id, sizeof(server.replid));
server.second_replid_offset = rsi.repl_offset + 1;
/* Rebase primary_repl_offset from rsi.repl_offset. */
server.primary_repl_offset += rsi.repl_offset;
serverAssert(server.repl_backlog);
server.repl_backlog->offset = server.primary_repl_offset - server.repl_backlog->histlen + 1;
rebaseReplicationBuffer(rsi.repl_offset);
server.repl_no_replicas_since = time(NULL);
}
}
} else if (rdb_load_ret != RDB_NOT_EXIST) { } else if (rdb_load_ret != RDB_NOT_EXIST) {
serverLog(LL_WARNING, "Fatal error loading the DB, check server logs. Exiting."); serverLog(LL_WARNING, "Fatal error loading the DB, check server logs. Exiting.");
exit(1); exit(1);

View File

@ -266,8 +266,8 @@ start_server {tags {"repl external:skip"}} {
wait_for_sync $replica wait_for_sync $replica
} }
test {Data divergence can happen under default conditions} { test {Data divergence can happen under default conditions} {
$replica config set propagation-error-behavior ignore $replica config set propagation-error-behavior ignore
$master debug replicate fake-command-1 $master debug replicate fake-command-1
# Wait for replication to normalize # Wait for replication to normalize
@ -280,7 +280,7 @@ start_server {tags {"repl external:skip"}} {
assert_equal [count_log_message 0 "== CRITICAL =="] 1 assert_equal [count_log_message 0 "== CRITICAL =="] 1
} }
test {Data divergence is allowed on writable replicas} { test {Data divergence is allowed on writable replicas} {
$replica config set replica-read-only no $replica config set replica-read-only no
$replica set number2 foo $replica set number2 foo
$master incrby number2 1 $master incrby number2 1
@ -293,3 +293,74 @@ start_server {tags {"repl external:skip"}} {
} }
} }
} }
# Test that replication info persisted in the AOF is loaded again after a
# server restart, so that the primary/replica link is re-established with a
# partial resynchronization rather than a full one.
start_server {tags {"repl external:skip cluster:skip"} overrides {appendonly yes repl-ping-replica-period 10000 loglevel debug}} {
start_server {overrides {appendonly yes repl-ping-replica-period 10000 loglevel debug}} {
# Server index -1 is used as the primary and index 0 as the replica.
set primary_id -1
set replica_id 0
set primary [srv $primary_id client]
set primary_host [srv $primary_id host]
set primary_port [srv $primary_id port]
set replica [srv $replica_id client]
$replica replicaof $primary_host $primary_port
# NOTE(review): presumably persists the current config (including the
# replicaof setting) so it survives the restarts below - confirm.
$primary config rewrite
$replica config rewrite
# Write some data and wait until the replica has caught up with the primary.
for {set k 0} {$k < 100} {incr k} {
$primary set foo_$k bar_$k
}
$primary set bar foo
wait_for_value_to_propagate_to_replica $primary $replica "bar"
waitForBgrewriteaof $primary
waitForBgrewriteaof $replica
wait_for_ofs_sync $primary $replica
# Attaching the replica is expected to have cost exactly one full sync so far.
assert_equal [status $primary sync_full] 1
wait_for_log_messages $replica_id {"*sync: Finished with success*"} 0 100 100
# save the primary's replid and replication offset for later comparison
set prev_replid [status $primary master_replid]
set prev_repl_offset [status $primary master_repl_offset]
test {replica rewrite aof and load it after restart} {
# persist current replication info
$replica bgrewriteaof
waitForBgrewriteaof $replica
wait_for_ofs_sync $primary $replica
# Remember the current length of the replica log; it is passed to
# wait_for_log_messages below, presumably as the line to start matching from.
set logfile [srv $replica_id stdout]
set num_lines [lindex [exec wc -l $logfile] 0]
restart_server $replica_id true false true now
# The client handle is re-fetched after the restart.
set replica [srv $replica_id client]
wait_for_ofs_sync $primary $replica
wait_for_log_messages $replica_id {"*Primary accepted a Partial Resynchronization*"} $num_lines 100 100
# The restarted replica must report the same replid and offset as before,
# and the primary must have served a partial sync without a new full sync.
assert_equal [status $replica master_replid] $prev_replid
assert_equal [status $replica master_repl_offset] $prev_repl_offset
assert_equal [status $primary sync_full] 1
assert_equal [status $primary sync_partial_ok] 1
}
test {primary rewrite aof and load it after restart} {
# persist current replication info
$primary bgrewriteaof
waitForBgrewriteaof $primary
wait_for_ofs_sync $primary $replica
# The expected second_repl_offset after the restart is the primary's current
# replication offset plus one.
set prev_repl_offset [expr {[status $primary master_repl_offset] + 1}]
# The partial resync message is looked up in the replica log, not the primary log.
set logfile [srv $replica_id stdout]
set num_lines [lindex [exec wc -l $logfile] 0]
restart_server $primary_id true false true now
set primary [srv $primary_id client]
wait_for_ofs_sync $primary $replica
wait_for_log_messages $replica_id {"*Primary accepted a Partial Resynchronization*"} $num_lines 100 100
# The previous replid must show up as the restarted primary's secondary replid.
assert_equal [status $primary master_replid2] $prev_replid
assert_equal [status $primary second_repl_offset] $prev_repl_offset
# NOTE(review): sync_full is expected to be 0 here, presumably because the
# counters start fresh after the primary restart - confirm.
assert_equal [status $primary sync_full] 0
assert_equal [status $primary sync_partial_ok] 1
}
}
}