This commit is contained in:
uriyage 2025-12-17 10:14:32 +08:00 committed by GitHub
commit 46efa137f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 2889 additions and 3 deletions

View File

@ -170,7 +170,9 @@ set(VALKEY_BENCHMARK_SRCS
${CMAKE_SOURCE_DIR}/src/monotonic.c
${CMAKE_SOURCE_DIR}/src/cli_common.c
${CMAKE_SOURCE_DIR}/src/mt19937-64.c
${CMAKE_SOURCE_DIR}/src/strl.c)
${CMAKE_SOURCE_DIR}/src/strl.c
${CMAKE_SOURCE_DIR}/src/fuzzer_client.c
${CMAKE_SOURCE_DIR}/src/fuzzer_command_generator.c)
# valkey-rdma module
set(VALKEY_RDMA_MODULE_SRCS ${CMAKE_SOURCE_DIR}/src/rdma.c)

View File

@ -428,7 +428,7 @@ ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o sds.o util.o sha256.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
ENGINE_BENCHMARK_OBJ=ae.o anet.o valkey-benchmark.o adlist.o dict.o zmalloc.o serverassert.o release.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o sds.o util.o sha256.o
ENGINE_BENCHMARK_OBJ=ae.o anet.o valkey-benchmark.o adlist.o dict.o zmalloc.o serverassert.o release.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o sds.o util.o sha256.o fuzzer_client.o fuzzer_command_generator.o
ENGINE_CHECK_RDB_NAME=$(ENGINE_NAME)-check-rdb$(PROG_SUFFIX)
ENGINE_CHECK_AOF_NAME=$(ENGINE_NAME)-check-aof$(PROG_SUFFIX)
ENGINE_LIB_NAME=lib$(ENGINE_NAME).a

779
src/fuzzer_client.c Normal file
View File

