This commit is contained in:
martinrvisser 2025-12-17 13:10:07 +08:00 committed by GitHub
commit 3e2f785bda
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1460 additions and 10 deletions

View File

@ -390,6 +390,30 @@ typedef struct ValkeyModuleCommandFilter {
/* Registered filters */
static list *moduleCommandFilters;
typedef struct ValkeyModuleCommandResultCtx {
client *c;
struct serverCommand *cmd;
int result_status;
long long duration;
long long dirty;
} ValkeyModuleCommandResultCtx;
typedef void (*ValkeyModuleCommandResultFunc)(ValkeyModuleCommandResultCtx *result);
typedef struct ValkeyModuleCommandResult {
/* The module that registered the result callback */
ValkeyModule *module;
/* Result callback function */
ValkeyModuleCommandResultFunc callback;
/* VALKEYMODULE_CMDRESULT_* flags */
int flags;
/* Command counter when this callback was registered (to skip firing on registration command) */
long long registered_at_cmd_count;
} ValkeyModuleCommandResult;
/* Registered command result callbacks */
static list *moduleCommandResultCallbacks;
typedef void (*ValkeyModuleForkDoneHandler)(int exitcode, int bysignal, void *user_data);
static struct ValkeyModuleForkInfo {
@ -2391,6 +2415,7 @@ void VM_SetModuleAttribs(ValkeyModuleCtx *ctx, const char *name, int ver, int ap
module->usedby = listCreate();
module->using = listCreate();
module->filters = listCreate();
module->result_callbacks = listCreate();
module->module_configs = listCreate();
listSetMatchMethod(module->module_configs, moduleListConfigMatch);
listSetFreeMethod(module->module_configs, moduleListFree);
@ -3826,6 +3851,13 @@ int modulePopulateClientInfoStructure(void *ci, client *client, int structver) {
if (client->flag.blocked) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_BLOCKED;
if (client->conn->type == connectionTypeTls()) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_SSL;
if (client->flag.readonly) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_READONLY;
if (client->flag.primary) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_PRIMARY;
if (client->flag.replica) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_REPLICA;
if (client->flag.monitor) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_MONITOR;
if (client->flag.module) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_MODULE;
if (client->flag.authenticated) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_AUTHENTICATED;
if (client->flag.ever_authenticated) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_EVER_AUTHENTICATED;
if (client->flag.fake) ci1->flags |= VALKEYMODULE_CLIENTINFO_FLAG_FAKE;
int port;
connAddrPeerName(client->conn, ci1->addr, sizeof(ci1->addr), &port);
@ -11221,6 +11253,164 @@ unsigned long long VM_CommandFilterGetClientId(ValkeyModuleCommandFilterCtx *fct
return fctx->c->id;
}
/* --------------------------------------------------------------------------
* ## Module Command Result Callback API
* -------------------------------------------------------------------------- */
/* Unregister all command result callbacks registered by a module.
* This is called when a module is being unloaded.
*
* Returns the number of callbacks unregistered. */
int moduleUnregisterCommandResultCallbacks(ValkeyModule *module) {
listIter li;
listNode *ln;
int count = 0;
listRewind(module->result_callbacks, &li);
while ((ln = listNext(&li))) {
ValkeyModuleCommandResult *result = ln->value;
listNode *ln = listSearchKey(moduleCommandResultCallbacks, result);
if (ln) {
listDelNode(moduleCommandResultCallbacks, ln);
count++;
}
zfree(result);
}
return count;
}
/* Register a new command result callback function.
*
* Command result callbacks are invoked after a command has been executed,
* providing modules with information about whether the command succeeded or
* failed, along with timing and other execution details.
*
* The callback is invoked for all commands including:
* 1. Commands from clients
* 2. Commands from ValkeyModule_Call()
* 3. Commands from Lua scripts
* 4. Replicated commands
*
* The callback receives a ValkeyModuleCommandResultCtx with:
* - result_status: VALKEYMODULE_CMDRESULT_SUCCESS or VALKEYMODULE_CMDRESULT_FAILURE
* - cmd: The command that was executed
* - c: The client that executed the command
* - duration: Command execution time in microseconds
* - dirty: Number of keys modified
*
* Flags:
* - VALKEYMODULE_CMDRESULT_FAILURES_ONLY: Only invoke callback for failed commands
* - VALKEYMODULE_CMDRESULT_NOSELF: Don't invoke for commands from this module's RM_Call()
*
* Note: Callbacks execute on the critical path. Keep them efficient to minimize
* performance impact. Using FAILURES_ONLY is recommended for most use cases.
*
* Returns a ValkeyModuleCommandResult pointer that can be used with
* ValkeyModule_UnregisterCommandResult().
*/
ValkeyModuleCommandResult *VM_RegisterCommandResult(ValkeyModuleCtx *ctx,
ValkeyModuleCommandResultFunc callback,
int flags) {
ValkeyModuleCommandResult *result = zmalloc(sizeof(*result));
result->module = ctx->module;
result->callback = callback;
result->flags = flags;
/* Record the command count at registration time to avoid firing callback
* for the registration command itself */
result->registered_at_cmd_count = server.stat_numcommands;
listAddNodeTail(moduleCommandResultCallbacks, result);
listAddNodeTail(ctx->module->result_callbacks, result);
return result;
}
/* Unregister a command result callback. */
int VM_UnregisterCommandResult(ValkeyModuleCtx *ctx, ValkeyModuleCommandResult *result) {
listNode *ln;
/* A module can only remove its own callbacks */
if (result->module != ctx->module) return VALKEYMODULE_ERR;
ln = listSearchKey(moduleCommandResultCallbacks, result);
if (!ln) return VALKEYMODULE_ERR;
listDelNode(moduleCommandResultCallbacks, ln);
ln = listSearchKey(ctx->module->result_callbacks, result);
if (!ln) return VALKEYMODULE_ERR; /* Shouldn't happen */
listDelNode(ctx->module->result_callbacks, ln);
zfree(result);
return VALKEYMODULE_OK;
}
/* Call all registered command result callbacks.
* This is invoked from call() after command execution. */
void moduleCallCommandResultCallbacks(client *c,
struct serverCommand *cmd,
int command_failed,
long long duration,
long long dirty) {
if (listLength(moduleCommandResultCallbacks) == 0) return;
listIter li;
listNode *ln;
listRewind(moduleCommandResultCallbacks, &li);
int result_status = command_failed ? 1 : 0; /* 1 = FAILURE, 0 = SUCCESS */
ValkeyModuleCommandResultCtx result_ctx = {
.c = c, .cmd = cmd, .result_status = result_status, .duration = duration, .dirty = dirty};
while ((ln = listNext(&li))) {
ValkeyModuleCommandResult *r = ln->value;
/* Skip callbacks registered during the current command to avoid firing on registration */
if (r->registered_at_cmd_count == server.stat_numcommands) {
continue;
}
/* Skip if FAILURES_ONLY flag is set and command succeeded */
if ((r->flags & VALKEYMODULE_CMDRESULT_FAILURES_ONLY) && !command_failed) {
continue;
}
/* Skip if NOSELF flag is set and module is currently processing a command */
if ((r->flags & VALKEYMODULE_CMDRESULT_NOSELF) && r->module->in_call) {
continue;
}
/* Call the callback */
r->callback(&result_ctx);
}
}
/* Get the result status from a command result context.
* Returns VALKEYMODULE_CMDRESULT_SUCCESS (0) or VALKEYMODULE_CMDRESULT_FAILURE (1). */
int VM_CommandResultStatus(ValkeyModuleCommandResultCtx *rctx) {
return rctx->result_status;
}
/* Get the command name from a command result context. */
const char *VM_CommandResultCommandName(ValkeyModuleCommandResultCtx *rctx) {
return rctx->cmd ? rctx->cmd->fullname : NULL;
}
/* Get the command execution duration in microseconds from a command result context. */
long long VM_CommandResultDuration(ValkeyModuleCommandResultCtx *rctx) {
return rctx->duration;
}
/* Get the number of dirty (modified) keys from a command result context. */
long long VM_CommandResultDirty(ValkeyModuleCommandResultCtx *rctx) {
return rctx->dirty;
}
/* Get the client ID from a command result context. */
unsigned long long VM_CommandResultClientId(ValkeyModuleCommandResultCtx *rctx) {
return rctx->c->id;
}
/* For a given pointer allocated via ValkeyModule_Alloc() or
* ValkeyModule_Realloc(), return the amount of memory allocated for it.
* Note that this may be different (larger) than the memory we allocated
@ -12349,6 +12539,9 @@ void moduleInitModulesSystem(void) {
/* Set up filter list */
moduleCommandFilters = listCreate();
/* Set up command result callback list */
moduleCommandResultCallbacks = listCreate();
moduleRegisterCoreAPI();
/* Create a pipe for module threads to be able to wake up the server main thread.
@ -12473,6 +12666,7 @@ void moduleLoadFromQueue(void) {
void moduleFreeModuleStructure(struct ValkeyModule *module) {
listRelease(module->types);
listRelease(module->filters);
listRelease(module->result_callbacks);
listRelease(module->usedby);
listRelease(module->using);
listRelease(module->module_configs);
@ -12623,6 +12817,7 @@ void moduleUnregisterCleanup(ValkeyModule *module) {
moduleUnregisterSharedAPI(module);
moduleUnregisterUsedAPI(module);
moduleUnregisterFilters(module);
moduleUnregisterCommandResultCallbacks(module);
moduleUnsubscribeAllServerEvents(module);
moduleRemoveConfigs(module);
moduleUnregisterAuthCBs(module);
@ -14473,6 +14668,13 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CommandFilterArgReplace);
REGISTER_API(CommandFilterArgDelete);
REGISTER_API(CommandFilterGetClientId);
REGISTER_API(RegisterCommandResult);
REGISTER_API(UnregisterCommandResult);
REGISTER_API(CommandResultStatus);
REGISTER_API(CommandResultCommandName);
REGISTER_API(CommandResultDuration);
REGISTER_API(CommandResultDirty);
REGISTER_API(CommandResultClientId);
REGISTER_API(Fork);
REGISTER_API(SendChildHeartbeat);
REGISTER_API(ExitFromChild);

View File

@ -105,6 +105,7 @@ typedef struct ValkeyModule {
list *usedby; /* List of modules using APIs from this one. */
list *using; /* List of modules we use some APIs of. */
list *filters; /* List of filters the module has registered. */
list *result_callbacks; /* List of command result callbacks the module has registered. */
list *module_configs; /* List of configurations the module has registered */
int configs_initialized; /* Have the module configurations been initialized? */
int in_call; /* RM_Call() nesting level */
@ -205,6 +206,11 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
unsigned long moduleNotifyKeyspaceSubscribersCnt(void);
void firePostExecutionUnitJobs(void);
void moduleCallCommandFilters(client *c);
void moduleCallCommandResultCallbacks(client *c,
struct serverCommand *cmd,
int command_failed,
long long duration,
long long dirty);
void modulePostExecutionUnitOperations(void);
void ModuleForkDoneHandler(int exitcode, int bysignal);
int TerminateModuleForkChild(int child_pid, int wait);

View File

@ -3820,14 +3820,19 @@ void call(client *c, int flags) {
/* Update failed command calls if required. */
if (!incrCommandStatsOnError(real_cmd, ERROR_COMMAND_FAILED) && c->deferred_reply_errors) {
int command_failed = incrCommandStatsOnError(real_cmd, ERROR_COMMAND_FAILED);
if (!command_failed && c->deferred_reply_errors) {
/* When call is used from a module client, error stats, and total_error_replies
* isn't updated since these errors, if handled by the module, are internal,
* and not reflected to users. however, the commandstats does show these calls
* (made by RM_Call), so it should log if they failed or succeeded. */
real_cmd->failed_calls++;
command_failed = 1;
}
/* Call module command result callbacks if any are registered. */
moduleCallCommandResultCallbacks(c, real_cmd, command_failed, duration, dirty);
/* After executing command, we will close the client after writing entire
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */
if (c->flag.close_after_command) {

View File

@ -316,6 +316,22 @@ typedef uint64_t ValkeyModuleTimerID;
/* Do filter ValkeyModule_Call() commands initiated by module itself. */
#define VALKEYMODULE_CMDFILTER_NOSELF (1 << 0)
/* Command Result Callback Flags */
/* Only invoke callback for failed commands */
#define VALKEYMODULE_CMDRESULT_FAILURES_ONLY (1 << 0)
/* Don't invoke callback for ValkeyModule_Call() commands initiated by module itself. */
#define VALKEYMODULE_CMDRESULT_NOSELF (1 << 1)
/* Command Result Status Values */
/* Command executed successfully */
#define VALKEYMODULE_CMDRESULT_SUCCESS 0
/* Command execution failed */
#define VALKEYMODULE_CMDRESULT_FAILURE 1
/* Declare that the module can handle errors with ValkeyModule_SetModuleOptions. */
#define VALKEYMODULE_OPTIONS_HANDLE_IO_ERRORS (1 << 0)
@ -682,6 +698,13 @@ static const ValkeyModuleEvent ValkeyModuleEvent_ReplicationRoleChanged = {VALKE
#define VALKEYMODULE_CLIENTINFO_FLAG_UNIXSOCKET (1 << 4)
#define VALKEYMODULE_CLIENTINFO_FLAG_MULTI (1 << 5)
#define VALKEYMODULE_CLIENTINFO_FLAG_READONLY (1 << 6)
#define VALKEYMODULE_CLIENTINFO_FLAG_PRIMARY (1 << 7)
#define VALKEYMODULE_CLIENTINFO_FLAG_REPLICA (1 << 8)
#define VALKEYMODULE_CLIENTINFO_FLAG_MONITOR (1 << 9)
#define VALKEYMODULE_CLIENTINFO_FLAG_MODULE (1 << 10)
#define VALKEYMODULE_CLIENTINFO_FLAG_AUTHENTICATED (1 << 11)
#define VALKEYMODULE_CLIENTINFO_FLAG_EVER_AUTHENTICATED (1 << 12)
#define VALKEYMODULE_CLIENTINFO_FLAG_FAKE (1 << 13)
/* Here we take all the structures that the module pass to the core
* and the other way around. Notably the list here contains the structures
@ -1332,6 +1355,8 @@ typedef struct ValkeyModuleDict ValkeyModuleDict;
typedef struct ValkeyModuleDictIter ValkeyModuleDictIter;
typedef struct ValkeyModuleCommandFilterCtx ValkeyModuleCommandFilterCtx;
typedef struct ValkeyModuleCommandFilter ValkeyModuleCommandFilter;
typedef struct ValkeyModuleCommandResultCtx ValkeyModuleCommandResultCtx;
typedef struct ValkeyModuleCommandResult ValkeyModuleCommandResult;
typedef struct ValkeyModuleServerInfoData ValkeyModuleServerInfoData;
typedef struct ValkeyModuleScanCursor ValkeyModuleScanCursor;
typedef struct ValkeyModuleUser ValkeyModuleUser;
@ -1365,6 +1390,7 @@ typedef void (*ValkeyModuleClusterMessageReceiver)(ValkeyModuleCtx *ctx,
uint32_t len);
typedef void (*ValkeyModuleTimerProc)(ValkeyModuleCtx *ctx, void *data);
typedef void (*ValkeyModuleCommandFilterFunc)(ValkeyModuleCommandFilterCtx *filter);
typedef void (*ValkeyModuleCommandResultFunc)(ValkeyModuleCommandResultCtx *result);
typedef void (*ValkeyModuleForkDoneHandler)(int exitcode, int bysignal, void *user_data);
typedef void (*ValkeyModuleScanCB)(ValkeyModuleCtx *ctx,
ValkeyModuleString *keyname,
@ -2019,6 +2045,18 @@ VALKEYMODULE_API int (*ValkeyModule_CommandFilterArgDelete)(ValkeyModuleCommandF
int pos) VALKEYMODULE_ATTR;
VALKEYMODULE_API unsigned long long (*ValkeyModule_CommandFilterGetClientId)(ValkeyModuleCommandFilterCtx *fctx)
VALKEYMODULE_ATTR;
VALKEYMODULE_API ValkeyModuleCommandResult *(*ValkeyModule_RegisterCommandResult)(ValkeyModuleCtx *ctx,
ValkeyModuleCommandResultFunc cb,
int flags) VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_UnregisterCommandResult)(ValkeyModuleCtx *ctx,
ValkeyModuleCommandResult *result) VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_CommandResultStatus)(ValkeyModuleCommandResultCtx *rctx) VALKEYMODULE_ATTR;
VALKEYMODULE_API const char *(*ValkeyModule_CommandResultCommandName)(ValkeyModuleCommandResultCtx *rctx)
VALKEYMODULE_ATTR;
VALKEYMODULE_API long long (*ValkeyModule_CommandResultDuration)(ValkeyModuleCommandResultCtx *rctx) VALKEYMODULE_ATTR;
VALKEYMODULE_API long long (*ValkeyModule_CommandResultDirty)(ValkeyModuleCommandResultCtx *rctx) VALKEYMODULE_ATTR;
VALKEYMODULE_API unsigned long long (*ValkeyModule_CommandResultClientId)(ValkeyModuleCommandResultCtx *rctx)
VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_Fork)(ValkeyModuleForkDoneHandler cb, void *user_data) VALKEYMODULE_ATTR;
VALKEYMODULE_API void (*ValkeyModule_SendChildHeartbeat)(double progress) VALKEYMODULE_ATTR;
VALKEYMODULE_API int (*ValkeyModule_ExitFromChild)(int retcode) VALKEYMODULE_ATTR;
@ -2493,6 +2531,13 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in
VALKEYMODULE_GET_API(CommandFilterArgReplace);
VALKEYMODULE_GET_API(CommandFilterArgDelete);
VALKEYMODULE_GET_API(CommandFilterGetClientId);
VALKEYMODULE_GET_API(RegisterCommandResult);
VALKEYMODULE_GET_API(UnregisterCommandResult);
VALKEYMODULE_GET_API(CommandResultStatus);
VALKEYMODULE_GET_API(CommandResultCommandName);
VALKEYMODULE_GET_API(CommandResultDuration);
VALKEYMODULE_GET_API(CommandResultDirty);
VALKEYMODULE_GET_API(CommandResultClientId);
VALKEYMODULE_GET_API(Fork);
VALKEYMODULE_GET_API(SendChildHeartbeat);
VALKEYMODULE_GET_API(ExitFromChild);

View File

@ -1,5 +1,7 @@
# Build test modules
list(APPEND MODULES_LIST "commandfilter")
list(APPEND MODULES_LIST "commandresult")
list(APPEND MODULES_LIST "benchmark_commandresult")
list(APPEND MODULES_LIST "basics")
list(APPEND MODULES_LIST "testrdb")
list(APPEND MODULES_LIST "fork")

View File

@ -25,6 +25,8 @@ endif
TEST_MODULES = \
commandfilter.so \
commandresult.so \
benchmark_commandresult.so \
basics.so \
testrdb.so \
fork.so \

View File

@ -0,0 +1,250 @@
/*
* Benchmark module for testing command result callback performance impact
*
* This module provides commands to measure the overhead of different callback modes:
* - No callbacks (baseline)
* - FAILURES_ONLY mode (recommended for production)
* - ALL mode (track all commands)
* - Combined FAILURES_ONLY + NOSELF mode
*/
#include "../src/valkeymodule.h"
#include <string.h>
#include <stdio.h>
/* Benchmark statistics */
typedef struct BenchmarkStats {
long long total_callbacks;
long long total_commands;
long long start_time_us;
long long end_time_us;
} BenchmarkStats;
static BenchmarkStats bench_stats = {0};
static ValkeyModuleCommandResult *registered_callback = NULL;
/* Reset benchmark statistics */
void ResetBenchmarkStats(void) {
bench_stats.total_callbacks = 0;
bench_stats.total_commands = 0;
bench_stats.start_time_us = 0;
bench_stats.end_time_us = 0;
}
/* Minimal callback that just increments counter */
void MinimalCallback(ValkeyModuleCommandResultCtx *ctx) {
(void)ctx; /* Unused */
bench_stats.total_callbacks++;
}
/* BENCHMARK.REGISTER <mode>
* Modes: none, failures, all, failures+noself
*/
int BenchmarkRegister_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
if (argc != 2) {
return ValkeyModule_WrongArity(ctx);
}
if (registered_callback) {
return ValkeyModule_ReplyWithError(ctx, "ERR callback already registered");
}
size_t len;
const char *mode = ValkeyModule_StringPtrLen(argv[1], &len);
int flags = 0;
if (strcmp(mode, "none") == 0) {
return ValkeyModule_ReplyWithSimpleString(ctx, "OK"); // No callback registered
} else if (strcmp(mode, "failures") == 0) {
flags = VALKEYMODULE_CMDRESULT_FAILURES_ONLY;
} else if (strcmp(mode, "all") == 0) {
flags = 0; // No flags - track all
} else if (strcmp(mode, "failures+noself") == 0) {
flags = VALKEYMODULE_CMDRESULT_FAILURES_ONLY | VALKEYMODULE_CMDRESULT_NOSELF;
} else {
return ValkeyModule_ReplyWithError(ctx, "ERR invalid mode. Use: none, failures, all, failures+noself");
}
registered_callback = ValkeyModule_RegisterCommandResult(ctx, MinimalCallback, flags);
if (!registered_callback) {
return ValkeyModule_ReplyWithError(ctx, "ERR failed to register callback");
}
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
/* BENCHMARK.UNREGISTER */
int BenchmarkUnregister_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
if (argc != 1) {
return ValkeyModule_WrongArity(ctx);
}
if (!registered_callback) {
return ValkeyModule_ReplyWithError(ctx, "ERR no callback registered");
}
if (ValkeyModule_UnregisterCommandResult(ctx, registered_callback) == VALKEYMODULE_ERR) {
return ValkeyModule_ReplyWithError(ctx, "ERR failed to unregister callback");
}
registered_callback = NULL;
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
/* BENCHMARK.START */
int BenchmarkStart_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
if (argc != 1) {
return ValkeyModule_WrongArity(ctx);
}
ResetBenchmarkStats();
bench_stats.start_time_us = ValkeyModule_Microseconds();
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
/* BENCHMARK.STOP
* Returns: callbacks_fired, commands_executed, duration_us, throughput
*/
int BenchmarkStop_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
if (argc != 1) {
return ValkeyModule_WrongArity(ctx);
}
bench_stats.end_time_us = ValkeyModule_Microseconds();
long long duration_us = bench_stats.end_time_us - bench_stats.start_time_us;
double throughput = 0.0;
if (duration_us > 0) {
throughput = (double)bench_stats.total_commands / ((double)duration_us / 1000000.0);
}
ValkeyModule_ReplyWithArray(ctx, 8);
ValkeyModule_ReplyWithSimpleString(ctx, "callbacks_fired");
ValkeyModule_ReplyWithLongLong(ctx, bench_stats.total_callbacks);
ValkeyModule_ReplyWithSimpleString(ctx, "commands_executed");
ValkeyModule_ReplyWithLongLong(ctx, bench_stats.total_commands);
ValkeyModule_ReplyWithSimpleString(ctx, "duration_us");
ValkeyModule_ReplyWithLongLong(ctx, duration_us);
ValkeyModule_ReplyWithSimpleString(ctx, "throughput_per_sec");
ValkeyModule_ReplyWithDouble(ctx, throughput);
return VALKEYMODULE_OK;
}
/* BENCHMARK.SUCCESS - Always succeeds, increments command counter */
int BenchmarkSuccess_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
if (argc != 1) {
return ValkeyModule_WrongArity(ctx);
}
bench_stats.total_commands++;
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
/* BENCHMARK.FAIL - Always fails, increments command counter */
int BenchmarkFail_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
if (argc != 1) {
return ValkeyModule_WrongArity(ctx);
}
bench_stats.total_commands++;
return ValkeyModule_ReplyWithError(ctx, "ERR benchmark failure");
}
/* BENCHMARK.RMCALL <command> [args...]
* Call command via RM_Call to test NOSELF behavior
*/
int BenchmarkRMCall_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
if (argc < 2) {
return ValkeyModule_WrongArity(ctx);
}
bench_stats.total_commands++;
ValkeyModuleCallReply *reply = ValkeyModule_Call(ctx,
ValkeyModule_StringPtrLen(argv[1], NULL), "v", argv + 2, argc - 2);
if (!reply) {
return ValkeyModule_ReplyWithError(ctx, "ERR call failed");
}
ValkeyModule_ReplyWithCallReply(ctx, reply);
ValkeyModule_FreeCallReply(reply);
return VALKEYMODULE_OK;
}
/* BENCHMARK.PING - Minimal command for overhead testing */
int BenchmarkPing_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
if (argc != 1) {
return ValkeyModule_WrongArity(ctx);
}
bench_stats.total_commands++;
return ValkeyModule_ReplyWithSimpleString(ctx, "PONG");
}
int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
if (ValkeyModule_Init(ctx, "benchmark_commandresult", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
/* Register benchmark commands */
if (ValkeyModule_CreateCommand(ctx, "benchmark.register", BenchmarkRegister_RedisCommand,
"write", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "benchmark.unregister", BenchmarkUnregister_RedisCommand,
"write", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "benchmark.start", BenchmarkStart_RedisCommand,
"write", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "benchmark.stop", BenchmarkStop_RedisCommand,
"readonly", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "benchmark.success", BenchmarkSuccess_RedisCommand,
"readonly", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "benchmark.fail", BenchmarkFail_RedisCommand,
"readonly", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "benchmark.rmcall", BenchmarkRMCall_RedisCommand,
"write", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "benchmark.ping", BenchmarkPing_RedisCommand,
"readonly fast", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
return VALKEYMODULE_OK;
}

View File

@ -0,0 +1,347 @@
/* Test module for command result callbacks API
*
* This module tests the ValkeyModule_RegisterCommandResult() API and related
* functionality for tracking command execution results.
*
* Commands provided:
* - CMDRESULT.REGISTER <flags> - Register a result callback with specified flags
* - CMDRESULT.UNREGISTER - Unregister the callback
* - CMDRESULT.STATS - Get statistics about callback invocations
* - CMDRESULT.RESET - Reset statistics
* - CMDRESULT.GETLOG [count] - Get the last N logged command results
* - CMDRESULT.SUCCESS - A command that always succeeds
* - CMDRESULT.FAIL - A command that always fails
*/
#include "valkeymodule.h"
#include <string.h>
#include <stdlib.h>
/* Statistics tracking */
static struct {
long long total_callbacks;
long long success_count;
long long failure_count;
long long total_duration_us;
long long total_dirty;
} stats = {0};
/* Command result log entry */
#define MAX_LOG_ENTRIES 100
typedef struct {
char command_name[64];
int status;
long long duration;
long long dirty;
unsigned long long client_id;
} ResultLogEntry;
static ResultLogEntry result_log[MAX_LOG_ENTRIES];
static int log_head = 0;
static int log_count = 0;
/* Registered callback handle */
static ValkeyModuleCommandResult *registered_callback = NULL;
/* Add entry to circular log */
void LogResult(const char *cmd_name, int status, long long duration,
long long dirty, unsigned long long client_id) {
ResultLogEntry *entry = &result_log[log_head];
strncpy(entry->command_name, cmd_name, sizeof(entry->command_name) - 1);
entry->command_name[sizeof(entry->command_name) - 1] = '\0';
entry->status = status;
entry->duration = duration;
entry->dirty = dirty;
entry->client_id = client_id;
log_head = (log_head + 1) % MAX_LOG_ENTRIES;
if (log_count < MAX_LOG_ENTRIES) log_count++;
}
/* Command result callback function */
void CommandResultCallback(ValkeyModuleCommandResultCtx *ctx) {
stats.total_callbacks++;
int status = ValkeyModule_CommandResultStatus(ctx);
const char *cmd_name = ValkeyModule_CommandResultCommandName(ctx);
long long duration = ValkeyModule_CommandResultDuration(ctx);
long long dirty = ValkeyModule_CommandResultDirty(ctx);
unsigned long long client_id = ValkeyModule_CommandResultClientId(ctx);
if (status == VALKEYMODULE_CMDRESULT_SUCCESS) {
stats.success_count++;
} else {
stats.failure_count++;
}
stats.total_duration_us += duration;
stats.total_dirty += dirty;
/* Log the result */
LogResult(cmd_name ? cmd_name : "unknown", status, duration, dirty, client_id);
}
/* CMDRESULT.REGISTER <flags>
* Flags can be: "all", "failures", "noself", "failures+noself"
*/
int CmdResultRegister_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
if (argc != 2) {
return ValkeyModule_WrongArity(ctx);
}
if (registered_callback) {
return ValkeyModule_ReplyWithError(ctx, "ERR callback already registered");
}
size_t len;
const char *flags_str = ValkeyModule_StringPtrLen(argv[1], &len);
int flags = 0;
if (strcmp(flags_str, "failures") == 0) {
flags = VALKEYMODULE_CMDRESULT_FAILURES_ONLY;
} else if (strcmp(flags_str, "noself") == 0) {
flags = VALKEYMODULE_CMDRESULT_NOSELF;
} else if (strcmp(flags_str, "failures+noself") == 0) {
flags = VALKEYMODULE_CMDRESULT_FAILURES_ONLY | VALKEYMODULE_CMDRESULT_NOSELF;
} else if (strcmp(flags_str, "all") != 0) {
return ValkeyModule_ReplyWithError(ctx, "ERR invalid flags. Use: all, failures, noself, or failures+noself");
}
registered_callback = ValkeyModule_RegisterCommandResult(ctx, CommandResultCallback, flags);
if (!registered_callback) {
return ValkeyModule_ReplyWithError(ctx, "ERR failed to register callback");
}
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
/* CMDRESULT.UNREGISTER */
int CmdResultUnregister_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
if (argc != 1) {
return ValkeyModule_WrongArity(ctx);
}
if (!registered_callback) {
return ValkeyModule_ReplyWithError(ctx, "ERR no callback registered");
}
int result = ValkeyModule_UnregisterCommandResult(ctx, registered_callback);
registered_callback = NULL;
if (result != VALKEYMODULE_OK) {
return ValkeyModule_ReplyWithError(ctx, "ERR failed to unregister callback");
}
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
/* CMDRESULT.STATS
* Returns: total_callbacks, success_count, failure_count, total_duration_us, total_dirty
*/
int CmdResultStats_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
if (argc != 1) {
return ValkeyModule_WrongArity(ctx);
}
ValkeyModule_ReplyWithArray(ctx, 10);
ValkeyModule_ReplyWithSimpleString(ctx, "total_callbacks");
ValkeyModule_ReplyWithLongLong(ctx, stats.total_callbacks);
ValkeyModule_ReplyWithSimpleString(ctx, "success_count");
ValkeyModule_ReplyWithLongLong(ctx, stats.success_count);
ValkeyModule_ReplyWithSimpleString(ctx, "failure_count");
ValkeyModule_ReplyWithLongLong(ctx, stats.failure_count);
ValkeyModule_ReplyWithSimpleString(ctx, "total_duration_us");
ValkeyModule_ReplyWithLongLong(ctx, stats.total_duration_us);
ValkeyModule_ReplyWithSimpleString(ctx, "total_dirty");
ValkeyModule_ReplyWithLongLong(ctx, stats.total_dirty);
return VALKEYMODULE_OK;
}
/* CMDRESULT.RESET */
int CmdResultReset_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
if (argc != 1) {
return ValkeyModule_WrongArity(ctx);
}
stats.total_callbacks = 0;
stats.success_count = 0;
stats.failure_count = 0;
stats.total_duration_us = 0;
stats.total_dirty = 0;
log_head = 0;
log_count = 0;
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
/* CMDRESULT.GETLOG [count]
* Returns the last N command results from the log
*/
int CmdResultGetLog_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
if (argc > 2) {
return ValkeyModule_WrongArity(ctx);
}
long long count = log_count;
if (argc == 2) {
if (ValkeyModule_StringToLongLong(argv[1], &count) != VALKEYMODULE_OK) {
return ValkeyModule_ReplyWithError(ctx, "ERR invalid count");
}
if (count < 0) count = 0;
if (count > log_count) count = log_count;
}
ValkeyModule_ReplyWithArray(ctx, count);
/* Get entries from newest to oldest */
for (int i = 0; i < count; i++) {
int idx = (log_head - 1 - i + MAX_LOG_ENTRIES) % MAX_LOG_ENTRIES;
ResultLogEntry *entry = &result_log[idx];
ValkeyModule_ReplyWithArray(ctx, 10);
ValkeyModule_ReplyWithSimpleString(ctx, "command");
ValkeyModule_ReplyWithCString(ctx, entry->command_name);
ValkeyModule_ReplyWithSimpleString(ctx, "status");
ValkeyModule_ReplyWithCString(ctx,
entry->status == VALKEYMODULE_CMDRESULT_SUCCESS ? "success" : "failure");
ValkeyModule_ReplyWithSimpleString(ctx, "duration_us");
ValkeyModule_ReplyWithLongLong(ctx, entry->duration);
ValkeyModule_ReplyWithSimpleString(ctx, "dirty");
ValkeyModule_ReplyWithLongLong(ctx, entry->dirty);
ValkeyModule_ReplyWithSimpleString(ctx, "client_id");
ValkeyModule_ReplyWithLongLong(ctx, entry->client_id);
}
return VALKEYMODULE_OK;
}
/* CMDRESULT.SUCCESS
* A command that always succeeds
*/
int CmdResultSuccess_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
/* CMDRESULT.FAIL
* A command that always fails
*/
int CmdResultFail_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
return ValkeyModule_ReplyWithError(ctx, "ERR intentional failure");
}
/* CMDRESULT.DIRTY <key>
* A command that modifies a key (increments dirty count)
*/
int CmdResultDirty_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
if (argc != 2) {
return ValkeyModule_WrongArity(ctx);
}
/* Use RM_Call to execute SET, which will properly increment dirty count */
ValkeyModuleString *value = ValkeyModule_CreateString(ctx, "modified", 8);
ValkeyModuleCallReply *reply = ValkeyModule_Call(ctx, "SET", "ss", argv[1], value);
if (!reply) {
ValkeyModule_FreeString(ctx, value);
return ValkeyModule_ReplyWithError(ctx, "ERR failed to set key");
}
ValkeyModule_FreeCallReply(reply);
ValkeyModule_FreeString(ctx, value);
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
/* CMDRESULT.RMCALL <command> [args...]
* Test that NOSELF flag works - this calls a command via RM_Call
*/
int CmdResultRMCall_RedisCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
if (argc < 2) {
return ValkeyModule_WrongArity(ctx);
}
/* Call the command via RM_Call */
ValkeyModuleCallReply *reply = ValkeyModule_Call(ctx,
ValkeyModule_StringPtrLen(argv[1], NULL), "v", argv + 2, argc - 2);
if (!reply) {
return ValkeyModule_ReplyWithError(ctx, "ERR call failed");
}
/* Forward the reply */
ValkeyModule_ReplyWithCallReply(ctx, reply);
ValkeyModule_FreeCallReply(reply);
return VALKEYMODULE_OK;
}
int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
if (ValkeyModule_Init(ctx, "commandresult", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "cmdresult.register", CmdResultRegister_RedisCommand,
"admin", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "cmdresult.unregister", CmdResultUnregister_RedisCommand,
"admin", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "cmdresult.stats", CmdResultStats_RedisCommand,
"readonly", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "cmdresult.reset", CmdResultReset_RedisCommand,
"admin", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "cmdresult.getlog", CmdResultGetLog_RedisCommand,
"readonly", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "cmdresult.success", CmdResultSuccess_RedisCommand,
"readonly", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "cmdresult.fail", CmdResultFail_RedisCommand,
"readonly", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "cmdresult.dirty", CmdResultDirty_RedisCommand,
"write", 1, 1, 1) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
if (ValkeyModule_CreateCommand(ctx, "cmdresult.rmcall", CmdResultRMCall_RedisCommand,
"readonly", 0, 0, 0) == VALKEYMODULE_ERR) {
return VALKEYMODULE_ERR;
}
return VALKEYMODULE_OK;
}

View File

@ -306,14 +306,21 @@ int test_clientinfo(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc)
ValkeyModule_ReplyWithArray(ctx, 10);
char flags[512];
snprintf(flags, sizeof(flags) - 1, "%s:%s:%s:%s:%s:%s:%s",
snprintf(flags, sizeof(flags) - 1, "%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_SSL ? "ssl" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_PUBSUB ? "pubsub" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_BLOCKED ? "blocked" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_TRACKING ? "tracking" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_UNIXSOCKET ? "unixsocket" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_MULTI ? "multi" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_READONLY ? "readonly" : "");
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_READONLY ? "readonly" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_PRIMARY ? "primary" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_REPLICA ? "replica" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_MONITOR ? "monitor" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_MODULE ? "module" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_AUTHENTICATED ? "authenticated" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_EVER_AUTHENTICATED ? "ever_authenticated" : "",
ci.flags & VALKEYMODULE_CLIENTINFO_FLAG_FAKE ? "fake" : "");
ValkeyModule_ReplyWithCString(ctx, "flags");
ValkeyModule_ReplyWithCString(ctx, flags);

View File

@ -0,0 +1,255 @@
# Benchmark tests for command result callback performance
start_server {tags {"modules"}} {
set testmodule [file normalize tests/modules/benchmark_commandresult.so]
r module load $testmodule
proc run_benchmark {mode iterations failure_rate description {with_warmup 0}} {
puts "\n========================================="
puts "Benchmark: $description"
puts "Mode: $mode"
puts "Iterations: $iterations"
puts "Failure rate: ${failure_rate}%"
puts "========================================="
# Warmup phase - run a smaller workload first to warm caches
if {$with_warmup} {
puts "Running warmup phase..."
r benchmark.register $mode
r benchmark.start
set warmup_iterations [expr {min(1000, $iterations / 10)}]
for {set i 0} {$i < $warmup_iterations} {incr i} {
r benchmark.success
}
catch {r benchmark.stop}
if {$mode ne "none"} {
catch {r benchmark.unregister}
}
puts "Warmup complete"
}
# Setup
r benchmark.register $mode
r benchmark.start
# Run benchmark
set num_failures [expr {int($iterations * $failure_rate / 100.0)}]
set num_successes [expr {$iterations - $num_failures}]
for {set i 0} {$i < $num_successes} {incr i} {
r benchmark.success
}
for {set i 0} {$i < $num_failures} {incr i} {
catch {r benchmark.fail} e
}
# Get results
set results [r benchmark.stop]
array set stats $results
# Calculate metrics
set callbacks_fired $stats(callbacks_fired)
set commands_executed $stats(commands_executed)
set duration_us $stats(duration_us)
set throughput $stats(throughput_per_sec)
set duration_ms [expr {$duration_us / 1000.0}]
set avg_latency_us [expr {$duration_us / double($commands_executed)}]
puts "Results:"
puts " Commands executed: $commands_executed"
puts " Callbacks fired: $callbacks_fired"
puts " Duration: ${duration_ms} ms"
puts " Throughput: [format %.2f $throughput] commands/sec"
puts " Avg latency: [format %.3f $avg_latency_us] μs/command"
if {$mode eq "none"} {
set overhead_pct 0.0
puts " Overhead: 0% (baseline)"
} else {
# Calculate overhead vs baseline (stored in global)
global baseline_throughput
if {[info exists baseline_throughput]} {
set overhead_pct [expr {(($baseline_throughput - $throughput) / $baseline_throughput) * 100.0}]
puts " Overhead: [format %.2f $overhead_pct]% vs baseline"
}
}
# Verify callback behavior
if {$mode eq "none"} {
assert_equal $callbacks_fired 0
} elseif {$mode eq "failures"} {
assert_equal $callbacks_fired $num_failures
} elseif {$mode eq "all"} {
# ALL mode fires for benchmark commands too (success, fail, stop)
# We executed $iterations commands + 1 stop command = $iterations + 1 total
assert_equal $callbacks_fired [expr {$iterations + 1}]
} elseif {$mode eq "failures+noself"} {
# FAILURES_ONLY + NOSELF should only see direct failures
assert_equal $callbacks_fired $num_failures
}
# Cleanup - always unregister if not "none" mode
if {$mode ne "none"} {
catch {r benchmark.unregister}
}
# Store baseline for comparison
if {$mode eq "none"} {
set ::baseline_throughput $throughput
set ::baseline_latency $avg_latency_us
}
return [list $throughput $avg_latency_us $callbacks_fired $overhead_pct]
}
test {Benchmark - Small workload (1,000 commands, 99.9% success)} {
set results_none [run_benchmark none 1000 0.1 "Baseline (no callbacks)"]
set results_failures [run_benchmark failures 1000 0.1 "FAILURES_ONLY mode"]
set results_all [run_benchmark all 1000 0.1 "ALL mode (track everything)"]
# ALL mode should have measurable overhead
lassign $results_all throughput_all latency_all callbacks_all overhead_all
assert {$overhead_all > 0}
}
test {Benchmark - Medium workload (10,000 commands, 99% success)} {
set results_none [run_benchmark none 10000 1.0 "Baseline (no callbacks)"]
set results_failures [run_benchmark failures 10000 1.0 "FAILURES_ONLY mode"]
set results_all [run_benchmark all 10000 1.0 "ALL mode (track everything)"]
# FAILURES_ONLY should have minimal overhead
lassign $results_failures throughput_failures latency_failures callbacks_failures overhead_failures
assert {$overhead_failures < 10.0}
# ALL mode should have measurable overhead (or at least not be much faster due to timing variance)
lassign $results_all throughput_all latency_all callbacks_all overhead_all
assert {$overhead_all > -5.0}
}
test {Benchmark - Large workload (100,000 commands, 99.9% success)} {
# Run with warmup to eliminate cold-start effects
# Run baseline TWICE - first for warmup, second for measurement
puts "\n*** Running baseline warmup ***"
run_benchmark none 100000 0.1 "Baseline warmup" 1
puts "\n*** Running actual measurements ***"
set results_failures [run_benchmark failures 100000 0.1 "FAILURES_ONLY mode"]
set results_failures_noself [run_benchmark failures+noself 100000 0.1 "FAILURES_ONLY + NOSELF"]
set results_all [run_benchmark all 100000 0.1 "ALL mode (track everything)"]
# Run baseline LAST to get fair comparison (system is warmed up now)
set results_none [run_benchmark none 100000 0.1 "Baseline (no callbacks)"]
# Recalculate overhead using the baseline we just measured
lassign $results_none tp_none lat_none cb_none oh_none
lassign $results_failures tp_fail lat_fail cb_fail oh_fail_old
lassign $results_failures_noself tp_fn lat_fn cb_fn oh_fn_old
lassign $results_all tp_all lat_all cb_all oh_all_old
# Recalculate overhead with correct baseline
set oh_fail [expr {(($tp_none - $tp_fail) / $tp_none) * 100.0}]
set oh_fn [expr {(($tp_none - $tp_fn) / $tp_none) * 100.0}]
set oh_all [expr {(($tp_none - $tp_all) / $tp_none) * 100.0}]
# Print comparison table
puts "\n========================================="
puts "Performance Comparison Summary"
puts "========================================="
puts [format "%-25s %15s %15s %10s" "Mode" "Throughput" "Avg Latency" "Overhead"]
puts "---------------------------------------------------------------------"
puts [format "%-25s %15.2f %12.3f μs %9.2f%%" "Baseline (none)" $tp_none $lat_none 0.0]
puts [format "%-25s %15.2f %12.3f μs %9.2f%%" "FAILURES_ONLY" $tp_fail $lat_fail $oh_fail]
puts [format "%-25s %15.2f %12.3f μs %9.2f%%" "FAILURES_ONLY+NOSELF" $tp_fn $lat_fn $oh_fn]
puts [format "%-25s %15.2f %12.3f μs %9.2f%%" "ALL" $tp_all $lat_all $oh_all]
puts "====================================="
# Assertions - be lenient due to timing variance in microbenchmarks
# FAILURES_ONLY should have low overhead (negative values mean faster, which is timing noise)
assert {$oh_fail < 5.0}
# ALL mode should have measurable overhead (or at least not be much faster)
assert {$oh_all > -5.0}
}
test {Benchmark - High failure rate (10% failures)} {
set results_none [run_benchmark none 10000 10.0 "Baseline (no callbacks)"]
set results_failures [run_benchmark failures 10000 10.0 "FAILURES_ONLY with 10% failures"]
# Even with 10% failures, overhead should be reasonable
lassign $results_failures tp_fail lat_fail cb_fail oh_fail
assert {$oh_fail < 15.0}
}
test {Benchmark - NOSELF with RM_Call} {
r benchmark.register failures+noself
r benchmark.start
# Direct calls (should trigger callback if they fail)
for {set i 0} {$i < 100} {incr i} {
r benchmark.success
}
# RM_Call wrapper (inner command callback should be skipped)
for {set i 0} {$i < 100} {incr i} {
r benchmark.rmcall benchmark.success
}
set results [r benchmark.stop]
array set stats $results
# Should have 300 commands total:
# - 100 direct benchmark.success calls
# - 100 benchmark.rmcall calls (wrapper command)
# - 100 inner benchmark.success calls (invoked via RM_Call)
assert_equal $stats(commands_executed) 300
# No callbacks should fire (all successes with NOSELF flag)
assert_equal $stats(callbacks_fired) 0
r benchmark.unregister
}
test {Benchmark - Minimal command overhead} {
# Test ultra-lightweight PING command
puts "\n========================================="
puts "Testing minimal command overhead"
puts "========================================="
# Ensure no callbacks from previous tests
catch {r benchmark.unregister}
# Baseline - no callbacks
r benchmark.register none
r benchmark.start
for {set i 0} {$i < 10000} {incr i} {
r benchmark.ping
}
set results_none [r benchmark.stop]
array set stats_none $results_none
# With FAILURES_ONLY
r benchmark.register failures
r benchmark.start
for {set i 0} {$i < 10000} {incr i} {
r benchmark.ping
}
set results_failures [r benchmark.stop]
array set stats_failures $results_failures
set baseline_throughput $stats_none(throughput_per_sec)
set failures_throughput $stats_failures(throughput_per_sec)
set overhead_pct [expr {(($baseline_throughput - $failures_throughput) / $baseline_throughput) * 100.0}]
puts "Baseline throughput: [format %.2f $baseline_throughput] cmd/sec"
puts "FAILURES_ONLY throughput: [format %.2f $failures_throughput] cmd/sec"
puts "Overhead: [format %.2f $overhead_pct]%"
# For minimal commands, overhead should be reasonable
assert {$overhead_pct < 15.0}
r benchmark.unregister
}
}

View File

@ -0,0 +1,288 @@
set testmodule [file normalize tests/modules/commandresult.so]
start_server {tags {"modules"}} {
r module load $testmodule
# Helper to ensure cleanup between tests
proc cleanup_callback {} {
catch {r cmdresult.unregister}
r cmdresult.reset
}
test {Module commandresult - Register callback with 'all' flag} {
cleanup_callback
r cmdresult.register all
# Execute some commands
r cmdresult.success
r ping
catch {r cmdresult.fail} e
# Check stats
set stats [r cmdresult.stats]
assert {[dict get $stats total_callbacks] >= 3}
assert {[dict get $stats success_count] >= 2}
assert {[dict get $stats failure_count] >= 1}
r cmdresult.unregister
}
test {Module commandresult - Register callback with 'failures' flag} {
cleanup_callback
r cmdresult.register failures
# Execute successful and failing commands
r cmdresult.success
r ping
r cmdresult.success
catch {r cmdresult.fail} e
catch {r cmdresult.fail} e
# With failures-only, should only see 2 callbacks (the failures)
set stats [r cmdresult.stats]
assert_equal [dict get $stats failure_count] 2
# Success count should be 0 since we're only tracking failures
assert_equal [dict get $stats success_count] 0
r cmdresult.unregister
}
test {Module commandresult - Callback tracks duration} {
cleanup_callback
r cmdresult.register all
r cmdresult.success
r ping
set stats [r cmdresult.stats]
# Duration should be > 0 microseconds
assert {[dict get $stats total_duration_us] > 0}
r cmdresult.unregister
}
test {Module commandresult - Callback tracks dirty keys} {
cleanup_callback
r cmdresult.register all
# This command modifies a key
r cmdresult.dirty mykey
set stats [r cmdresult.stats]
# Should have at least 1 dirty key
assert {[dict get $stats total_dirty] >= 1}
r cmdresult.unregister
}
test {Module commandresult - Get command log} {
cleanup_callback
r cmdresult.register all
r cmdresult.success
catch {r cmdresult.fail} e
r ping
set log [r cmdresult.getlog 3]
assert_equal [llength $log] 3
# Check first entry (most recent - ping)
set entry [lindex $log 0]
assert {[dict get $entry command] eq "ping"}
assert {[dict get $entry status] eq "success"}
# Check second entry (cmdresult.fail)
set entry [lindex $log 1]
assert {[dict get $entry command] eq "cmdresult.fail"}
assert {[dict get $entry status] eq "failure"}
# Check third entry (cmdresult.success)
set entry [lindex $log 2]
assert {[dict get $entry command] eq "cmdresult.success"}
assert {[dict get $entry status] eq "success"}
r cmdresult.unregister
}
test {Module commandresult - Get partial log} {
cleanup_callback
r cmdresult.register all
r cmdresult.success
r cmdresult.success
r cmdresult.success
r cmdresult.success
r cmdresult.success
# Request only last 2 entries
set log [r cmdresult.getlog 2]
assert_equal [llength $log] 2
r cmdresult.unregister
}
test {Module commandresult - Client ID is captured} {
cleanup_callback
r cmdresult.register all
r cmdresult.success
set log [r cmdresult.getlog 1]
set entry [lindex $log 0]
# Client ID should be a positive integer
assert {[dict get $entry client_id] > 0}
r cmdresult.unregister
}
test {Module commandresult - NOSELF flag with RM_Call} {
cleanup_callback
r cmdresult.register noself
# This command calls PING via RM_Call
# With NOSELF, the PING callback should be skipped
r cmdresult.rmcall ping
set stats [r cmdresult.stats]
# Should see callback for cmdresult.rmcall itself, but not for ping
# Note: After stats is read, we have 2 callbacks: rmcall (from before) + stats (just now)
# But when we READ stats, it returns the count BEFORE stats callback fires
assert_equal [dict get $stats total_callbacks] 1
# Get the last 2 log entries - they will be: [0]=stats, [1]=rmcall (newest first)
set log [r cmdresult.getlog 2]
set rmcall_entry [lindex $log 1]
# Should be cmdresult.rmcall, not ping
assert {[dict get $rmcall_entry command] eq "cmdresult.rmcall"}
r cmdresult.unregister
}
test {Module commandresult - Without NOSELF flag, RM_Call is tracked} {
cleanup_callback
r cmdresult.register all
# This command calls PING via RM_Call
# Without NOSELF, both cmdresult.rmcall and ping should be tracked
r cmdresult.rmcall ping
set stats [r cmdresult.stats]
# Should see callbacks for both cmdresult.rmcall and ping
assert {[dict get $stats total_callbacks] >= 2}
r cmdresult.unregister
}
test {Module commandresult - Unregister callback} {
cleanup_callback
r cmdresult.register all
r cmdresult.success
r cmdresult.unregister
# After unregister, new commands shouldn't trigger callbacks
r cmdresult.success
r ping
set stats [r cmdresult.stats]
# Should only have 1 callback (from before unregister)
assert_equal [dict get $stats total_callbacks] 1
# Trying to unregister again should fail
catch {r cmdresult.unregister} err
assert_match {*no callback registered*} $err
}
test {Module commandresult - Cannot register twice} {
cleanup_callback
r cmdresult.register all
# Trying to register again should fail
catch {r cmdresult.register all} err
assert_match {*already registered*} $err
r cmdresult.unregister
}
test {Module commandresult - Reset clears stats and log} {
cleanup_callback
r cmdresult.register all
r cmdresult.success
r ping
catch {r cmdresult.fail} e
# Verify we have stats
set stats [r cmdresult.stats]
assert {[dict get $stats total_callbacks] > 0}
# Reset should clear everything
cleanup_callback
set stats [r cmdresult.stats]
assert_equal [dict get $stats total_callbacks] 0
assert_equal [dict get $stats success_count] 0
assert_equal [dict get $stats failure_count] 0
set log [r cmdresult.getlog]
assert_equal [llength $log] 0
}
test {Module commandresult - Invalid flag returns error} {
cleanup_callback
catch {r cmdresult.register invalid_flag} err
assert_match {*invalid flags*} $err
}
test {Module commandresult - Failures+noself combination} {
cleanup_callback
r cmdresult.register failures+noself
# Successful commands shouldn't trigger callback
r cmdresult.success
r ping
# Failing command should trigger callback
catch {r cmdresult.fail} e
# RM_Call to a failing command - the inner cmdresult.fail is skipped (NOSELF),
# but cmdresult.rmcall itself forwards the error so it counts as a failure too
catch {r cmdresult.rmcall cmdresult.fail} e
set stats [r cmdresult.stats]
# Should see 2 callbacks: cmdresult.fail (direct) + cmdresult.rmcall (wrapper that forwards error)
# The inner cmdresult.fail called via RM_Call is skipped due to NOSELF
assert_equal [dict get $stats failure_count] 2
assert_equal [dict get $stats success_count] 0
r cmdresult.unregister
}
test {Module commandresult - Command name is captured correctly} {
cleanup_callback
r cmdresult.register all
r cmdresult.success
r set mykey myvalue
r get mykey
set log [r cmdresult.getlog 3]
# Check that command names are correct
set commands [list]
foreach entry $log {
lappend commands [dict get $entry command]
}
assert {[lsearch $commands "get"] >= 0}
assert {[lsearch $commands "set"] >= 0}
assert {[lsearch $commands "cmdresult.success"] >= 0}
r cmdresult.unregister
}
test {Unload the module - commandresult} {
assert_equal {OK} [r module unload commandresult]
}
}

View File

@ -97,31 +97,72 @@ start_server {overrides {save {900 1}} tags {"modules"}} {
assert { $was_set == 0 }
}
proc parse_client_flags {flags} {
set flag_list [split $flags ":"]
set parsed_flags {}
# Just collect all non-empty flags
foreach flag $flag_list {
if {$flag ne ""} {
lappend parsed_flags $flag
}
}
return $parsed_flags
}
test {test module clientinfo api} {
# Test basic sanity and SSL flag
set info [r test.clientinfo]
set ssl_flag [expr $::tls ? {"ssl:"} : {":"}]
assert { [dict get $info db] == 9 }
assert { [dict get $info flags] == "${ssl_flag}:::::" }
set flags [parse_client_flags [dict get $info flags]]
# Check initial state - should have auth flags, maybe SSL
if {$::tls} {
assert { "ssl" in $flags }
}
assert { "authenticated" in $flags }
assert { "ever_authenticated" in $flags }
assert { "multi" ni $flags }
assert { "tracking" ni $flags }
assert { "readonly" ni $flags }
# Test MULTI flag
r multi
r test.clientinfo
set info [lindex [r exec] 0]
assert { [dict get $info flags] == "${ssl_flag}::::multi:" }
set flags [parse_client_flags [dict get $info flags]]
assert { "multi" in $flags }
assert { "authenticated" in $flags }
assert { "ever_authenticated" in $flags }
# Test TRACKING flag
r client tracking on
set info [r test.clientinfo]
assert { [dict get $info flags] == "${ssl_flag}::tracking:::" }
set flags [parse_client_flags [dict get $info flags]]
assert { "tracking" in $flags }
assert { "multi" ni $flags }
assert { "authenticated" in $flags }
assert { "ever_authenticated" in $flags }
r CLIENT TRACKING off
# Test READONLY flag
r readonly
set info [r test.clientinfo]
assert { [dict get $info flags] == "${ssl_flag}:::::readonly" }
set flags [parse_client_flags [dict get $info flags]]
assert { "readonly" in $flags }
assert { "tracking" ni $flags }
assert { "multi" ni $flags }
assert { "authenticated" in $flags }
assert { "ever_authenticated" in $flags }
r readwrite
set info [r test.clientinfo]
assert { [dict get $info flags] == "${ssl_flag}:::::" }
set flags [parse_client_flags [dict get $info flags]]
assert { "readonly" ni $flags }
assert { "authenticated" in $flags }
assert { "ever_authenticated" in $flags }
}
test {tracking with rm_call sanity} {