Add slot migration client flags and module context flags (#2639)

New client flags in reported by CLIENT INFO and CLIENT LIST:

* `i` for atomic slot migration importing client
* `E` for atomic slot migration exporting client

New flags in return value of `ValkeyModule_GetContextFlags`:

* `VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT`: Indicate the that client
attached to this context is the slot import client.
* `VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT`: Indicate the that client
attached to this context is the slot export client.

Users could use this to monitor the underlying client info of the slot
migration, and more clearly understand why they see extra clients during
the migration.

Modules can use these to detect keyspace notifications on import
clients. I am also adding export flags for symmetry, although there
should not be keyspace notifications. But they would potentially be
visible in command filters or in server events triggered by that client.

---------

Signed-off-by: Jacob Murphy <jkmurphy@google.com>
This commit is contained in:
Jacob Murphy 2025-10-02 12:12:34 -07:00 committed by GitHub
parent 307397904f
commit d5bb986fd5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 76 additions and 2 deletions

View File

@ -4004,6 +4004,12 @@ int VM_GetSelectedDb(ValkeyModuleCtx *ctx) {
* context is using RESP3.
*
* * VALKEYMODULE_CTX_FLAGS_SERVER_STARTUP: The instance is starting
*
* * VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT: Indicate the that client attached to this
* context is the slot import client.
*
* * VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT: Indicate the that client attached to this
* context is the slot export client.
*/
int VM_GetContextFlags(ValkeyModuleCtx *ctx) {
int flags = 0;
@ -4017,6 +4023,11 @@ int VM_GetContextFlags(ValkeyModuleCtx *ctx) {
if (ctx->client->resp == 3) {
flags |= VALKEYMODULE_CTX_FLAGS_RESP3;
}
if (ctx->client->slot_migration_job && isImportSlotMigrationJob(ctx->client->slot_migration_job)) {
flags |= VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT;
} else if (ctx->client->slot_migration_job) {
flags |= VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT;
}
}
/* For DIRTY flags, we need the blocked client if used */

View File

@ -4212,6 +4212,8 @@ sds catClientInfoString(sds s, client *client, int hide_user_data) {
if (client->flag.no_evict) *p++ = 'e';
if (client->flag.no_touch) *p++ = 'T';
if (client->flag.import_source) *p++ = 'I';
if (client->slot_migration_job && isImportSlotMigrationJob(client->slot_migration_job)) *p++ = 'i';
if (client->slot_migration_job && !isImportSlotMigrationJob(client->slot_migration_job)) *p++ = 'E';
if (p == flags) *p++ = 'N';
*p++ = '\0';
@ -4696,6 +4698,8 @@ static int validateClientFlagFilter(sds flag_filter) {
case 'e':
case 'T':
case 'I':
case 'i':
case 'E':
case 'N':
/* Valid flag, do nothing. */
break;
@ -4850,6 +4854,12 @@ static int clientMatchesFlagFilter(client *c, sds flag_filter) {
case 'I': /* Import source flag */
if (!c->flag.import_source) return 0;
break;
case 'i': /* Slot migration import flag */
if (!c->slot_migration_job || !isImportSlotMigrationJob(c->slot_migration_job)) return 0;
break;
case 'E': /* Slot migration export flag */
if (!c->slot_migration_job || isImportSlotMigrationJob(c->slot_migration_job)) return 0;
break;
case 'N': /* Check for no flags */
if (c->flag.replica || c->flag.primary || c->flag.pubsub ||
c->flag.multi || c->flag.blocked || c->flag.tracking ||
@ -4858,7 +4868,7 @@ static int clientMatchesFlagFilter(client *c, sds flag_filter) {
c->flag.unblocked || c->flag.close_asap ||
c->flag.unix_socket || c->flag.readonly ||
c->flag.no_evict || c->flag.no_touch ||
c->flag.import_source) {
c->flag.import_source || c->slot_migration_job) {
return 0;
}
break;

View File

@ -221,11 +221,15 @@ typedef struct ValkeyModuleStreamID {
#define VALKEYMODULE_CTX_FLAGS_ASYNC_LOADING (1 << 23)
/* Valkey is starting. */
#define VALKEYMODULE_CTX_FLAGS_SERVER_STARTUP (1 << 24)
/* The current client is the slot import client */
#define VALKEYMODULE_CTX_FLAGS_SLOT_IMPORT_CLIENT (1 << 25)
/* The current client is the slot export client */
#define VALKEYMODULE_CTX_FLAGS_SLOT_EXPORT_CLIENT (1 << 26)
/* Next context flag, must be updated when adding new flags above!
This flag should not be used directly by the module.
* Use ValkeyModule_GetContextFlagsAll instead. */
#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 25)
#define _VALKEYMODULE_CTX_FLAGS_NEXT (1 << 27)
/* Keyspace changes notification classes. Every class is associated with a
* character for configuration purposes.

View File

@ -521,6 +521,55 @@ start_cluster 3 3 {tags {logreqres:skip external:skip cluster} overrides {cluste
}
}
proc verify_client_flag {idx flag expected_count} {
set clients [split [string trim [R $idx client list]] "\r\n"]
set found 0
foreach client $clients {
if {[regexp "flags=\[a-zA-Z\]*$flag" $client]} {
incr found
}
}
if {$found ne $expected_count} {
fail "Expected $flag to appear in client list $expected_count times, got $found: $clients"
}
}
test "Slot migration seen in client flags" {
assert_does_not_resync {
set_debug_prevent_pause 1
assert_match "OK" [R 2 CLUSTER MIGRATESLOTS SLOTSRANGE 16383 16383 NODE $node0_id]
set jobname [get_job_name 2 16383]
# Check the flags
verify_client_flag 0 "E" 0
verify_client_flag 2 "E" 1
verify_client_flag 0 "i" 1
verify_client_flag 2 "i" 0
assert_match "id=*" [R 2 CLIENT LIST FLAGS E]
assert_equal "" [R 2 CLIENT LIST FLAGS i]
assert_equal "" [R 0 CLIENT LIST FLAGS E]
assert_match "id=*" [R 0 CLIENT LIST FLAGS i]
set_debug_prevent_pause 0
wait_for_migration 0 16383
# Check the flags again
verify_client_flag 0 "E" 0
verify_client_flag 2 "E" 0
verify_client_flag 0 "i" 0
verify_client_flag 2 "i" 0
assert_equal "" [R 2 CLIENT LIST FLAGS E]
assert_equal "" [R 2 CLIENT LIST FLAGS i]
assert_equal "" [R 0 CLIENT LIST FLAGS E]
assert_equal "" [R 0 CLIENT LIST FLAGS i]
# Cleanup for the next test
assert_match "OK" [R 0 CLUSTER MIGRATESLOTS SLOTSRANGE 16383 16383 NODE $node2_id]
wait_for_migration 2 16383
}
}
test "Import with hz set to 1" {
assert_does_not_resync {
set old_hz [lindex [R 0 CONFIG GET hz] 1]