valkey-benchmark(for search):cluster support

This commit is contained in:
Zvi Schneider 2025-09-21 22:23:39 +03:00
parent ecd27a77ac
commit fe07820171
1 changed files with 497 additions and 256 deletions

View File

@ -62,6 +62,7 @@
#include "cli_common.h"
#include "mt19937-64.h"
extern uint16_t crc16(const char *buf, int len);
#define UNUSED(V) ((void)V)
#define RANDPTR_INITIAL_SIZE 8
#define DEFAULT_LATENCY_PRECISION 3
@ -74,15 +75,19 @@
#define SHOW_THROUGHPUT_INTERVAL 250 /* 250ms */
#define CLIENT_GET_EVENTLOOP(c) (c->thread_id >= 0 ? config.threads[c->thread_id]->el : config.el)
#define VECTOR_PLACEHOLDER "__v_rd__" // Exactly 8 characters for 2 floats
#define VECTOR_PLACEHOLDER_LEN 8 // length of VECTOR_PLACEHOLDER strings
#define VECTOR_PLACEHOLDER_INDEX 10
#define PLACEHOLDER_LEN 12 // length of BENCHMARK_PLACEHOLDERS strings
// TODO: Use existing vectors\fields in the index as base for vector\tag\numeric generation
#define PLACEHOLDER_COUNT 14
#define VECTOR_PLACEHOLDER_LEN 16 // length of "__rand_vec_f32__" string
static const size_t PLACEHOLDER_LEN = 12; // length of BENCHMARK_PLACEHOLDERS strings
static const char *PLACEHOLDERS[PLACEHOLDER_COUNT] = {
static const char *PLACEHOLDERS[] = {
"__rand_int__", "__rand_1st__", "__rand_2nd__", "__rand_3rd__", "__rand_4th__",
"__rand_5th__", "__rand_6th__", "__rand_7th__", "__rand_8th__", "__rand_9th__",
"__rand_vec_f32__", "__rand_vec_1st__", "__rand_vec_2nd__", "__rand_vec_3rd__"};
#define VECTOR_PLACEHOLDER_INDEX 10 // index of vector placeholder in PLACEHOLDERS array
VECTOR_PLACEHOLDER // Vector placeholder
};
#define VECTOR_NUM_RAND_DIM (VECTOR_PLACEHOLDER_LEN/sizeof(float32_t)) // Number of random dimensions for vector generation
#define PLACEHOLDER_COUNT 11
struct benchmarkThread;
struct clusterNode;
@ -197,7 +202,6 @@ typedef struct tagDistribution {
typedef struct searchRuntimeConfig {
int n_prefill; /* Number of vectors to prefill before benchmarking */
int n_rnd_dim; /* Number of random dimensions to use for vector generation */
/* Tag distribution fields */
tagDistribution *tag_dists; /* Array of tag distributions */
int n_dists; /* Number of distributions */
@ -208,6 +212,7 @@ typedef struct searchIndex {
sds name; /* Index name */
sds algorithm; /* Index algorithm type (e.g., HNSW, FLAT) */
sds prefix; /* Index key prefix */
int nocontent; /* Use NOCONTENT option for FT.SEARCH */
sds vector_field; /* Vector field name */
int vector_dim; /* Vector dimension */
sds tag_field; /* Tag field name if exists*/
@ -273,6 +278,8 @@ static struct config {
readFromReplica read_from_replica;
int cluster_node_count;
struct clusterNode **cluster_nodes;
int cluster_primary_node_count;
struct clusterNode **cluster_primary_nodes;
struct serverConfig *server_config;
struct hdr_histogram *latency_histogram;
struct hdr_histogram *current_sec_latency_histogram;
@ -390,6 +397,47 @@ static long long showThroughput(struct aeEventLoop *eventLoop, long long id, voi
static uint64_t dictSdsHash(const void *key);
static int dictSdsKeyCompare(const void *key1, const void *key2);
/* Fast unique vector generation using key-based deterministic randomization */
static void generateVectorUnique(float *vector, int dim, uint64_t key_idx) {
/* Use multiple hash passes for better distribution */
uint64_t hash1 = key_idx * 0x9E3779B97F4A7C15ULL;
uint64_t hash2 = key_idx * 0xBF58476D1CE4E5B9ULL;
/* Generate full vector with mixed entropy sources */
for (int i = 0; i < dim; i++) {
/* Mix key_idx, dimension index, and hash values */
uint64_t mixed = hash1 ^ (hash2 + i);
mixed *= 0x94D049BB133111EBULL;
mixed ^= mixed >> 31;
mixed *= 0xBF58476D1CE4E5B9ULL;
mixed ^= mixed >> 31;
/* Convert to float in [-1, 1] with good distribution */
uint32_t bits = (uint32_t)(mixed >> 32);
vector[i] = (float)((int32_t)bits) / 2147483648.0f;
}
/* Optional: Normalize vector for cosine similarity */
if (strcmp(config.search.metric, "COSINE") == 0) {
float norm = 0.0f;
for (int i = 0; i < dim; i++) {
norm += vector[i] * vector[i];
}
norm = sqrtf(norm);
if (norm > 0.0f) {
for (int i = 0; i < dim; i++) {
vector[i] /= norm;
}
}
}
}
/* Optimized binary conversion without sds overhead */
static inline void vectorToBinaryDirect(float *vector, int dim, char *output) {
memcpy(output, vector, dim * sizeof(float));
}
/* Print FT.SEARCH results in a user-friendly format */
static void printSearchResults(valkeyReply *reply) {
if (!reply || reply->type != VALKEY_REPLY_ARRAY) {
@ -822,192 +870,231 @@ static void initBaseVector(int dim) {
}
}
/* Efficiently generate vector by manipulating base vector */
static void generateVectorEfficient(float *vector, int dim, uint64_t key_idx) {
/* Copy base vector */
memcpy(vector, base_vector, sizeof(float) * dim);
/* Use key index to seed global RNG */
init_genrand64(key_idx);
/* Modify only a subset of coordinates (10% of dimensions) */
int modifications = dim / 10;
if (modifications < 1) modifications = 1;
for (int i = 0; i < modifications; i++) {
/* Select random coordinate to modify */
int coord = genrand64_int64() % dim;
/* Generate new value for this coordinate */
uint64_t r = genrand64_int64();
vector[coord] = ((float)(r & 0x7FFFFFFF) / 0x40000000) - 1.0f;
}
}
/* Convert float array to binary format for vector search queries */
static sds vectorToBinary(float *vector, int dim) {
sds result = sdsnewlen(NULL, dim * sizeof(float));
memcpy(result, vector, dim * sizeof(float));
return result;
}
/* Simplified vector benchmark that uses the existing benchmark infrastructure */
static void benchmarkVectorOp(const char *title, int is_insert) {
static void *searchPrefillWorkerThreadCluster(void *arg) ;
/* Benchmark function for vector operations with cluster awareness */
static void benchmarkVectorOpClusterAware(const char *title, int is_insert) {
char *cmd;
int len;
int vec_dim = config.search.vector_dim;
if (!config.use_search) return;
if (vec_dim <= 0) {
fprintf(stderr, "Error: Vector dimension must be greater than 0.\n");
return;
}
static __thread float *sample_vector = NULL;
if (!sample_vector) {
sample_vector = zmalloc(sizeof(float) * vec_dim);
generate_vector_fast(sample_vector, 0); // Generate a sample vector
/* Validation checks */
assert(config.search.vector_dim > 0 && config.use_search);
assert(strlen(VECTOR_PLACEHOLDER) == VECTOR_PLACEHOLDER_LEN); // Self-check
/* Ensure we have at least 2 dimensions for the random part */
if (config.search.vector_dim < VECTOR_NUM_RAND_DIM) {
fprintf(stderr, "Error: Vector dimension must be at least %d (current: %d)\n",
VECTOR_NUM_RAND_DIM, config.search.vector_dim);
exit(1);
}
int dim_with_placeholder = vec_dim - config.search.curr_conf.n_rnd_dim; // Adjust for placeholder length
/* Generate a sample vector for the command template */
generateVectorEfficient(sample_vector, dim_with_placeholder, 0);
/* Convert to binary format for HSET (same as search expects) */
sds vector_binary = vectorToBinary(sample_vector, dim_with_placeholder);
int fixed_dims = config.search.vector_dim - VECTOR_NUM_RAND_DIM; // Reserve 2 floats for randomness
if (is_insert) {
/* Generate HSET command template */
sds key = sdscatprintf(sdsempty(), "%s__rand_int__", config.search.prefix);
/* Check if we need to include tag field */
if (config.search.tag_field && config.search.curr_conf.tag_dists) {
/* Include tag field in HSET */
sds selected_tag = selectTagByDistribution();
if (selected_tag) {
len = valkeyFormatCommand(&cmd, "HSET %b %s __rand_vec_f32____rand_vec_1st____rand_vec_2nd____rand_vec_3nd__%b %s %s",
key, sdslen(key),
config.search.vector_field, vector_binary, sdslen(vector_binary),
config.search.tag_field, selected_tag);
sdsfree(selected_tag);
} else {
/* No tags selected, just insert vector */
len = valkeyFormatCommand(&cmd, "HSET %b %s __rand_vec_f32____rand_vec_1st____rand_vec_2nd____rand_vec_3nd__%b",
key, sdslen(key),
config.search.vector_field, vector_binary, sdslen(vector_binary));
}
/* Generate key with appropriate cluster tag */
sds key;
if (config.cluster_mode) {
key = sdscatprintf(sdsempty(), "%s{tag}:__rand_int__", config.search.prefix);
} else {
/* Vector only HSET */
len = valkeyFormatCommand(&cmd, "HSET %b %s __rand_vec_f32____rand_vec_1st____rand_vec_2nd____rand_vec_3nd__%b",
key, sdslen(key),
config.search.vector_field, vector_binary, sdslen(vector_binary));
key = sdscatprintf(sdsempty(), "%s__rand_int__", config.search.prefix);
}
/* Build vector data: fixed part + placeholder */
sds vector_data = sdsempty();
/* Generate fixed portion if needed */
if (fixed_dims > 0) {
float *fixed_vector = zmalloc(fixed_dims * sizeof(float));
static __thread uint64_t local_counter = 0;
generateVectorUnique(fixed_vector, fixed_dims,
local_counter++ ^ ((uint64_t)pthread_self() << 32));
/* Append binary data for fixed portion */
vector_data = sdscatlen(vector_data, (char*)fixed_vector,
fixed_dims * sizeof(float));
zfree(fixed_vector);
}
/* Append the 8-byte placeholder (will be replaced in-place later) */
vector_data = sdscatlen(vector_data, VECTOR_PLACEHOLDER, VECTOR_PLACEHOLDER_LEN);
/* Verify total size matches expected vector dimension */
assert(sdslen(vector_data) == config.search.vector_dim * sizeof(float));
/* Build HSET command */
if (config.search.tag_field && config.search.curr_conf.tag_dists) {
sds selected_tag = selectTagByDistribution();
len = valkeyFormatCommand(&cmd,
"HSET %b %s %b %s %s",
key, sdslen(key),
config.search.vector_field,
vector_data, sdslen(vector_data),
config.search.tag_field,
selected_tag ? selected_tag : "");
if (selected_tag) sdsfree(selected_tag);
} else {
len = valkeyFormatCommand(&cmd,
"HSET %b %s %b",
key, sdslen(key),
config.search.vector_field,
vector_data, sdslen(vector_data));
}
sdsfree(key);
sdsfree(vector_data);
} else {
/* Generate FT.SEARCH command template */
/* Create query string with optional tag filter */
/* For QUERY: generate complete vector without placeholders */
int vec_dim = config.search.vector_dim;
float *vector = zmalloc(vec_dim * sizeof(float));
static __thread uint64_t local_counter = 0;
generateVectorUnique(vector, vec_dim,
local_counter++ ^ ((uint64_t)pthread_self() << 32));
sds vector_binary = vectorToBinary(vector, vec_dim);
/* Build KNN query */
sds query;
if (config.search.curr_conf.tag_filter && config.search.tag_field) {
/* Include tag filter in query */
query = sdscatprintf(sdsempty(), "@%s:{%s}=>[KNN %d @%s $query_vector EF_RUNTIME %d]",
config.search.tag_field, config.search.curr_conf.tag_filter,
config.search.k, config.search.vector_field, config.search.ef_search);
query = sdscatprintf(sdsempty(),
"@%s:{%s}=>[KNN %d @%s $query_vector EF_RUNTIME %d]",
config.search.tag_field,
config.search.curr_conf.tag_filter,
config.search.k,
config.search.vector_field,
config.search.ef_search);
} else {
/* KNN query only */
query = sdscatprintf(sdsempty(), "*=>[KNN %d @%s $query_vector EF_RUNTIME %d]",
config.search.k, config.search.vector_field, config.search.ef_search);
query = sdscatprintf(sdsempty(),
"*=>[KNN %d @%s $query_vector EF_RUNTIME %d]",
config.search.k,
config.search.vector_field,
config.search.ef_search);
}
len = valkeyFormatCommand(&cmd, "FT.SEARCH %b %b PARAMS 2 query_vector __rand_vec_f32____rand_vec_1st____rand_vec_2nd____rand_vec_3nd__%b DIALECT 2",
config.search.name, sdslen(config.search.name),
query, sdslen(query),
vector_binary, sdslen(vector_binary));
sdsfree(query);
/* Build FT.SEARCH command */
if (config.search.nocontent) {
len = valkeyFormatCommand(&cmd,
"FT.SEARCH %b %b NOCONTENT PARAMS 2 query_vector %b DIALECT 2",
config.search.name, sdslen(config.search.name),
query, sdslen(query),
vector_binary, sdslen(vector_binary));
} else {
len = valkeyFormatCommand(&cmd,
"FT.SEARCH %b %b PARAMS 2 query_vector %b DIALECT 2",
config.search.name, sdslen(config.search.name),
query, sdslen(query),
vector_binary, sdslen(vector_binary));
}
sdsfree(query);
sdsfree(vector_binary);
zfree(vector);
}
/* Use the existing benchmark infrastructure */
benchmark(title, cmd, len);
/* Cleanup */
sdsfree(vector_binary);
// zfree(sample_vector);
free(cmd);
}
/* Thread worker function for prefill */
static void *prefillWorkerThread(void *arg) {
if (!config.use_search) return NULL;
/* Non-cluster prefill worker thread */
static void *searchPrefillWorkerThread(void *arg) {
assert(config.use_search);
prefillThreadData *data = (prefillThreadData *)arg;
valkeyContext *ctx = getValkeyContext(config.ct, config.conn_info.hostip, config.conn_info.hostport);
if (ctx == NULL) {
fprintf(stderr, "Thread %d: Failed to connect to Valkey server.\n", data->thread_id);
/* Connect to single node */
const char *ip = config.conn_info.hostip;
int port = config.conn_info.hostport;
valkeyContext *ctx = getValkeyContext(config.ct, ip, port);
if (!ctx) {
fprintf(stderr, "Thread %d: Failed to connect to %s:%d\n",
data->thread_id, ip, port);
return NULL;
}
int vec_dim = config.search.vector_dim;
sds prefix = config.search.prefix;
sds vector_field = config.search.vector_field;
float *vector = zmalloc(vec_dim * sizeof(float));
/* Use pipelining to batch commands */
int pipeline_size = 1;
int pipeline_count = 0;
/* Process assigned range */
for (int i = data->start_index; i <= data->end_index; i++) {
/* Generate key using same format as vec-insert benchmark */
sds key = sdscatprintf(sdsempty(), "%s%012d", prefix, i);
/* Generate vector efficiently */
float *vector = zmalloc(vec_dim * sizeof(float));
generateVectorEfficient(vector, vec_dim, i);
/* Convert to binary format for HSET (same as search expects) */
/* Simple key without cluster tags */
sds key = sdscatprintf(sdsempty(), "%s%012d",
config.search.prefix, i);
/* Generate unique vector deterministically based on key index */
generateVectorUnique(vector, vec_dim, i);
sds vector_binary = vectorToBinary(vector, vec_dim);
/* Insert vector using HSET with binary data and optional tag */
valkeyReply *reply;
/* Use pipelining for better throughput */
if (config.search.tag_field && config.search.curr_conf.tag_dists) {
sds selected_tag = selectTagByDistribution();
if (selected_tag) {
reply = valkeyCommand(ctx, "HSET %b %s %b %s %s",
key, sdslen(key),
vector_field,
vector_binary, sdslen(vector_binary),
config.search.tag_field, selected_tag);
sdsfree(selected_tag);
} else {
/* No tags selected, just insert vector */
reply = valkeyCommand(ctx, "HSET %b %s %b",
key, sdslen(key),
vector_field,
vector_binary, sdslen(vector_binary));
}
valkeyAppendCommand(ctx, "HSET %b %s %b %s %s",
key, sdslen(key),
config.search.vector_field,
vector_binary, sdslen(vector_binary),
config.search.tag_field,
selected_tag ? selected_tag : "");
if (selected_tag) sdsfree(selected_tag);
} else {
reply = valkeyCommand(ctx, "HSET %b %s %b",
key, sdslen(key),
vector_field,
vector_binary, sdslen(vector_binary));
}
if (!reply || reply->type == VALKEY_REPLY_ERROR) {
fprintf(stderr, "Thread %d: Failed to insert vector %d: %s\n",
data->thread_id, i, reply ? reply->str : "Connection error");
if (reply) freeReplyObject(reply);
sdsfree(key);
sdsfree(vector_binary);
zfree(vector);
break;
valkeyAppendCommand(ctx, "HSET %b %s %b",
key, sdslen(key),
config.search.vector_field,
vector_binary, sdslen(vector_binary));
}
pipeline_count++;
/* Process pipeline when full or at end */
if (pipeline_count >= pipeline_size || i == data->end_index) {
for (int j = 0; j < pipeline_count; j++) {
void *reply = NULL;
if (valkeyGetReply(ctx, &reply) == VALKEY_OK) {
valkeyReply *r = (valkeyReply *)reply;
if (r && r->type == VALKEY_REPLY_ERROR) {
fprintf(stderr, "Thread %d: Error: %s\n",
data->thread_id, r->str);
}
if (reply) freeReplyObject(reply);
} else {
fprintf(stderr, "Thread %d: Failed to get reply\n",
data->thread_id);
/* Reconnect on connection errors */
valkeyFree(ctx);
ctx = getValkeyContext(config.ct, ip, port);
if (!ctx) break;
}
}
/* Update progress */
pthread_mutex_lock(data->progress_mutex);
(*data->global_progress) += pipeline_count;
pthread_mutex_unlock(data->progress_mutex);
pipeline_count = 0;
}
freeReplyObject(reply);
sdsfree(key);
sdsfree(vector_binary);
zfree(vector);
/* Update global progress thread-safely */
pthread_mutex_lock(data->progress_mutex);
(*data->global_progress)++;
pthread_mutex_unlock(data->progress_mutex);
}
valkeyFree(ctx);
zfree(vector);
if (ctx) valkeyFree(ctx);
return NULL;
}
/* Prefill vector index with specified number of vectors */
/* Updated prefillVectorIndex to choose the right worker function */
static void prefillVectorIndex(int count) {
if (!config.use_search) return;
assert(config.search.vector_dim > 0 && config.use_search);
if (count <= 0) return;
int vec_dim = config.search.vector_dim;
/* Ensure base vector is initialized */
if (!base_vector) {
initBaseVector(vec_dim);
@ -1016,11 +1103,11 @@ static void prefillVectorIndex(int count) {
/* Determine number of threads to use */
int num_threads = (config.num_threads > 0) ? config.num_threads : 1;
if (count < num_threads) {
num_threads = count; /* Don't use more threads than vectors */
num_threads = count;
}
printf("Prefilling index with %d vectors using %d thread(s)...\n", count, num_threads);
/* Use multithreaded approach */
pthread_t *threads = zmalloc(num_threads * sizeof(pthread_t));
prefillThreadData *thread_data = zmalloc(num_threads * sizeof(prefillThreadData));
@ -1052,12 +1139,17 @@ static void prefillVectorIndex(int count) {
current_start = thread_data[i].end_index + 1;
if (pthread_create(&threads[i], NULL, prefillWorkerThread, &thread_data[i])) {
/* Choose the right worker function based on cluster mode */
void *(*worker_func)(void *) = config.cluster_mode ?
searchPrefillWorkerThreadCluster : searchPrefillWorkerThread;
if (pthread_create(&threads[i], NULL, worker_func, &thread_data[i])) {
fprintf(stderr, "Failed to create prefill thread %d\n", i);
exit(1);
}
}
/* Rest of the function remains the same... */
/* Monitor progress while threads are working */
int last_progress = 0;
while (global_progress < count) {
@ -1069,14 +1161,7 @@ static void prefillVectorIndex(int count) {
if (current_progress >= last_progress + progress_interval || current_progress == count) {
float progress = (float)current_progress / count * 100.0f;
long long search_memory = 0;
long long search_reclaimable = 0;
long long search_total_docs = 0;
long long search_ingest_field_vector = 0;
long long search_background_indexing_status = 0;
getSearchInfo(&search_memory, &search_reclaimable, &search_total_docs,
&search_ingest_field_vector, &search_background_indexing_status);
printf("Prefilled %d vectors (%.1f%%)...[s_mem=%lldMB][s_recl=%lld][s_docs=%lld][s_ingest_vecs=%lld][s_bg_index_stat=%lld]\n", current_progress, progress, search_memory/(1024*1024), search_reclaimable/(1024*1024), search_total_docs, search_ingest_field_vector, search_background_indexing_status);
printf("Prefilled %d vectors (%.1f%%)...\n", current_progress, progress);
last_progress = current_progress;
}
}
@ -1095,7 +1180,170 @@ static void prefillVectorIndex(int count) {
pthread_mutex_destroy(&progress_mutex);
zfree(threads);
zfree(thread_data);
}
#if 1
/* Enhanced prefill for cluster mode with proper slot distribution */
static void *searchPrefillWorkerThreadCluster(void *arg) {
assert(config.use_search);
prefillThreadData *data = (prefillThreadData *)arg;
/* In cluster mode, connect to a specific node based on thread ID */
const char *ip = config.conn_info.hostip;
int port = config.conn_info.hostport;
if (config.cluster_mode && config.cluster_primary_node_count) {
/* Distribute threads across cluster nodes */
int node_idx = data->thread_id % config.cluster_primary_node_count;
clusterNode *node = config.cluster_primary_nodes[node_idx];
if (node) {
ip = node->ip;
port = node->port;
}
}
valkeyContext *ctx = getValkeyContext(config.ct, ip, port);
if (!ctx) {
fprintf(stderr, "Thread %d: Failed to connect to %s:%d\n",
data->thread_id, ip, port);
return NULL;
}
int vec_dim = config.search.vector_dim;
float *vector = zmalloc(vec_dim * sizeof(float));
/* Use pipelining to batch commands */
int pipeline_size = 1;
int pipeline_count = 0;
for (int i = data->start_index; i <= data->end_index; i++) {
sds key;
if (config.cluster_mode) {
/* For cluster mode, ensure deterministic slot assignment */
/* Use a specific slot tag to route to the connected node */
int node_idx = data->thread_id % config.cluster_primary_node_count;
clusterNode *node = config.cluster_primary_nodes[node_idx];
if (node && node->slots_count > 0) {
/* Pick a slot that this node owns */
int slot_idx = i % node->slots_count;
int slot = node->slots[slot_idx];
const char *tag = crc16_slot_table[slot];
key = sdscatprintf(sdsempty(), "%s%012d{%s}",
config.search.prefix, i, tag);
} else {
/* Fallback: let cluster redirect as needed */
key = sdscatprintf(sdsempty(), "%s%012d",
config.search.prefix, i);
}
} else {
key = sdscatprintf(sdsempty(), "%s%012d",
config.search.prefix, i);
}
/* Generate unique vector deterministically based on key index */
generateVectorUnique(vector, vec_dim, i);
sds vector_binary = vectorToBinary(vector, vec_dim);
/* Use pipelining for better throughput */
if (config.search.tag_field && config.search.curr_conf.tag_dists) {
sds selected_tag = selectTagByDistribution();
valkeyAppendCommand(ctx, "HSET %b %s %b %s %s",
key, sdslen(key),
config.search.vector_field,
vector_binary, sdslen(vector_binary),
config.search.tag_field,
selected_tag ? selected_tag : "");
if (selected_tag) sdsfree(selected_tag);
} else {
valkeyAppendCommand(ctx, "HSET %b %s %b",
key, sdslen(key),
config.search.vector_field,
vector_binary, sdslen(vector_binary));
}
pipeline_count++;
/* Process pipeline when full or at end */
if (pipeline_count >= pipeline_size || i == data->end_index) {
for (int j = 0; j < pipeline_count; j++) {
void *reply = NULL;
if (valkeyGetReply(ctx, &reply) == VALKEY_OK) {
valkeyReply *r = (valkeyReply *)reply;
/* Handle MOVED errors silently - they're expected in cluster */
if (r && r->type == VALKEY_REPLY_ERROR) {
if (strncmp(r->str, "MOVED", 5) != 0 &&
strncmp(r->str, "ASK", 3) != 0) {
/* Only log non-redirect errors */
fprintf(stderr, "Thread %d: Error: %s\n",
data->thread_id, r->str);
}
}
if (reply) freeReplyObject(reply);
} else {
fprintf(stderr, "Thread %d: Failed to get reply\n",
data->thread_id);
/* Reconnect on connection errors */
valkeyFree(ctx);
ctx = getValkeyContext(config.ct, ip, port);
if (!ctx) break;
}
}
/* Update progress */
pthread_mutex_lock(data->progress_mutex);
(*data->global_progress) += pipeline_count;
pthread_mutex_unlock(data->progress_mutex);
pipeline_count = 0;
}
sdsfree(key);
sdsfree(vector_binary);
}
zfree(vector);
if (ctx) valkeyFree(ctx);
return NULL;
}
#endif
/* Fix the setClusterKeyHashTag to work with vector placeholders */
static void setClusterKeyHashTagForVectors(client c) {
assert(c->thread_id >= 0);
clusterNode *node = c->cluster_node;
assert(node);
int is_updating_slots = atomic_load_explicit(&config.is_updating_slots,
memory_order_relaxed);
if (is_updating_slots) updateClusterSlotsConfiguration();
/* Select a random slot from this node */
int slot = node->slots[rand() % node->slots_count];
const char *tag = crc16_slot_table[slot];
int taglen = strlen(tag);
/* Update all {tag} placeholders in the command buffer */
char *p = c->obuf + c->prefixlen;
char *end = c->obuf + sdslen(c->obuf);
while ((p = strstr(p, "{tag}")) != NULL && p < end) {
/* Replace {tag} with actual slot tag */
memmove(p + 1 + taglen + 1, p + 5, end - (p + 5));
p[0] = '{';
memcpy(p + 1, tag, taglen);
p[1 + taglen] = '}';
/* Adjust buffer length if tag is shorter than "tag" */
if (taglen < 3) {
int diff = 3 - taglen;
sdsrange(c->obuf, 0, sdslen(c->obuf) - diff - 1);
end -= diff;
}
p += taglen + 2; /* Move past the replaced tag */
}
}
// TODO: If index already exists, we shuld check if it matches the current configuration.
@ -1271,59 +1519,6 @@ void initPlaceholders(const char *cmd, size_t cmd_len) {
return;
}
static void replacePlaceholderFloat32(const size_t *indices, const size_t count, char *cmd, _Atomic uint64_t *key_counter, unsigned placeholder_len) {
if (!config.use_search) return;
// Replace __rand_vec_f32__ placeholder with a float vector in range of -1.0 to 1.0
if (count == 0) return;
uint64_t key = 0;
if (config.keyspacelen != 0) {
if (config.sequential_replacement) {
key = atomic_fetch_add_explicit(key_counter, 1, memory_order_relaxed);
} else {
key = random();
}
key %= config.keyspacelen;
}
// Generate float vector with improved randomness
unsigned num_floats = placeholder_len / 4;
float vector[num_floats];
// Use xorshift64* for better quality random numbers
uint64_t state = key ? key : 0x123456789ABCDEF0ULL;
for (unsigned i = 0; i < num_floats; i++) {
// xorshift64* algorithm - much better than linear congruential
state ^= state >> 12;
state ^= state << 25;
state ^= state >> 27;
state *= 0x2545F4914F6CDD1DULL;
// Mix with index for additional entropy
uint64_t mixed = state ^ ((uint64_t)i * 0x9E3779B97F4A7C15ULL);
// Convert to float in range [-1, 1] with better distribution
// Use upper 32 bits for better quality
uint32_t bits = (uint32_t)(mixed >> 32);
vector[i] = ((float)bits / (float)0xFFFFFFFF) * 2.0f - 1.0f;
if ((key + i) % 3 == 0) {
vector[i] += vector[i]*(float)key;
} else if (key) {
vector[i] /= (float)key;
}
}
char *placeholder = cmd + indices[0];
memcpy(placeholder, vector, placeholder_len);
// Copy the first instance to the other locations
for (size_t j = 1; j < count; j++) {
char *copy_placeholder = cmd + indices[j];
memcpy(copy_placeholder, placeholder, placeholder_len);
}
}
static void replacePlaceholder(const size_t *indices, const size_t count, char *cmd, _Atomic uint64_t *key_counter, unsigned placeholder_len) {
if (count == 0) return;
@ -1352,34 +1547,87 @@ static void replacePlaceholder(const size_t *indices, const size_t count, char *
}
}
static void replacePlaceholderVector(const size_t *indices, const size_t count,
char *cmd, _Atomic uint64_t *key_counter) {
if (!config.use_search || count == 0) return;
/* Self-check: ensure placeholder is exactly 8 bytes */
assert(VECTOR_PLACEHOLDER_LEN == 8);
/* Get key for randomization */
uint64_t key = 0;
if (config.keyspacelen != 0) {
if (config.sequential_replacement) {
key = atomic_fetch_add_explicit(key_counter, 1, memory_order_relaxed);
} else {
key = random();
}
key %= config.keyspacelen;
}
/* Generate exactly 2 floats (8 bytes) */
float vector[2];
uint64_t state = key ? key : 0x123456789ABCDEF0ULL;
for (int i = 0; i < 2; i++) {
state ^= state >> 12;
state ^= state << 25;
state ^= state >> 27;
state *= 0x2545F4914F6CDD1DULL;
uint32_t bits = (uint32_t)(state >> 32);
vector[i] = ((float)(int32_t)bits) / 2147483648.0f;
}
/* Normalize if using COSINE metric */
if (config.search.metric && strcmp(config.search.metric, "COSINE") == 0) {
float norm = sqrtf(vector[0] * vector[0] + vector[1] * vector[1]);
if (norm > 0.0f) {
vector[0] /= norm;
vector[1] /= norm;
}
}
/* Replace all occurrences in-place (exactly 8 bytes) */
for (size_t j = 0; j < count; j++) {
char *placeholder = cmd + indices[j];
/* Self-check: verify we're replacing "__v_rd__" */
assert(memcmp(placeholder, VECTOR_PLACEHOLDER, VECTOR_PLACEHOLDER_LEN) == 0);
memcpy(placeholder, vector, 8); // Exactly 8 bytes replacement
}
}
static void replacePlaceholders(char *cmd_data, int cmd_count) {
static _Atomic uint64_t seq_key[PLACEHOLDER_COUNT] = {0};
for (int cmd_index = 0; cmd_index < cmd_count; cmd_index++) {
char *cmd = cmd_data + cmd_index * placeholders.cmd_len;
// printf("Replacing placeholders in command %d: %.*s\n", cmd_index, (int)placeholders.cmd_len, cmd);
/* for __rand_int__, multiple instances will have different values */
/* Handle __rand_int__ separately (multiple different values) */
size_t *indices = placeholders.indices[0];
_Atomic uint64_t *key_counter = &seq_key[0];
for (size_t i = 0; i < placeholders.count[0]; i++) {
replacePlaceholder(indices + i, 1, cmd, key_counter, PLACEHOLDER_LEN);
}
/* For other placeholders, multiple occurrences within the command will
* have the same value */
for (size_t placeholder = 1; placeholder < PLACEHOLDER_COUNT; placeholder++) {
size_t *indices = placeholders.indices[placeholder];
/* Handle other regular placeholders */
for (size_t placeholder = 1; placeholder < VECTOR_PLACEHOLDER_INDEX; placeholder++) {
indices = placeholders.indices[placeholder];
size_t count = placeholders.count[placeholder];
if (placeholder >= VECTOR_PLACEHOLDER_INDEX) {
replacePlaceholderFloat32(indices, count, cmd, &seq_key[placeholder], VECTOR_PLACEHOLDER_LEN);
} else {
/* Handle regular placeholders */
_Atomic uint64_t *key_counter = &seq_key[placeholder];
replacePlaceholder(indices, count, cmd, key_counter, PLACEHOLDER_LEN);
}
key_counter = &seq_key[placeholder];
replacePlaceholder(indices, count, cmd, key_counter, PLACEHOLDER_LEN);
}
/* Handle vector placeholder */
if (config.use_search && placeholders.count[VECTOR_PLACEHOLDER_INDEX] > 0) {
indices = placeholders.indices[VECTOR_PLACEHOLDER_INDEX];
size_t count = placeholders.count[VECTOR_PLACEHOLDER_INDEX];
replacePlaceholderVector(indices, count, cmd,
&seq_key[VECTOR_PLACEHOLDER_INDEX]);
}
}
}
@ -1445,30 +1693,6 @@ static void resetClient(client c) {
c->pending = config.pipeline * c->seqlen;
}
static void setClusterKeyHashTag(client c) {
assert(c->thread_id >= 0);
clusterNode *node = c->cluster_node;
assert(node);
int is_updating_slots = atomic_load_explicit(&config.is_updating_slots, memory_order_relaxed);
/* If updateClusterSlotsConfiguration is updating the slots array,
* call updateClusterSlotsConfiguration is order to block the thread
* since the mutex is locked. When the slots will be updated by the
* thread that's actually performing the update, the execution of
* updateClusterSlotsConfiguration won't actually do anything, since
* the updated_slots_count array will be already NULL. */
if (is_updating_slots) updateClusterSlotsConfiguration();
int slot = node->slots[rand() % node->slots_count];
const char *tag = crc16_slot_table[slot];
int taglen = strlen(tag);
size_t i;
for (i = 0; i < c->staglen; i++) {
char *p = c->stagptr[i] + 1;
p[0] = tag[0];
p[1] = (taglen >= 2 ? tag[1] : '}');
p[2] = (taglen == 3 ? tag[2] : '}');
}
}
/* Acquires the specified number of tokens from the token bucket or calculates the wait time if tokens are not available.
* This function implements a token bucket rate limiting algorithm to control access to a resource.
*
@ -1746,7 +1970,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Really initialize: replace keys and set start time. */
if (config.replace_placeholders) replacePlaceholders(c->obuf + c->prefixlen, config.pipeline);
if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTagForVectors(c);
c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed);
c->start = ustime();
c->latency = -1;
@ -2236,14 +2460,20 @@ static void freeClusterNodes(void) {
if (n) freeClusterNode(n);
}
zfree(config.cluster_nodes);
zfree(config.cluster_primary_nodes);
config.cluster_nodes = NULL;
config.cluster_primary_nodes = NULL;
}
static clusterNode **addClusterNode(clusterNode *node) {
int count = config.cluster_node_count + 1;
config.cluster_nodes = zrealloc(config.cluster_nodes, count * sizeof(*node));
config.cluster_nodes = zrealloc(config.cluster_nodes, count * sizeof(clusterNode *));
if (!config.cluster_nodes) return NULL;
config.cluster_nodes[config.cluster_node_count++] = node;
if (node->replicate == NULL) {
config.cluster_primary_nodes = zrealloc(config.cluster_primary_nodes, (config.cluster_primary_node_count + 1) * sizeof(clusterNode *));
config.cluster_primary_nodes[config.cluster_primary_node_count++] = node;
}
return config.cluster_nodes;
}
@ -2579,12 +2809,12 @@ void setDefaultSearchConfig(void) {
config.search.tag_field = NULL; // No tag field by default
config.search.numeric_field = NULL; // No numeric field by default
config.search.k = 10; // Default K for KNN queries
config.search.curr_conf.n_rnd_dim = 16; // Default number of random dimensions
config.search.curr_conf.tag_dists = NULL;
config.search.curr_conf.n_dists = 0;
config.search.curr_conf.tag_filter = NULL;
config.search.metric = sdsnew("L2");
config.search.algorithm = sdsnew("hnsw"); // Default algorithm
config.search.nocontent = 0; // exclude content by default
}
/* Returns number of consumed options. */
int parseOptions(int argc, char **argv) {
@ -2762,7 +2992,7 @@ int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i], "--vector-dim")) {
if (lastarg) goto invalid;
config.search.vector_dim = atoi(argv[++i]);
if (config.search.vector_dim <= config.search.curr_conf.n_rnd_dim) {
if (config.search.vector_dim <= VECTOR_NUM_RAND_DIM) {
fprintf(stderr, "Invalid vector dimension: %d\n", config.search.vector_dim);
goto invalid;
}
@ -3150,7 +3380,7 @@ int main(int argc, char **argv) {
int i;
char *data, *cmd, *tag;
int len;
memset(&config, 0, sizeof(config));
client c;
srandom(time(NULL) ^ getpid());
@ -3158,7 +3388,6 @@ int main(int argc, char **argv) {
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
memset(&config.sslconfig, 0, sizeof(config.sslconfig));
config.ct = VALKEY_CONN_TCP;
config.numclients = 50;
config.requests = 100000;
@ -3191,7 +3420,7 @@ int main(int argc, char **argv) {
config.threads = NULL;
config.cluster_mode = 0;
config.rps = 0;
config.read_from_replica = FROM_PRIMARY_ONLY;
config.read_from_replica = FROM_ALL;
config.cluster_node_count = 0;
config.cluster_nodes = NULL;
config.server_config = NULL;
@ -3479,7 +3708,7 @@ int main(int argc, char **argv) {
initBaseVector(vec_dim);
/* Use custom vector benchmark function */
benchmarkVectorOp("VEC-INSERT", 1);
benchmarkVectorOpClusterAware("VEC-INSERT", 1);
}
if (test_is_selected("vec-query")) {
@ -3489,9 +3718,20 @@ int main(int argc, char **argv) {
initBaseVector(vec_dim);
/* Use custom vector benchmark function */
benchmarkVectorOp("VEC-QUERY", 0);
benchmarkVectorOpClusterAware("VEC-QUERY", 0);
}
if (test_is_selected("vec-del")) {
sds prefix = config.search.prefix;
if (config.cluster_mode) {
len = valkeyFormatCommand(&cmd, "DEL %s{tag}:__rand_int__", prefix);
} else {
len = valkeyFormatCommand(&cmd, "DEL %s__rand_int__", prefix);
}
benchmark("VEC-DEL", cmd, len);
free(cmd);
}
#if 0
if (test_is_selected("vec-del")) {
/* Use DEL command to delete keys from vector index */
sds prefix = config.search.prefix;
@ -3499,6 +3739,7 @@ int main(int argc, char **argv) {
benchmark("VEC-DEL", cmd, len);
free(cmd);
}
#endif
}
if (test_is_selected("spop")) {
len = valkeyFormatCommand(&cmd, "SPOP myset%s", tag);