From d5bb986fd592733330ff4042c011f7c64da543ec Mon Sep 17 00:00:00 2001 From: Jacob Murphy Date: Thu, 2 Oct 2025 12:12:34 -0700 Subject: [PATCH] Add slot migration client flags and module context flags (#2639) New client flags in reported by CLIENT INFO and CLIENT LIST: * `i` for atomic slot migration importing client * `E` for atomic slot migration exporting client New flags in return value of `ValkeyModule_GetContextFlags`: * `VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT`: Indicate the that client attached to this context is the slot import client. * `VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT`: Indicate the that client attached to this context is the slot export client. Users could use this to monitor the underlying client info of the slot migration, and more clearly understand why they see extra clients during the migration. Modules can use these to detect keyspace notifications on import clients. I am also adding export flags for symmetry, although there should not be keyspace notifications. But they would potentially be visible in command filters or in server events triggered by that client. --------- Signed-off-by: Jacob Murphy --- src/module.c | 11 +++++ src/networking.c | 12 ++++- src/valkeymodule.h | 6 ++- tests/unit/cluster/cluster-migrateslots.tcl | 49 +++++++++++++++++++++ 4 files changed, 76 insertions(+), 2 deletions(-) diff --git a/src/module.c b/src/module.c index d68991eea..e5afa952f 100644 --- a/src/module.c +++ b/src/module.c @@ -4004,6 +4004,12 @@ int VM_GetSelectedDb(ValkeyModuleCtx *ctx) { * context is using RESP3. * * * VALKEYMODULE_CTX_FLAGS_SERVER_STARTUP: The instance is starting + * + * * VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT: Indicate the that client attached to this + * context is the slot import client. + * + * * VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT: Indicate the that client attached to this + * context is the slot export client. */ int VM_GetContextFlags(ValkeyModuleCtx *ctx) { int flags = 0; @@ -4017,6 +4023,11 @@ int VM_GetContextFlags(ValkeyModuleCtx *ctx) { if (ctx->client->resp == 3) { flags |= VALKEYMODULE_CTX_FLAGS_RESP3; } + if (ctx->client->slot_migration_job && isImportSlotMigrationJob(ctx->client->slot_migration_job)) { + flags |= VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT; + } else if (ctx->client->slot_migration_job) { + flags |= VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT; + } } /* For DIRTY flags, we need the blocked client if used */ diff --git a/src/networking.c b/src/networking.c index 62d55697d..55b54bb20 100644 --- a/src/networking.c +++ b/src/networking.c @@ -4212,6 +4212,8 @@ sds catClientInfoString(sds s, client *client, int hide_user_data) { if (client->flag.no_evict) *p++ = 'e'; if (client->flag.no_touch) *p++ = 'T'; if (client->flag.import_source) *p++ = 'I'; + if (client->slot_migration_job && isImportSlotMigrationJob(client->slot_migration_job)) *p++ = 'i'; + if (client->slot_migration_job && !isImportSlotMigrationJob(client->slot_migration_job)) *p++ = 'E'; if (p == flags) *p++ = 'N'; *p++ = '\0'; @@ -4696,6 +4698,8 @@ static int validateClientFlagFilter(sds flag_filter) { case 'e': case 'T': case 'I': + case 'i': + case 'E': case 'N': /* Valid flag, do nothing. */ break; @@ -4850,6 +4854,12 @@ static int clientMatchesFlagFilter(client *c, sds flag_filter) { case 'I': /* Import source flag */ if (!c->flag.import_source) return 0; break; + case 'i': /* Slot migration import flag */ + if (!c->slot_migration_job || !isImportSlotMigrationJob(c->slot_migration_job)) return 0; + break; + case 'E': /* Slot migration export flag */ + if (!c->slot_migration_job || isImportSlotMigrationJob(c->slot_migration_job)) return 0; + break; case 'N': /* Check for no flags */ if (c->flag.replica || c->flag.primary || c->flag.pubsub || c->flag.multi || c->flag.blocked || c->flag.tracking || @@ -4858,7 +4868,7 @@ static int clientMatchesFlagFilter(client *c, sds flag_filter) { c->flag.unblocked || c->flag.close_asap || c->flag.unix_socket || c->flag.readonly || c->flag.no_evict || c->flag.no_touch || - c->flag.import_source) { + c->flag.import_source || c->slot_migration_job) { return 0; } break; diff --git a/src/valkeymodule.h b/src/valkeymodule.h index acbbbb14e..2d5dbca7b 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -221,11 +221,15 @@ typedef struct ValkeyModuleStreamID { #define VALKEYMODULE_CTX_FLAGS_ASYNC_LOADING (1 << 23) /* Valkey is starting. */ #define VALKEYMODULE_CTX_FLAGS_SERVER_STARTUP (1 << 24) +/* The current client is the slot import client */ +#define VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT (1 << 25) +/* The current client is the slot export client */ +#define VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT (1 << 26) /* Next context flag, must be updated when adding new flags above! This flag should not be used directly by the module. * Use ValkeyModule_GetContextFlagsAll instead. */ -#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 25) +#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 27) /* Keyspace changes notification classes. Every class is associated with a * character for configuration purposes. diff --git a/tests/unit/cluster/cluster-migrateslots.tcl b/tests/unit/cluster/cluster-migrateslots.tcl index 032b6ae18..1b2d12136 100644 --- a/tests/unit/cluster/cluster-migrateslots.tcl +++ b/tests/unit/cluster/cluster-migrateslots.tcl @@ -521,6 +521,55 @@ start_cluster 3 3 {tags {logreqres:skip external:skip cluster} overrides {cluste } } + proc verify_client_flag {idx flag expected_count} { + set clients [split [string trim [R $idx client list]] "\r\n"] + set found 0 + foreach client $clients { + if {[regexp "flags=\[a-zA-Z\]*$flag" $client]} { + incr found + } + } + if {$found ne $expected_count} { + fail "Expected $flag to appear in client list $expected_count times, got $found: $clients" + } + } + + test "Slot migration seen in client flags" { + assert_does_not_resync { + set_debug_prevent_pause 1 + + assert_match "OK" [R 2 CLUSTER MIGRATESLOTS SLOTSRANGE 16383 16383 NODE $node0_id] + set jobname [get_job_name 2 16383] + + # Check the flags + verify_client_flag 0 "E" 0 + verify_client_flag 2 "E" 1 + verify_client_flag 0 "i" 1 + verify_client_flag 2 "i" 0 + assert_match "id=*" [R 2 CLIENT LIST FLAGS E] + assert_equal "" [R 2 CLIENT LIST FLAGS i] + assert_equal "" [R 0 CLIENT LIST FLAGS E] + assert_match "id=*" [R 0 CLIENT LIST FLAGS i] + + set_debug_prevent_pause 0 + wait_for_migration 0 16383 + + # Check the flags again + verify_client_flag 0 "E" 0 + verify_client_flag 2 "E" 0 + verify_client_flag 0 "i" 0 + verify_client_flag 2 "i" 0 + assert_equal "" [R 2 CLIENT LIST FLAGS E] + assert_equal "" [R 2 CLIENT LIST FLAGS i] + assert_equal "" [R 0 CLIENT LIST FLAGS E] + assert_equal "" [R 0 CLIENT LIST FLAGS i] + + # Cleanup for the next test + assert_match "OK" [R 0 CLUSTER MIGRATESLOTS SLOTSRANGE 16383 16383 NODE $node2_id] + wait_for_migration 2 16383 + } + } + test "Import with hz set to 1" { assert_does_not_resync { set old_hz [lindex [R 0 CONFIG GET hz] 1]