@ -0,0 +1,779 @@
/**
* fuzzer_client.c - Client implementation for Valkey fuzzer
*
* This file implements a client that connects to a Valkey server,
* sends commands generated by the fuzzer_command_generator module one by one,
* waiting for each reply before sending the next command.*/
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <signal.h>
#include <stdarg.h>
#include <time.h>
#include <sys/wait.h>
#include <pthread.h>
#include <stdint.h>
#include <stdatomic.h>
#include <sys/time.h>
#include <valkey/valkey.h>
#include "fuzzer_command_generator.h"
#include "sds.h"
#include "adlist.h"
#include "cli_common.h"
/* Configuration constants */
/* Default number of commands to send in a fuzzing session */
#define RECONNECT_MAX_ATTEMPTS 5 /* Maximum number of reconnection attempts */
#define RECONNECT_BASE_DELAY_MS 100 /* Base delay for exponential backoff (milliseconds) */
#define COMMAND_TIMEOUT_SEC 15 /* Command timeout in seconds */
#define MAX_CLIENTS_NUM 5000 /* Maximum number of parallel clients */
#define MAX_ERRORS 100 /* Maximum number of errors to store */
/* Global atomic counters for progress reporting */
static atomic_int global_commands_sent = 0;
static atomic_int global_success_replies = 0;
static atomic_int global_err_replies = 0;
static atomic_int global_malformed_replies = 0;
static atomic_int global_total_errors = 0;
/* Global fuzzing flags */
static int global_fuzz_flags = 0;
/* Progress reporting control */
static volatile int progress_reporting_active = 0;
static pthread_t progress_thread;
static int total_commands_num = 0;
static time_t fuzzing_start_time = 0;
/* Global abort control for malformed replies */
static volatile int global_abort_all_threads = 0;
/* Logging levels */
typedef enum {
LOG_NONE = 0, /* No logging */
LOG_ERROR, /* Error messages only */
LOG_INFO, /* General information */
LOG_DEBUG, /* Detailed debug information */
} LogLevel;
/* Error entry structure */
typedef struct {
sds message;
sds command;
} ErrorEntry;
typedef struct {
list *errors;
int total_errors;
} ErrorList;
/* Global logging level */
static LogLevel current_log_level = LOG_DEBUG;
/* Thread data structure for parallel clients */
typedef struct {
const char *host;
int port;
int commands_num;
int thread_id;
ErrorList error_list; /* Thread-local error list */
cliSSLconfig *ssl_config; /* TLS configuration (NULL if TLS disabled) */
} ThreadData;
static __thread int thread_id = 0; /* Thread-local ID */
static __thread ErrorList *thread_error_list = NULL; /* Thread-local error list */
/* Function prototypes */
static void *threadConnectAndFuzz(void *arg);
static void cleanupErrorList(ErrorList *list);
static void *progressReporterThread(void *arg);
static void startProgressReporting(int commands_num);
static void stopProgressReporting(void);
static void resetCounters(void);
static void printFinalStatistics(void);
static void setLogLevel(LogLevel level) {
current_log_level = level;
}
static void logMessage(LogLevel level, const char *format, ...) {
if (level > current_log_level) return;
static const char *level_strs[] = {"", "ERROR", "INFO ", "DEBUG"};
FILE *output = (level == LOG_ERROR) ? stderr : stdout;
time_t now = time(NULL);
struct tm *tm = localtime(&now);
/* Print timestamp prefix directly */
fprintf(output, "[%02d:%02d:%02d] [%s] ",
tm->tm_hour, tm->tm_min, tm->tm_sec, level_strs[level]);
/* Print the main message directly */
va_list args;
va_start(args, format);
vfprintf(output, format, args);
va_end(args);
/* Add newline if not present */
fputc('\n', output);
fflush(output);
}
static void initErrorList(ErrorList *list) {
list->errors = listCreate();
list->total_errors = 0;
if (list->errors) {
listSetFreeMethod(list->errors, free);
}
}
static sds formatCommandString(const FuzzerCommand *cmd) {
sds cmd_str = sdsempty();
for (int i = 0; i < cmd->argc; i++) {
if (i > 0) {
cmd_str = sdscat(cmd_str, " ");
}
size_t arg_len = strlen(cmd->argv[i]);
if (arg_len > 50) {
cmd_str = sdscatlen(cmd_str, cmd->argv[i], 50);
cmd_str = sdscat(cmd_str, "...");
} else {
cmd_str = sdscat(cmd_str, cmd->argv[i]);
}
}
return cmd_str;
}
static void addError(ErrorList *list, const char *message, const char *command) {
list->total_errors++;
/* Remove oldest entry if at capacity */
if (listLength(list->errors) >= MAX_ERRORS) {
return; /* Report only MAX_ERRORS */
}
/* Add new error */
ErrorEntry *entry = malloc(sizeof(ErrorEntry));
if (entry) {
entry->message = sdsnew(message);
entry->command = sdsnew(command);
listAddNodeTail(list->errors, entry);
}
}
static void printErrors(int tid, ErrorList *list) {
if (list->total_errors == 0) return;
if (current_log_level < LOG_DEBUG) return;
logMessage(LOG_ERROR, "[Thread %d] === ERROR SUMMARY ===", tid);
logMessage(LOG_ERROR, "[Thread %d] Total errors: %d", tid, list->total_errors);
if (list->total_errors > MAX_ERRORS) {
logMessage(LOG_ERROR, "[Thread %d] Showing last %lu errors:",
tid, listLength(list->errors));
}
listIter *iter = listGetIterator(list->errors, AL_START_HEAD);
listNode *node;
int i = 1;
while ((node = listNext(iter)) != NULL) {
ErrorEntry *entry = listNodeValue(node);
logMessage(LOG_ERROR, "[Thread %d] Error %d: %s", tid, i++, entry->message);
logMessage(LOG_ERROR, "[Thread %d] Command: %s", tid, entry->command);
}
listReleaseIterator(iter);
logMessage(LOG_ERROR, "[Thread %d] === END ERROR SUMMARY ===", tid);
}
static void resetCounters(void) {
atomic_store(&global_commands_sent, 0);
atomic_store(&global_success_replies, 0);
atomic_store(&global_err_replies, 0);
atomic_store(&global_malformed_replies, 0);
atomic_store(&global_total_errors, 0);
}
/* Counter types for global statistics */
typedef enum {
COUNTER_COMMANDS_SENT = 0,
COUNTER_SUCCESS_REPLIES,
COUNTER_ERROR_REPLIES,
COUNTER_MALFORMED_REPLIES,
COUNTER_TOTAL_ERRORS
} CounterType;
static void incrCounter(CounterType type) {
switch (type) {
case COUNTER_COMMANDS_SENT:
atomic_fetch_add(&global_commands_sent, 1);
break;
case COUNTER_SUCCESS_REPLIES:
atomic_fetch_add(&global_success_replies, 1);
break;
case COUNTER_ERROR_REPLIES:
atomic_fetch_add(&global_err_replies, 1);
break;
case COUNTER_MALFORMED_REPLIES:
atomic_fetch_add(&global_malformed_replies, 1);
break;
case COUNTER_TOTAL_ERRORS:
atomic_fetch_add(&global_total_errors, 1);
break;
}
}
static void *progressReporterThread(void *arg) {
(void)arg; /* Unused parameter */
time_t last_report_time = time(NULL);
while (progress_reporting_active && !global_abort_all_threads) {
sleep(1); /* Report every second */
if (!progress_reporting_active || global_abort_all_threads) break;
time_t current_time = time(NULL);
int current_commands = atomic_load(&global_commands_sent);
/* Calculate rates */
double elapsed_time = difftime(current_time, fuzzing_start_time);
double commands_per_sec = elapsed_time > 0 ? current_commands / elapsed_time : 0;
/* Report progress */
if (current_time - last_report_time >= 1) {
printf("\rProgress: %d/%d commands (%.1f%%), %.1f rps",
current_commands, total_commands_num,
total_commands_num > 0 ? (current_commands * 100.0) / total_commands_num : 0,
commands_per_sec);
fflush(stdout);
last_report_time = current_time;
}
}
printf("\n");
if (global_abort_all_threads) {
printf("FUZZING ABORTED: Malformed reply detected\n");
}
fflush(stdout);
return NULL;
}
static void startProgressReporting(int commands_num) {
total_commands_num = commands_num;
fuzzing_start_time = time(NULL);
progress_reporting_active = 1;
int ret = pthread_create(&progress_thread, NULL, progressReporterThread, NULL);
if (ret != 0) {
logMessage(LOG_ERROR, "Failed to create progress reporter thread: %s", strerror(ret));
progress_reporting_active = 0;
} else {
logMessage(LOG_DEBUG, "Progress reporter thread started");
}
}
static void stopProgressReporting(void) {
if (progress_reporting_active) {
progress_reporting_active = 0;
int ret = pthread_join(progress_thread, NULL);
if (ret != 0) {
logMessage(LOG_ERROR, "Failed to join progress reporter thread: %s", strerror(ret));
} else {
logMessage(LOG_DEBUG, "Progress reporter thread stopped");
}
}
}
static void cleanupErrorList(ErrorList *l) {
if (!l || !l->errors) return;
listIter *iter = listGetIterator(l->errors, AL_START_HEAD);
listNode *node;
while ((node = listNext(iter)) != NULL) {
ErrorEntry *entry = listNodeValue(node);
sdsfree(entry->message);
sdsfree(entry->command);
}
listReleaseIterator(iter);
listRelease(l->errors);
l->errors = NULL;
l->total_errors = 0;
}
static int isServerDisconnected(valkeyContext *ctx) {
return (ctx->err == VALKEY_ERR_EOF ||
(ctx->err == VALKEY_ERR_IO &&
(errno == ECONNRESET || errno == EPIPE || errno == ENOTCONN)));
}
/* Connect to a Valkey server with optional TLS support
* Returns a valkeyContext on success, NULL on error
* ssl_config can be NULL to disable TLS */
static valkeyContext *connectToServer(const char *host, int port, cliSSLconfig *ssl_config) {
valkeyContext *ctx;
valkeyReply *reply;
struct timeval timeout = {1, 500000}; /* 1.5 seconds for connection */
struct timeval command_timeout = {COMMAND_TIMEOUT_SEC, 0}; /* Command timeout */
logMessage(LOG_DEBUG, "[thread %d] Connecting to %s:%d%s...", thread_id, host, port, ssl_config ? " (TLS)" : "");
valkeyOptions options = {0};
options.type = VALKEY_CONN_TCP;
options.endpoint.tcp.ip = host;
options.endpoint.tcp.port = port;
options.connect_timeout = &timeout; /* Use the timeout defined above */
ctx = valkeyConnectWithOptions(&options);
if (!ctx) {
logMessage(LOG_ERROR, "[Thread %d] Connection error: can't allocate valkey context", thread_id);
return NULL;
}
if (ctx->err) {
logMessage(LOG_ERROR, "[Thread %d] Connection error: %s", thread_id, ctx->errstr);
valkeyFree(ctx);
return NULL;
}
/* Set command timeout for all subsequent operations */
valkeySetTimeout(ctx, command_timeout);
/* Establish TLS connection if ssl_config is provided */
if (ssl_config) {
const char *tls_err = NULL;
if (cliSecureConnection(ctx, *ssl_config, &tls_err) == VALKEY_ERR && tls_err) {
logMessage(LOG_ERROR, "[Thread %d] Could not negotiate a TLS connection: %s", thread_id, tls_err);
valkeyFree(ctx);
return NULL;
}
logMessage(LOG_DEBUG, "[Thread %d] TLS connection established", thread_id);
}
/* Test connection with a PING */
reply = valkeyCommand(ctx, "PING");
if (reply == NULL || ctx->err) {
logMessage(LOG_INFO, "[Thread %d] PING failed: %s", thread_id, ctx->errstr);
if (reply) freeReplyObject(reply);
valkeyFree(ctx);
return NULL;
}
logMessage(LOG_DEBUG, "[Thread %d] Connected successfully", thread_id);
freeReplyObject(reply);
return ctx;
}
/* Reconnect to server with exponential backoff and optional TLS support
* Returns new context on success, NULL on failure
* ssl_config can be NULL to disable TLS */
static valkeyContext *reconnectWithBackoff(const char *host, int port, cliSSLconfig *ssl_config) {
int attempts = 0;
valkeyContext *ctx = NULL;
logMessage(LOG_DEBUG, "[Thread %d] Attempting to reconnect to %s:%d%s with backoff...",
thread_id, host, port, ssl_config ? " (TLS)" : "");
while (attempts < RECONNECT_MAX_ATTEMPTS) {
ctx = connectToServer(host, port, ssl_config);
if (ctx) {
logMessage(LOG_DEBUG, "[Thread %d] Successfully reconnected after %d attempts", thread_id, attempts + 1);
return ctx;
}
/* Exponential backoff */
int delay = RECONNECT_BASE_DELAY_MS * (1 << attempts);
logMessage(LOG_INFO, "[Thread %d] Reconnection attempt %d failed, retrying in %d ms...",
thread_id, attempts + 1, delay);
usleep(delay * 1000);
attempts++;
}
logMessage(LOG_ERROR, "[Thread %d] Failed to reconnect after %d attempts", thread_id, attempts);
return NULL;
}
/* Command execution result codes */
typedef enum {
CMD_SUCCESS = 0, /* Command executed successfully */
CMD_ERROR = 1, /* General error occurred */
CMD_DISCONNECTED = 2, /* Connection closed by server */
CMD_TIMEOUT = 3 /* Command timed out */
} CommandResult;
static void handleMalformedReply(valkeyContext *ctx, FuzzerCommand *cmd) {
/* Protocol error (malformed reply) - ABORT ALL THREADS */
sds cmd_str = formatCommandString(cmd);
char error_msg[512];
snprintf(error_msg, sizeof(error_msg), "MALFORMED REPLY from server: %s", ctx->errstr);
/* Log the full error regardless of debug level */
logMessage(LOG_ERROR, "[Thread %d] CRITICAL ERROR - MALFORMED REPLY DETECTED", thread_id);
logMessage(LOG_ERROR, "[Thread %d] Error: %s", thread_id, ctx->errstr);
logMessage(LOG_ERROR, "[Thread %d] Command that caused malformed reply: %s", thread_id, cmd_str);
/* Print the exact malformed reply data if available */
if (ctx->reader && ctx->reader->buf && ctx->reader->len > 0) {
fprintf(stderr, "\n===== MALFORMED REPLY DATA BEGIN =====\n");
/* Print both hex and ASCII representation of the reply */
for (size_t i = 0; i < ctx->reader->len && i < 1024; i++) {
if (i % 16 == 0) {
fprintf(stderr, "\n%04zx: ", i);
}
fprintf(stderr, "%02x ", (unsigned char)ctx->reader->buf[i]);
if (i % 16 == 15) {
fprintf(stderr, " ");
for (size_t j = i - 15; j <= i; j++) {
char c = ctx->reader->buf[j];
fprintf(stderr, "%c", (c >= 32 && c <= 126) ? c : '.');
}
}
}
fprintf(stderr, "\n===== MALFORMED REPLY DATA END =====\n");
} else {
fprintf(stderr, "\nNo malformed reply data available in context\n");
}
logMessage(LOG_ERROR, "[Thread %d] ABORTING ALL THREADS due to malformed reply (pthread_id: %lu)",
thread_id, (unsigned long)pthread_self());
/* Set global abort flag to stop all threads */
global_abort_all_threads = 1;
incrCounter(COUNTER_MALFORMED_REPLIES);
sdsfree(cmd_str);
}
static void handleCommandTimeout(FuzzerCommand *cmd, time_t start_time) {
/* Timeout error */
time_t current_time = time(NULL);
double elapsed_time = difftime(current_time, start_time);
logMessage(LOG_ERROR, "[Thread %d] TIMEOUT: Command timed out after %.1f seconds (max %d seconds) - Command: ",
thread_id, elapsed_time, COMMAND_TIMEOUT_SEC);
logMessage(LOG_ERROR, "Command: %s", printCommand(cmd));
logMessage(LOG_ERROR, "[Thread %d] ABORTING THREAD due to timeout (pthread_id: %lu)",
thread_id, (unsigned long)pthread_self());
}
/* Helper function to handle command errors and update counters */
static CommandResult handleCommandError(valkeyContext *ctx, FuzzerCommand *cmd, time_t start_time) {
CommandResult result = CMD_ERROR;
if (ctx->err == VALKEY_ERR_IO && (errno == EAGAIN || errno == EWOULDBLOCK || errno == ETIMEDOUT)) {
handleCommandTimeout(cmd, start_time);
result = CMD_TIMEOUT;
} else if (isServerDisconnected(ctx)) {
/* Connection closed */
logMessage(LOG_DEBUG, "Connection closed by server: %s", ctx->errstr);
result = CMD_DISCONNECTED;
} else if (ctx->err == VALKEY_ERR_PROTOCOL) {
handleMalformedReply(ctx, cmd);
} else {
sds cmd_str = formatCommandString(cmd);
char error_msg[512];
snprintf(error_msg, sizeof(error_msg), "Error getting reply: %s (errno: %d)",
ctx->errstr, errno);
addError(thread_error_list, error_msg, cmd_str);
incrCounter(COUNTER_TOTAL_ERRORS);
sdsfree(cmd_str);
}
return result;
}
/* Helper function to log reply directly without SDS allocation */
static void logReplyDebug(const char *command, valkeyReply *reply) {
if (current_log_level < LOG_DEBUG) return;
switch (reply->type) {
case VALKEY_REPLY_STRING:
case VALKEY_REPLY_STATUS:
case VALKEY_REPLY_ERROR:
if (reply->str) {
logMessage(LOG_DEBUG, "%s: -> %.100s", command, reply->str);
} else {
logMessage(LOG_DEBUG, "%s: -> (empty)", command);
}
break;
case VALKEY_REPLY_ARRAY:
logMessage(LOG_DEBUG, "%s: -> (array of %zu elements)", command, reply->elements);
break;
case VALKEY_REPLY_INTEGER:
logMessage(LOG_DEBUG, "%s: -> %lld", command, reply->integer);
break;
case VALKEY_REPLY_NIL:
logMessage(LOG_DEBUG, "%s: -> (nil)", command);
break;
default:
logMessage(LOG_DEBUG, "%s: -> (unknown reply type)", command);
}
}
/* Helper function to validate reply and update counters */
static void validateReplyAndUpdateCounters(valkeyReply *reply, FuzzerCommand *cmd) {
if (reply->type == VALKEY_REPLY_ERROR) {
const char *error_str = reply->str ? reply->str : "";
/* We consider error only if there is a problem with the command generated */
if (strstr(error_str, "syntax error") || strstr(error_str, "wrong number of arguments")) {
sds cmd_str = formatCommandString(cmd);
char error_msg[512];
snprintf(error_msg, sizeof(error_msg), "Command error: %s", error_str);
addError(thread_error_list, error_msg, cmd_str);
sdsfree(cmd_str);
incrCounter(COUNTER_ERROR_REPLIES);
incrCounter(COUNTER_TOTAL_ERRORS);
} else {
incrCounter(COUNTER_SUCCESS_REPLIES);
}
} else {
incrCounter(COUNTER_SUCCESS_REPLIES);
}
}
/* Send a single command and wait for its reply */
static CommandResult sendCommandAndGetReply(valkeyContext *ctx) {
int ret = CMD_SUCCESS;
FuzzerCommand *cmd = generateCmd();
if (current_log_level >= LOG_DEBUG) {
logMessage(LOG_DEBUG, "send Command: %s", printCommand(cmd));
}
/* Send command */
time_t start_time = time(NULL);
valkeyReply *reply = valkeyCommandArgv(ctx, cmd->argc, (const char **)cmd->argv, NULL);
/* Handle null reply (connection/protocol errors) */
if (reply == NULL) {
ret = handleCommandError(ctx, cmd, start_time);
} else {
/* Log reply */
logReplyDebug(cmd->argv[0], reply);
/* Validate reply and update counters */
validateReplyAndUpdateCounters(reply, cmd);
freeReplyObject(reply);
}
freeCommand(cmd);
return ret;
}
/* Connect and fuzz a Valkey server with optional TLS support
* Returns 0 on success, non-zero on error
* ssl_config can be NULL to disable TLS */
int connectAndFuzz(const char *host, int port, int commands_num, cliSSLconfig *ssl_config) {
valkeyContext *ctx;
int commands_sent = 0;
/* Connect to server */
ctx = connectToServer(host, port, ssl_config);
if (ctx == NULL) {
return -1;
}
initThreadClientCtx(global_fuzz_flags);
logMessage(LOG_DEBUG, "[Thread %d] Starting fuzzing session (max %d commands)...", thread_id, commands_num);
/* Main fuzzing loop - send commands one by one */
while (commands_sent < commands_num && !global_abort_all_threads) {
/* Send a command and get its reply */
CommandResult result = sendCommandAndGetReply(ctx);
if (result != CMD_SUCCESS) {
if (result == CMD_DISCONNECTED) {
/* Connection closed by server, try to reconnect */
valkeyFree(ctx);
ctx = reconnectWithBackoff(host, port, ssl_config);
if (ctx == NULL) {
return -1;
}
/* Reset client fuzzer context on successful reconnection */
resetClientFuzzCtx();
continue;
} else if (result == CMD_TIMEOUT) {
/* Timeout error - abort thread immediately */
logMessage(LOG_ERROR, "[Thread %d] Thread aborted due to timeout error", thread_id);
/* Clean up and exit */
if (ctx) valkeyFree(ctx);
return -1;
} else {
/* Other error, but continue with next command */
logMessage(LOG_INFO, "[Thread %d] Error processing command, continuing with next one", thread_id);
}
}
commands_sent++;
/* Update global command counter */
incrCounter(COUNTER_COMMANDS_SENT);
}
/* Clean up */
if (ctx) valkeyFree(ctx);
freeClientCtx();
return 0;
}
/* Thread function for parallel fuzzing */
static void *threadConnectAndFuzz(void *arg) {
ThreadData *data = (ThreadData *)arg;
thread_id = data->thread_id; /* Set thread-local sequence ID */
/* Set the thread-local error list to the one in ThreadData */
thread_error_list = &data->error_list;
int result = connectAndFuzz(data->host, data->port, data->commands_num, data->ssl_config);
return (void *)(intptr_t)result;
}
static void printFinalStatistics(void) {
int final_commands = atomic_load(&global_commands_sent);
int final_success = atomic_load(&global_success_replies);
int final_errors = atomic_load(&global_err_replies);
int final_malformed = atomic_load(&global_malformed_replies);
int final_total_errors = atomic_load(&global_total_errors);
logMessage(LOG_INFO, "=== FINAL STATISTICS ===");
logMessage(LOG_INFO, "Total commands sent: %d", final_commands);
logMessage(LOG_INFO, "Success replies: %d", final_success);
logMessage(LOG_INFO, "Error replies: %d", final_errors);
logMessage(LOG_INFO, "Malformed replies: %d", final_malformed);
logMessage(LOG_INFO, "Total errors: %d", final_total_errors);
logMessage(LOG_INFO, "Success rate: %.2f%%",
final_commands > 0 ? (final_success * 100.0) / final_commands : 0);
if (final_total_errors > 0 && current_log_level < LOG_DEBUG) {
logMessage(LOG_INFO, "To see full error details, run with --fuzz-loglevel debug");
}
}
/* Run client fuzzing sessions using threads with optional TLS support
* ssl_config can be NULL to disable TLS */
static int runClients(const char *host, int port, int commands_num, int clients_num, cliSSLconfig *ssl_config) {
global_abort_all_threads = 0;
int commands_per_client = commands_num / clients_num;
if (commands_per_client < 1) {
commands_per_client = 1;
}
printf("Running fuzzer with %d clients, %d commands per client (%d total commands)%s\n",
clients_num, commands_per_client, commands_num, ssl_config ? " (TLS)" : "");
/* Start progress reporting for all threads combined */
startProgressReporting(commands_num);
pthread_t *threads = malloc(sizeof(pthread_t) * clients_num);
ThreadData *thread_data = malloc(sizeof(ThreadData) * clients_num);
int failed = 0;
/* Create threads */
for (int i = 0; i < clients_num; i++) {
thread_data[i].host = host;
thread_data[i].port = port;
thread_data[i].commands_num = commands_per_client;
thread_data[i].thread_id = i + 1;
thread_data[i].ssl_config = ssl_config;
initErrorList(&thread_data[i].error_list);
int ret = pthread_create(&threads[i], NULL, threadConnectAndFuzz, &thread_data[i]);
if (ret != 0) {
logMessage(LOG_ERROR, "Failed to create thread %d: %s", i + 1, strerror(ret));
failed++;
} else {
logMessage(LOG_DEBUG, "Thread %d started...", i + 1);
}
}
/* Wait for all threads to complete and print their error lists if exist */
for (int i = 0; i < clients_num; i++) {
void *thread_result;
int ret = pthread_join(threads[i], &thread_result);
if (ret != 0) {
logMessage(LOG_ERROR, "Failed to join thread %d: %s", i + 1, strerror(ret));
failed++;
} else {
int exit_code = (int)(intptr_t)thread_result;
logMessage(LOG_DEBUG, "Thread %d completed with exit code %d", i + 1, exit_code);
if (exit_code != 0) {
failed++;
}
}
printErrors(thread_data[i].thread_id, &thread_data[i].error_list);
cleanupErrorList(&thread_data[i].error_list);
}
stopProgressReporting();
logMessage(LOG_INFO, "All client threads completed. %d succeeded, %d failed.",
clients_num - failed, failed);
free(threads);
free(thread_data);
return failed;
}
/* Run fuzzer clients with specified parameters
* This function is called by valkey-benchmark when fuzz mode is enabled */
int runFuzzerClients(const char *host, int port, int commands_num, int clients_num, int cluster_mode, int num_keys, cliSSLconfig *ssl_config, const char *log_level, int fuzz_flags) {
/* Set log level from parameter */
if (log_level) {
if (strcmp(log_level, "none") == 0) {
setLogLevel(LOG_NONE);
} else if (strcmp(log_level, "error") == 0) {
setLogLevel(LOG_ERROR);
} else if (strcmp(log_level, "info") == 0) {
setLogLevel(LOG_INFO);
} else if (strcmp(log_level, "debug") == 0) {
setLogLevel(LOG_DEBUG);
} else {
logMessage(LOG_INFO, "Invalid log level '%s', using default 'info'", log_level);
setLogLevel(LOG_INFO);
}
}
if (clients_num > MAX_CLIENTS_NUM) {
logMessage(LOG_ERROR, "Too many parallel clients requested (%d). Maximum is %d.",
clients_num, MAX_CLIENTS_NUM);
return 1;
}
global_fuzz_flags = fuzz_flags;
resetCounters();
/* Connect and init fuzzer */
valkeyContext *ctx;
ctx = connectToServer(host, port, ssl_config);
if (ctx == NULL) {
logMessage(LOG_ERROR, "Failed to connect to server.");
return -1;
}
if (initFuzzer(ctx, num_keys, cluster_mode, global_fuzz_flags) == -1) {
valkeyFree(ctx);
logMessage(LOG_ERROR, "Failed to init fuzzer.");
return -1;
}
valkeyFree(ctx);
/* Run fuzzing clients */
int result = runClients(host, port, commands_num, clients_num, ssl_config);
printFinalStatistics();
cleanupFuzzer();
return result;
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,28 @@
#ifndef FUZZER_COMMAND_GENERATOR_H
#define FUZZER_COMMAND_GENERATOR_H
#include <valkey/valkey.h>
#include "sds.h"
typedef struct FuzzerCommand {
sds *argv;
int argc;
int size;
} FuzzerCommand;
/* Fuzzing mode types */
typedef enum {
FUZZ_MODE_MALFORMED_COMMANDS = (1 << 0),
FUZZ_MODE_CONFIG_COMMANDS = (1 << 1)
} FuzzMode;
int initFuzzer(valkeyContext *ctx, int num_keys, int cluster_mode, int fuzz_flags);
void cleanupFuzzer(void);
void initThreadClientCtx(int fuzz_flags);
void resetClientFuzzCtx(void);
void freeClientCtx(void);
FuzzerCommand *generateCmd(void);
void freeCommand(FuzzerCommand *args);
char *printCommand(FuzzerCommand *cmd);
#endif /* FUZZER_COMMAND_GENERATOR_H */

View File

@ -92,6 +92,10 @@ typedef enum readFromReplica {
FROM_ALL
} readFromReplica;
/* Fuzz mode flags */
#define FUZZ_MODE_MALFORMED_COMMANDS (1 << 0)
#define FUZZ_MODE_CONFIG_COMMANDS (1 << 1)
static struct config {
aeEventLoop *el;
enum valkeyConnectionType ct;
@ -133,6 +137,9 @@ static struct config {
int num_threads;
struct benchmarkThread **threads;
int cluster_mode;
int fuzz_mode; /* Boolean flag to enable fuzzing */
const char *fuzz_log_level;
int fuzz_flags; /* Bit flags for fuzzing modes */
readFromReplica read_from_replica;
int cluster_node_count;
struct clusterNode **cluster_nodes;
@ -229,6 +236,7 @@ static void freeServerConfig(serverConfig *cfg);
static int fetchClusterSlotsConfiguration(client c);
static void updateClusterSlotsConfiguration(void);
static long long showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData);
int runFuzzerClients(const char *host, int port, int max_commands, int parallel_clients, int cluster_mode, int num_keys, cliSSLconfig *ssl_config, const char *log_level, int fuzz_flags);
/* Dict callbacks */
static uint64_t dictSdsHash(const void *key);
@ -1819,6 +1827,28 @@ int parseOptions(int argc, char **argv) {
}
config.ct = VALKEY_CONN_RDMA;
#endif
} else if (!strcmp(argv[i], "--fuzz")) {
config.fuzz_mode = 1;
} else if (!strcmp(argv[i], "--fuzz-loglevel")) {
if (lastarg) goto invalid;
config.fuzz_log_level = argv[++i];
} else if (!strcmp(argv[i], "--fuzz-mode")) {
if (lastarg) goto invalid;
int count = 0;
const char *modes_arg = argv[++i];
sds *modes = sdssplitlen(modes_arg, strlen(modes_arg), ",", 1, &count);
for (int j = 0; j < count; j++) {
if (!strcmp(modes[j], "malformed-commands"))
config.fuzz_flags |= FUZZ_MODE_MALFORMED_COMMANDS;
else if (!strcmp(modes[j], "config-commands"))
config.fuzz_flags |= FUZZ_MODE_CONFIG_COMMANDS;
else {
fprintf(stderr, "Invalid fuzz mode: %s\n", modes[j]);
sdsfreesplitres(modes, count);
exit(1);
}
}
sdsfreesplitres(modes, count);
} else if (!strcmp(argv[i], "--mptcp")) {
config.mptcp = 1;
} else if (!strcmp(argv[i], "--")) {
@ -1976,6 +2006,14 @@ usage:
tls_usage,
rdma_usage,
" --mptcp Enable an MPTCP connection.\n"
" --fuzz Enable fuzzy mode to generate random commands. WARNING: Recommended for testing only, not for use with production data.\n"
" --fuzz-mode <modes> Set fuzzing modes (comma-separated): malformed-commands, config-commands.\n"
" malformed-commands: Generates also malformed commands.\n"
" config-commands: Allows CONFIG SET commands.\n"
" Default: valid commands only.\n"
" --fuzz-loglevel <level>\n"
" Set log level for fuzzer (none, error, info, debug).\n"
" Default is 'info'.\n"
" --help Output this help and exit.\n"
" --version Output version and exit.\n\n"
"Examples:\n\n"
@ -2162,6 +2200,9 @@ int main(int argc, char **argv) {
config.num_threads = 0;
config.threads = NULL;
config.cluster_mode = 0;
config.fuzz_mode = 0;
config.fuzz_log_level = "info";
config.fuzz_flags = 0;
config.rps = 0;
config.read_from_replica = FROM_PRIMARY_ONLY;
config.cluster_node_count = 0;
@ -2196,7 +2237,7 @@ int main(int argc, char **argv) {
exit(1);
}
if (config.cluster_mode) {
if (config.cluster_mode && !config.fuzz_mode) {
// We only include the slot placeholder {tag} if cluster mode is enabled
tag = ":{tag}";
@ -2288,6 +2329,20 @@ int main(int argc, char **argv) {
printf("\"test\",\"rps\",\"avg_latency_ms\",\"min_latency_ms\",\"p50_latency_ms\",\"p95_latency_ms\",\"p99_"
"latency_ms\",\"max_latency_ms\"\n");
}
if (config.fuzz_mode) {
return runFuzzerClients(
config.conn_info.hostip,
config.conn_info.hostport,
config.requests,
config.numclients,
config.cluster_mode,
config.keyspacelen,
config.tls ? &config.sslconfig : NULL,
config.fuzz_log_level,
config.fuzz_flags);
}
/* Run benchmark with command in the remainder of the arguments. */
if (argc) {
sds title = sdsnew(argv[0]);

18
tests/unit/fuzzer.tcl Normal file
View File

@ -0,0 +1,18 @@
tags {"slow"} {
run_solo {fuzzer} {
start_server {} {
test {FUZZ stresser with valkey-benchmark} {
assert_equal [r ping] {PONG}
set err [catch {exec src/valkey-benchmark -p [srv 0 port] -c 20 -n 100000 --fuzz --fuzz-loglevel info} output]
if {$err && $::verbose} {
# For now, if the server is still responsive, we don't consider the test a failure even if the fuzzer failed.
puts $output
}
# Verify server is still responsive after the fuzzer run
# Create a new client connection in case the previous one was closed by the fuzzer.
set rr [valkey_client]
assert_equal [$rr ping] {PONG}
}
}
}
}