mirror of https://github.com/valkey-io/valkey
Introduce volatile set
-------------
Overview:
---------
This PR introduces a complete redesign of the 'vset' (stands for volatile set) data structure,
creating an adaptive container for expiring entries. The new design is
memory-efficient, scalable, and dynamically promotes/demotes its internal
representation depending on runtime behavior and volume.
The core concept uses a single tagged pointer (`expiry_buckets`) that encodes
one of several internal structures:
- NONE (-1): Empty set
- SINGLE (0x1): One entry
- VECTOR (0x2): Sorted vector of entry pointers
- HT (0x4): Hash table for larger buckets with many entries
- RAX (0x6): Radix tree (keyed by aligned expiry timestamps)
This allows the set to grow and shrink seamlessly while optimizing for both
space and performance.
Motivation:
-----------
The previous design lacked flexibility in high-churn environments or
workloads with skewed expiry distributions. This redesign enables dynamic
layout adjustment based on the time distribution and volume of the inserted
entries, while maintaining fast expiry checks and minimal memory overhead.
Key Concepts:
-------------
- All pointers stored in the structure must be odd-aligned to preserve
3 bits for tagging. This is safe with SDS strings (which set the LSB).
- Buckets evolve automatically:
- Start as NONE.
- On first insert → become SINGLE.
- If another entry with similar expiry → promote to VECTOR.
- If VECTOR exceeds 127 entries → convert to RAX.
- If a RAX bucket's vector fills and cannot split → promote to HT.
- Each vector bucket is kept sorted by `entry->getExpiry()`.
- Binary search is used for efficient insertion and splitting.
# Coarse Buckets Expiration System for Hash Fields
This PR introduces **coarse-grained expiration buckets** to support per-field
expirations in hash types — a feature known as *volatile fields*.
It enables scalable expiration tracking by grouping fields into time-aligned
buckets instead of individually tracking exact timestamps.
## Motivation
Valkey traditionally supports key-level expiration. However, in many applications,
there's a strong need to expire individual fields within a hash (e.g., session keys,
token caches, etc.).
Tracking these at fine granularity is expensive and potentially unscalable, so
this implementation introduces *bucketed expirations* to batch expirations together.
## Bucket Granularity and Timestamp Handling
- Each expiration bucket represents a time slice of fixed width (e.g., 8192 ms).
- Expiring fields are mapped to the **end** of a time slice (not the floor).
- This design facilitates:
- Efficient *splitting* of large buckets when needed
- *Downgrading* buckets when fields permit tighter packing
- Coalescing during lazy cleanup or memory pressure
### Example Calculation
Suppose a field has an expiration time of `1690000123456` ms and the max bucket
interval is 8192 ms:
```
BUCKET_INTERVAL_MAX = 8192;
expiry = 1690000123456;
bucket_ts = (expiry & ~(BUCKET_INTERVAL_MAX - 1LL)) + BUCKET_INTERVAL_MAX;
= (1690000123456 & ~8191) + 8192
= 1690000122880 + 8192
= 1690000131072
```
The field is stored in a bucket that **ends at** `1690000131072` ms.
### Bucket Alignment Diagram
```
Time (ms) →
|----------------|----------------|----------------|
128ms buckets → 1690000122880 1690000131072
^ ^
| |
expiry floor assigned bucket end
```
## Bucket Placement Logic
- If a suitable bucket **already exists** (i.e., its `end_ts > expiry`), the field is added.
- If no bucket covers the `expiry`, a **new bucket** is created at the computed `end_ts`.
## Bucket Downgrade Conditions
Buckets are downgraded to smaller intervals when overpopulated (>127 fields).
This happens when **all fields fit into a tighter bucket**.
Downgrade rule:
```
(max_expiry & ~(BUCKET_INTERVAL_MIN - 1LL)) + BUCKET_INTERVAL_MIN < current_bucket_ts
```
If the above holds, all fields can be moved to a tighter bucket interval.
### Downgrade Bucket — Diagram
```
Before downgrade:
Current Bucket (8192 ms)
|----------------------------------------|
| Field A | Field B | Field C | Field D |
| exp=+30 | +200 | +500 | +1500 |
|----------------------------------------|
↑
All expiries fall before tighter boundary
After downgrade to 1024 ms:
New Bucket (1024 ms)
|------------------|
| A | B | C | D |
|------------------|
```
### Bucket Split Strategy
If downgrade is not possible, the bucket is **split**:
- Fields are sorted by expiration time.
- A subset that fits in an earlier bucket is moved out.
- Remaining fields stay in the original bucket.
### Split Bucket — Diagram
```
Before split:
Large Bucket (8192 ms)
|--------------------------------------------------|
| A | B | C | D | E | F | G | H | I | J | ... | Z |
|---------------- Sorted by expiry ---------------|
↑
Fields A–L can be moved to an earlier bucket
After split:
Bucket 1 (end=1690000129024) Bucket 2 (end=1690000131072)
|------------------------| |------------------------|
| A | B | C | ... | L | | M | N | O | ... | Z |
|------------------------| |------------------------|
```
## Summary of Bucket Behavior
| Scenario | Action Taken |
|--------------------------------|------------------------------|
| No bucket covers expiry | New bucket is created |
| Existing bucket fits | Field is added |
| Bucket overflows (>127 fields) | Downgrade or split attempted |
API Changes:
------------
Create/Free:
void vsetInit(vset *set);
void vsetClear(vset *set);
Mutation:
bool vsetAddEntry(vset *set, vsetGetExpiryFunc getExpiry, void *entry);
bool vsetRemoveEntry(vset *set, vsetGetExpiryFunc getExpiry, void *entry);
bool vsetUpdateEntry(vset *set, vsetGetExpiryFunc getExpiry, void *old_entry,
void *new_entry, long long old_expiry,
long long new_expiry);
Expiry Retrieval:
long long vsetEstimatedEarliestExpiry(vset *set, vsetGetExpiryFunc getExpiry);
size_t vsetPopExpired(vset *set, vsetGetExpiryFunc getExpiry, vsetExpiryFunc expiryFunc, mstime_t now, size_t max_count, void *ctx);
Utilities:
bool vsetIsEmpty(vset *set);
size_t vsetMemUsage(vset *set);
Iteration:
void vsetStart(vset *set, vsetIterator *it);
bool vsetNext(vsetIterator *it, void **entryptr);
void vsetStop(vsetIterator *it);
Entry Requirements:
-------------------
All entries must conform to the following interface via `volatileEntryType`:
sds entryGetKey(const void entry); // for deduplication
long long getExpiry(const void entry); // used for bucketing
int expire(void db, void o, void entry); // used for expiration callbacks
Diagrams:
---------
1. Tagged Pointer Representation
-----------------------------
Lower 3 bits of `expiry_buckets` encode bucket type:
+------------------------------+
| pointer | TAG (3b) |
+------------------------------+
↑
masked via VSET_PTR_MASK
TAG values:
0x1 → SINGLE
0x2 → VECTOR
0x4 → HT
0x6 → RAX
2. Evolution of the Bucket
------------------------
*Volatile set top-level structure:*
```
+--------+ +--------+ +--------+ +--------+
| NONE | --> | SINGLE | --> | VECTOR | --> | RAX |
+--------+ +--------+ +--------+ +--------+
```
*If the top-level element is a RAX, it has child buckets of type:*
```
+--------+ +--------+ +-----------+
| SINGLE | --> | VECTOR | --> | HASHTABLE |
+--------+ +--------+ +-----------+
```
*Vectors can split into multiple vectors and shrink into SINGLE buckets. A RAX with only one element is collapsed by replacing the RAX with its single element on the top level (except for HASHTABLE buckets which are not allowed on the top level).*
3. RAX Structure with Expiry-Aligned Keys
--------------------------------------
Buckets in RAX are indexed by aligned expiry timestamps:
+------------------------------+
| RAX key (bucket_ts) → Bucket|
+------------------------------+
| 0x00000020 → VECTOR |
| 0x00000040 → VECTOR |
| 0x00000060 → HT |
+------------------------------+
4. Bucket Splitting (Inside RAX)
-----------------------------
If a vector bucket in a RAX fills:
- Binary search for best split point.
- Use `getExpiry(entry)` + `get_bucket_ts()` to find transition.
- Create 2 new buckets and update RAX.
Original:
[entry1, entry2, ..., entryN] ← bucket_ts = 64ms
After split:
[entry1, ..., entryK] → bucket_ts = 32ms
[entryK+1, ..., entryN] → bucket_ts = 64ms
If all entries share same bucket_ts → promote to HT.
5. Shrinking Behavior
------------------
On deletion:
- HT may shrink to VECTOR.
- VECTOR with 1 item → becomes SINGLE.
- If RAX has only one key left, it’s promoted up.
Summary:
--------
This redesign provides:
✓ Fine-grained memory control
✓ High scalability for bursty TTL data
✓ Fast expiry checks via windowed organization
✓ Minimal overhead for sparse sets
✓ Flexible binary-search-based sorting and bucketing
It also lays the groundwork for future enhancements, including metrics,
prioritized expiry policies, or segmented cleaning.
Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
This commit is contained in:
parent
65215e5378
commit
c3b1b0317d
|
|
@ -119,7 +119,7 @@ set(VALKEY_SERVER_SRCS
|
||||||
${CMAKE_SOURCE_DIR}/src/server.c
|
${CMAKE_SOURCE_DIR}/src/server.c
|
||||||
${CMAKE_SOURCE_DIR}/src/logreqres.c
|
${CMAKE_SOURCE_DIR}/src/logreqres.c
|
||||||
${CMAKE_SOURCE_DIR}/src/entry.c
|
${CMAKE_SOURCE_DIR}/src/entry.c
|
||||||
${CMAKE_SOURCE_DIR}/src/volatile_set.c)
|
${CMAKE_SOURCE_DIR}/src/vset.c)
|
||||||
|
|
||||||
|
|
||||||
# valkey-cli
|
# valkey-cli
|
||||||
|
|
|
||||||
|
|
@ -423,7 +423,7 @@ ENGINE_NAME=valkey
|
||||||
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
|
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
|
||||||
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
|
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
|
||||||
ENGINE_TRACE_OBJ=trace/trace.o trace/trace_commands.o trace/trace_db.o trace/trace_cluster.o trace/trace_server.o trace/trace_rdb.o trace/trace_aof.o
|
ENGINE_TRACE_OBJ=trace/trace.o trace/trace_commands.o trace/trace_db.o trace/trace_cluster.o trace/trace_server.o trace/trace_rdb.o trace/trace_aof.o
|
||||||
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o volatile_set.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
|
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o vset.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
|
||||||
ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
|
ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
|
||||||
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
|
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
|
||||||
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o sds.o util.o sha256.o
|
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o sds.o util.o sha256.o
|
||||||
|
|
|
||||||
|
|
@ -1802,7 +1802,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
|
||||||
size_t used_before = ht->used[0];
|
size_t used_before = ht->used[0];
|
||||||
bucket *b = &ht->tables[0][idx];
|
bucket *b = &ht->tables[0][idx];
|
||||||
do {
|
do {
|
||||||
if (b->presence != 0) {
|
if (fn && b->presence != 0) {
|
||||||
int pos;
|
int pos;
|
||||||
for (pos = 0; pos < ENTRIES_PER_BUCKET; pos++) {
|
for (pos = 0; pos < ENTRIES_PER_BUCKET; pos++) {
|
||||||
if (isPositionFilled(b, pos) && validateElementIfNeeded(ht, b->entries[pos])) {
|
if (isPositionFilled(b, pos) && validateElementIfNeeded(ht, b->entries[pos])) {
|
||||||
|
|
@ -1845,7 +1845,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
|
||||||
size_t used_before = ht->used[table_small];
|
size_t used_before = ht->used[table_small];
|
||||||
bucket *b = &ht->tables[table_small][idx];
|
bucket *b = &ht->tables[table_small][idx];
|
||||||
do {
|
do {
|
||||||
if (b->presence) {
|
if (fn && b->presence) {
|
||||||
for (int pos = 0; pos < ENTRIES_PER_BUCKET; pos++) {
|
for (int pos = 0; pos < ENTRIES_PER_BUCKET; pos++) {
|
||||||
if (isPositionFilled(b, pos) && validateElementIfNeeded(ht, b->entries[pos])) {
|
if (isPositionFilled(b, pos) && validateElementIfNeeded(ht, b->entries[pos])) {
|
||||||
void *emit = emit_ref ? &b->entries[pos] : b->entries[pos];
|
void *emit = emit_ref ? &b->entries[pos] : b->entries[pos];
|
||||||
|
|
@ -1875,7 +1875,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
|
||||||
size_t used_before = ht->used[table_large];
|
size_t used_before = ht->used[table_large];
|
||||||
bucket *b = &ht->tables[table_large][idx];
|
bucket *b = &ht->tables[table_large][idx];
|
||||||
do {
|
do {
|
||||||
if (b->presence) {
|
if (fn && b->presence) {
|
||||||
for (int pos = 0; pos < ENTRIES_PER_BUCKET; pos++) {
|
for (int pos = 0; pos < ENTRIES_PER_BUCKET; pos++) {
|
||||||
if (isPositionFilled(b, pos) && validateElementIfNeeded(ht, b->entries[pos])) {
|
if (isPositionFilled(b, pos) && validateElementIfNeeded(ht, b->entries[pos])) {
|
||||||
void *emit = emit_ref ? &b->entries[pos] : b->entries[pos];
|
void *emit = emit_ref ? &b->entries[pos] : b->entries[pos];
|
||||||
|
|
|
||||||
|
|
@ -28,10 +28,12 @@
|
||||||
* POSSIBILITY OF SUCH DAMAGE.
|
* POSSIBILITY OF SUCH DAMAGE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "hashtable.h"
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
#include "serverassert.h"
|
#include "serverassert.h"
|
||||||
#include "functions.h"
|
#include "functions.h"
|
||||||
#include "intset.h" /* Compact integer set structure */
|
#include "intset.h" /* Compact integer set structure */
|
||||||
|
#include "vset.h"
|
||||||
#include "zmalloc.h"
|
#include "zmalloc.h"
|
||||||
#include "sds.h"
|
#include "sds.h"
|
||||||
#include "module.h"
|
#include "module.h"
|
||||||
|
|
@ -1201,6 +1203,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
|
||||||
} else if (o->encoding == OBJ_ENCODING_HASHTABLE) {
|
} else if (o->encoding == OBJ_ENCODING_HASHTABLE) {
|
||||||
hashtable *ht = o->ptr;
|
hashtable *ht = o->ptr;
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
|
vset *volatile_fields = hashtableMetadata(ht);
|
||||||
hashtableInitIterator(&iter, ht, 0);
|
hashtableInitIterator(&iter, ht, 0);
|
||||||
void *next;
|
void *next;
|
||||||
|
|
||||||
|
|
@ -1211,6 +1214,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
|
||||||
}
|
}
|
||||||
hashtableResetIterator(&iter);
|
hashtableResetIterator(&iter);
|
||||||
if (samples) asize += (double)elesize / samples * hashtableSize(ht);
|
if (samples) asize += (double)elesize / samples * hashtableSize(ht);
|
||||||
|
if (vsetIsValid(volatile_fields)) asize += vsetMemUsage(volatile_fields);
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown hash encoding");
|
serverPanic("Unknown hash encoding");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -80,7 +80,7 @@
|
||||||
#include "rax.h" /* Radix tree */
|
#include "rax.h" /* Radix tree */
|
||||||
#include "connection.h" /* Connection abstraction */
|
#include "connection.h" /* Connection abstraction */
|
||||||
#include "memory_prefetch.h"
|
#include "memory_prefetch.h"
|
||||||
#include "volatile_set.h"
|
#include "vset.h"
|
||||||
#include "trace/trace.h"
|
#include "trace/trace.h"
|
||||||
#include "entry.h"
|
#include "entry.h"
|
||||||
|
|
||||||
|
|
@ -2627,7 +2627,7 @@ typedef struct {
|
||||||
unsigned char *fptr, *vptr;
|
unsigned char *fptr, *vptr;
|
||||||
|
|
||||||
hashtableIterator iter;
|
hashtableIterator iter;
|
||||||
volatileSetIterator viter;
|
vsetIterator viter;
|
||||||
void *next;
|
void *next;
|
||||||
|
|
||||||
} hashTypeIterator;
|
} hashTypeIterator;
|
||||||
|
|
|
||||||
80
src/t_hash.c
80
src/t_hash.c
|
|
@ -35,7 +35,7 @@
|
||||||
#include "hashtable.h"
|
#include "hashtable.h"
|
||||||
#include "rax.h"
|
#include "rax.h"
|
||||||
#include "sds.h"
|
#include "sds.h"
|
||||||
#include "volatile_set.h"
|
#include "vset.h"
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
#include "zmalloc.h"
|
#include "zmalloc.h"
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
|
|
@ -52,27 +52,23 @@ typedef enum {
|
||||||
EXPIRATION_MODIFICATION_EXPIRE_ASAP = 2, /* if apply of the expiration modification was set to a time in the past (i.e field is immediately expired) */
|
EXPIRATION_MODIFICATION_EXPIRE_ASAP = 2, /* if apply of the expiration modification was set to a time in the past (i.e field is immediately expired) */
|
||||||
} expiryModificationResult;
|
} expiryModificationResult;
|
||||||
|
|
||||||
volatileEntryType hashVolatileEntryType = {
|
|
||||||
.entryGetKey = (sds(*)(const void *entry))entryGetField,
|
|
||||||
.getExpiry = (long long (*)(const void *entry))entryGetExpiry,
|
|
||||||
};
|
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
* Hash type Expiry API
|
* Hash type Expiry API
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
|
||||||
static volatile_set *hashTypeGetVolatileSet(robj *o) {
|
static vset *hashTypeGetVolatileSet(robj *o) {
|
||||||
serverAssert(o->encoding == OBJ_ENCODING_HASHTABLE);
|
serverAssert(o->encoding == OBJ_ENCODING_HASHTABLE);
|
||||||
return *(volatile_set **)hashtableMetadata(o->ptr);
|
vset *set = (vset *)hashtableMetadata(o->ptr);
|
||||||
}
|
return vsetIsValid(set) ? set : NULL;
|
||||||
|
|
||||||
void hashTypeFreeVolatileSet(robj *o) {
|
|
||||||
volatile_set *set = hashTypeGetVolatileSet(o);
|
|
||||||
if (set) freeVolatileSet(set);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hashTypeHasVolatileElements(robj *o) {
|
bool hashTypeHasVolatileElements(robj *o) {
|
||||||
return ((o->encoding == OBJ_ENCODING_HASHTABLE) && (hashTypeGetVolatileSet(o) != NULL));
|
if (o->encoding == OBJ_ENCODING_HASHTABLE) {
|
||||||
|
vset *set = hashTypeGetVolatileSet(o);
|
||||||
|
if (set && !vsetIsEmpty(set))
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* make any access to the hash object elements ignore the specific elements expiration.
|
/* make any access to the hash object elements ignore the specific elements expiration.
|
||||||
|
|
@ -80,44 +76,43 @@ bool hashTypeHasVolatileElements(robj *o) {
|
||||||
static inline void hashTypeIgnoreTTL(robj *o, bool ignore) {
|
static inline void hashTypeIgnoreTTL(robj *o, bool ignore) {
|
||||||
if (o->encoding == OBJ_ENCODING_HASHTABLE) {
|
if (o->encoding == OBJ_ENCODING_HASHTABLE) {
|
||||||
/* prevent placing access function if not needed */
|
/* prevent placing access function if not needed */
|
||||||
if (!ignore && !hashTypeHasVolatileElements(o)) {
|
if (!ignore && hashTypeGetVolatileSet(o) == NULL) {
|
||||||
ignore = true;
|
ignore = true;
|
||||||
}
|
}
|
||||||
hashtableSetType(o->ptr, ignore ? &hashHashtableType : &hashWithVolatileItemsHashtableType);
|
hashtableSetType(o->ptr, ignore ? &hashHashtableType : &hashWithVolatileItemsHashtableType);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static volatile_set *hashTypeGetOrcreateVolatileSet(robj *o) {
|
static vset *hashTypeGetOrcreateVolatileSet(robj *o) {
|
||||||
serverAssert(o->encoding == OBJ_ENCODING_HASHTABLE);
|
serverAssert(o->encoding == OBJ_ENCODING_HASHTABLE);
|
||||||
volatile_set **volatile_set_ref = hashtableMetadata(o->ptr);
|
vset *set = (vset *)hashtableMetadata(o->ptr);
|
||||||
if (*volatile_set_ref == NULL) {
|
if (!vsetIsValid(set)) {
|
||||||
*volatile_set_ref = createVolatileSet(&hashVolatileEntryType);
|
vsetInit(set);
|
||||||
/* serves mainly for optimization. Use type which supports access function only when needed. */
|
/* serves mainly for optimization. Use type which supports access function only when needed. */
|
||||||
hashTypeIgnoreTTL(o, false);
|
hashTypeIgnoreTTL(o, false);
|
||||||
}
|
}
|
||||||
return *volatile_set_ref;
|
return set;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void hashTypeDeleteVolatileSet(robj *o) {
|
void hashTypeFreeVolatileSet(robj *o) {
|
||||||
volatile_set **volatile_set_ref = hashtableMetadata(o->ptr);
|
vset *set = (vset *)hashtableMetadata(o->ptr);
|
||||||
freeVolatileSet(*volatile_set_ref);
|
if (vsetIsValid(set)) vsetRelease(set);
|
||||||
*volatile_set_ref = NULL;
|
|
||||||
/* serves mainly for optimization. by changing the hashtable type we can avoid extra function call in hashtable access */
|
/* serves mainly for optimization. by changing the hashtable type we can avoid extra function call in hashtable access */
|
||||||
hashTypeIgnoreTTL(o, true);
|
hashTypeIgnoreTTL(o, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void hashTypeTrackEntry(robj *o, void *entry) {
|
void hashTypeTrackEntry(robj *o, void *entry) {
|
||||||
volatile_set *set = hashTypeGetOrcreateVolatileSet(o);
|
vset *set = hashTypeGetOrcreateVolatileSet(o);
|
||||||
serverAssert(volatileSetAddEntry(set, entry, entryGetExpiry(entry)));
|
serverAssert(vsetAddEntry(set, entryGetExpiry, entry));
|
||||||
}
|
}
|
||||||
|
|
||||||
void hashTypeUntrackEntry(robj *o, void *entry) {
|
void hashTypeUntrackEntry(robj *o, void *entry) {
|
||||||
if (!entryHasExpiry(entry)) return;
|
if (!entryHasExpiry(entry)) return;
|
||||||
volatile_set *set = hashTypeGetVolatileSet(o);
|
vset *set = hashTypeGetVolatileSet(o);
|
||||||
debugServerAssert(set);
|
debugServerAssert(set);
|
||||||
serverAssert(volatileSetRemoveEntry(set, entry, entryGetExpiry(entry)));
|
serverAssert(vsetRemoveEntry(set, entryGetExpiry, entry));
|
||||||
if (volatileSetNumEntries(set) == 0) {
|
if (vsetIsEmpty(set)) {
|
||||||
hashTypeDeleteVolatileSet(o);
|
hashTypeFreeVolatileSet(o);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -128,20 +123,13 @@ void hashTypeTrackUpdateEntry(robj *o, void *old_entry, void *new_entry, long lo
|
||||||
if (!old_tracked && !new_tracked)
|
if (!old_tracked && !new_tracked)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
volatile_set *set = hashTypeGetOrcreateVolatileSet(o);
|
vset *set = hashTypeGetOrcreateVolatileSet(o);
|
||||||
debugServerAssert(set);
|
debugServerAssert(!old_tracked || !vsetIsEmpty(set));
|
||||||
|
|
||||||
if (old_tracked && !new_tracked) {
|
serverAssert(vsetUpdateEntry(set, entryGetExpiry, old_entry, new_entry, old_expiry, new_expiry) == 1);
|
||||||
serverAssert(volatileSetRemoveEntry(set, old_entry, old_expiry));
|
|
||||||
} else if (new_tracked && !old_tracked) {
|
if (vsetIsEmpty(set)) {
|
||||||
serverAssert(volatileSetAddEntry(set, new_entry, new_expiry));
|
hashTypeFreeVolatileSet(o);
|
||||||
} else {
|
|
||||||
volatile_set *set = hashTypeGetVolatileSet(o);
|
|
||||||
debugServerAssert(set);
|
|
||||||
serverAssert(volatileSetUpdateEntry(set, old_entry, new_entry, old_expiry, new_expiry) == 1);
|
|
||||||
}
|
|
||||||
if (volatileSetNumEntries(set) == 0) {
|
|
||||||
hashTypeDeleteVolatileSet(o);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -599,7 +587,7 @@ void hashTypeInitVolatileIterator(robj *subject, hashTypeIterator *hi) {
|
||||||
if (hi->encoding == OBJ_ENCODING_LISTPACK) {
|
if (hi->encoding == OBJ_ENCODING_LISTPACK) {
|
||||||
return;
|
return;
|
||||||
} else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
|
} else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
|
||||||
volatileSetStart(hashTypeGetVolatileSet(subject), &hi->viter);
|
vsetInitIterator(hashTypeGetVolatileSet(subject), &hi->viter);
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown hash encoding");
|
serverPanic("Unknown hash encoding");
|
||||||
}
|
}
|
||||||
|
|
@ -610,7 +598,7 @@ void hashTypeResetIterator(hashTypeIterator *hi) {
|
||||||
if (!hi->volatile_items_iter)
|
if (!hi->volatile_items_iter)
|
||||||
hashtableResetIterator(&hi->iter);
|
hashtableResetIterator(&hi->iter);
|
||||||
else
|
else
|
||||||
volatileSetReset(&hi->viter);
|
vsetResetIterator(&hi->viter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -650,7 +638,7 @@ int hashTypeNext(hashTypeIterator *hi) {
|
||||||
if (!hi->volatile_items_iter) {
|
if (!hi->volatile_items_iter) {
|
||||||
if (!hashtableNext(&hi->iter, &hi->next)) return C_ERR;
|
if (!hashtableNext(&hi->iter, &hi->next)) return C_ERR;
|
||||||
} else {
|
} else {
|
||||||
if (!volatileSetNext(&hi->viter, &hi->next)) return C_ERR;
|
if (!vsetNext(&hi->viter, &hi->next)) return C_ERR;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown hash encoding");
|
serverPanic("Unknown hash encoding");
|
||||||
|
|
|
||||||
|
|
@ -201,6 +201,14 @@ int test_reclaimFilePageCache(int argc, char **argv, int flags);
|
||||||
int test_writePointerWithPadding(int argc, char **argv, int flags);
|
int test_writePointerWithPadding(int argc, char **argv, int flags);
|
||||||
int test_valkey_strtod(int argc, char **argv, int flags);
|
int test_valkey_strtod(int argc, char **argv, int flags);
|
||||||
int test_vector(int argc, char **argv, int flags);
|
int test_vector(int argc, char **argv, int flags);
|
||||||
|
int test_vset_add_and_iterate(int argc, char **argv, int flags);
|
||||||
|
int test_vset_large_batch_same_expiry(int argc, char **argv, int flags);
|
||||||
|
int test_vset_large_batch_update_entry_same_expiry(int argc, char **argv, int flags);
|
||||||
|
int test_vset_large_batch_update_entry_multiple_expiries(int argc, char **argv, int flags);
|
||||||
|
int test_vset_iterate_multiple_expiries(int argc, char **argv, int flags);
|
||||||
|
int test_vset_add_and_remove_all(int argc, char **argv, int flags);
|
||||||
|
int test_vset_defrag(int argc, char **argv, int flags);
|
||||||
|
int test_vset_fuzzer(int argc, char **argv, int flags);
|
||||||
int test_ziplistCreateIntList(int argc, char **argv, int flags);
|
int test_ziplistCreateIntList(int argc, char **argv, int flags);
|
||||||
int test_ziplistPop(int argc, char **argv, int flags);
|
int test_ziplistPop(int argc, char **argv, int flags);
|
||||||
int test_ziplistGetElementAtIndex3(int argc, char **argv, int flags);
|
int test_ziplistGetElementAtIndex3(int argc, char **argv, int flags);
|
||||||
|
|
@ -261,6 +269,7 @@ unitTest __test_sha1_c[] = {{"test_sha1", test_sha1}, {NULL, NULL}};
|
||||||
unitTest __test_util_c[] = {{"test_string2ll", test_string2ll}, {"test_string2l", test_string2l}, {"test_ll2string", test_ll2string}, {"test_ld2string", test_ld2string}, {"test_fixedpoint_d2string", test_fixedpoint_d2string}, {"test_version2num", test_version2num}, {"test_reclaimFilePageCache", test_reclaimFilePageCache}, {"test_writePointerWithPadding", test_writePointerWithPadding}, {NULL, NULL}};
|
unitTest __test_util_c[] = {{"test_string2ll", test_string2ll}, {"test_string2l", test_string2l}, {"test_ll2string", test_ll2string}, {"test_ld2string", test_ld2string}, {"test_fixedpoint_d2string", test_fixedpoint_d2string}, {"test_version2num", test_version2num}, {"test_reclaimFilePageCache", test_reclaimFilePageCache}, {"test_writePointerWithPadding", test_writePointerWithPadding}, {NULL, NULL}};
|
||||||
unitTest __test_valkey_strtod_c[] = {{"test_valkey_strtod", test_valkey_strtod}, {NULL, NULL}};
|
unitTest __test_valkey_strtod_c[] = {{"test_valkey_strtod", test_valkey_strtod}, {NULL, NULL}};
|
||||||
unitTest __test_vector_c[] = {{"test_vector", test_vector}, {NULL, NULL}};
|
unitTest __test_vector_c[] = {{"test_vector", test_vector}, {NULL, NULL}};
|
||||||
|
unitTest __test_vset_c[] = {{"test_vset_add_and_iterate", test_vset_add_and_iterate}, {"test_vset_large_batch_same_expiry", test_vset_large_batch_same_expiry}, {"test_vset_large_batch_update_entry_same_expiry", test_vset_large_batch_update_entry_same_expiry}, {"test_vset_large_batch_update_entry_multiple_expiries", test_vset_large_batch_update_entry_multiple_expiries}, {"test_vset_iterate_multiple_expiries", test_vset_iterate_multiple_expiries}, {"test_vset_add_and_remove_all", test_vset_add_and_remove_all}, {"test_vset_defrag", test_vset_defrag}, {"test_vset_fuzzer", test_vset_fuzzer}, {NULL, NULL}};
|
||||||
unitTest __test_ziplist_c[] = {{"test_ziplistCreateIntList", test_ziplistCreateIntList}, {"test_ziplistPop", test_ziplistPop}, {"test_ziplistGetElementAtIndex3", test_ziplistGetElementAtIndex3}, {"test_ziplistGetElementOutOfRange", test_ziplistGetElementOutOfRange}, {"test_ziplistGetLastElement", test_ziplistGetLastElement}, {"test_ziplistGetFirstElement", test_ziplistGetFirstElement}, {"test_ziplistGetElementOutOfRangeReverse", test_ziplistGetElementOutOfRangeReverse}, {"test_ziplistIterateThroughFullList", test_ziplistIterateThroughFullList}, {"test_ziplistIterateThroughListFrom1ToEnd", test_ziplistIterateThroughListFrom1ToEnd}, {"test_ziplistIterateThroughListFrom2ToEnd", test_ziplistIterateThroughListFrom2ToEnd}, {"test_ziplistIterateThroughStartOutOfRange", test_ziplistIterateThroughStartOutOfRange}, {"test_ziplistIterateBackToFront", test_ziplistIterateBackToFront}, {"test_ziplistIterateBackToFrontDeletingAllItems", test_ziplistIterateBackToFrontDeletingAllItems}, {"test_ziplistDeleteInclusiveRange0To0", test_ziplistDeleteInclusiveRange0To0}, {"test_ziplistDeleteInclusiveRange0To1", test_ziplistDeleteInclusiveRange0To1}, {"test_ziplistDeleteInclusiveRange1To2", test_ziplistDeleteInclusiveRange1To2}, {"test_ziplistDeleteWithStartIndexOutOfRange", test_ziplistDeleteWithStartIndexOutOfRange}, {"test_ziplistDeleteWithNumOverflow", test_ziplistDeleteWithNumOverflow}, {"test_ziplistDeleteFooWhileIterating", test_ziplistDeleteFooWhileIterating}, {"test_ziplistReplaceWithSameSize", test_ziplistReplaceWithSameSize}, {"test_ziplistReplaceWithDifferentSize", test_ziplistReplaceWithDifferentSize}, {"test_ziplistRegressionTestForOver255ByteStrings", test_ziplistRegressionTestForOver255ByteStrings}, {"test_ziplistRegressionTestDeleteNextToLastEntries", test_ziplistRegressionTestDeleteNextToLastEntries}, {"test_ziplistCreateLongListAndCheckIndices", test_ziplistCreateLongListAndCheckIndices}, {"test_ziplistCompareStringWithZiplistEntries", test_ziplistCompareStringWithZiplistEntries}, {"test_ziplistMergeTest", test_ziplistMergeTest}, {"test_ziplistStressWithRandomPayloadsOfDifferentEncoding", test_ziplistStressWithRandomPayloadsOfDifferentEncoding}, {"test_ziplistCascadeUpdateEdgeCases", test_ziplistCascadeUpdateEdgeCases}, {"test_ziplistInsertEdgeCase", test_ziplistInsertEdgeCase}, {"test_ziplistStressWithVariableSize", test_ziplistStressWithVariableSize}, {"test_BenchmarkziplistFind", test_BenchmarkziplistFind}, {"test_BenchmarkziplistIndex", test_BenchmarkziplistIndex}, {"test_BenchmarkziplistValidateIntegrity", test_BenchmarkziplistValidateIntegrity}, {"test_BenchmarkziplistCompareWithString", test_BenchmarkziplistCompareWithString}, {"test_BenchmarkziplistCompareWithNumber", test_BenchmarkziplistCompareWithNumber}, {"test_ziplistStress__ziplistCascadeUpdate", test_ziplistStress__ziplistCascadeUpdate}, {NULL, NULL}};
|
unitTest __test_ziplist_c[] = {{"test_ziplistCreateIntList", test_ziplistCreateIntList}, {"test_ziplistPop", test_ziplistPop}, {"test_ziplistGetElementAtIndex3", test_ziplistGetElementAtIndex3}, {"test_ziplistGetElementOutOfRange", test_ziplistGetElementOutOfRange}, {"test_ziplistGetLastElement", test_ziplistGetLastElement}, {"test_ziplistGetFirstElement", test_ziplistGetFirstElement}, {"test_ziplistGetElementOutOfRangeReverse", test_ziplistGetElementOutOfRangeReverse}, {"test_ziplistIterateThroughFullList", test_ziplistIterateThroughFullList}, {"test_ziplistIterateThroughListFrom1ToEnd", test_ziplistIterateThroughListFrom1ToEnd}, {"test_ziplistIterateThroughListFrom2ToEnd", test_ziplistIterateThroughListFrom2ToEnd}, {"test_ziplistIterateThroughStartOutOfRange", test_ziplistIterateThroughStartOutOfRange}, {"test_ziplistIterateBackToFront", test_ziplistIterateBackToFront}, {"test_ziplistIterateBackToFrontDeletingAllItems", test_ziplistIterateBackToFrontDeletingAllItems}, {"test_ziplistDeleteInclusiveRange0To0", test_ziplistDeleteInclusiveRange0To0}, {"test_ziplistDeleteInclusiveRange0To1", test_ziplistDeleteInclusiveRange0To1}, {"test_ziplistDeleteInclusiveRange1To2", test_ziplistDeleteInclusiveRange1To2}, {"test_ziplistDeleteWithStartIndexOutOfRange", test_ziplistDeleteWithStartIndexOutOfRange}, {"test_ziplistDeleteWithNumOverflow", test_ziplistDeleteWithNumOverflow}, {"test_ziplistDeleteFooWhileIterating", test_ziplistDeleteFooWhileIterating}, {"test_ziplistReplaceWithSameSize", test_ziplistReplaceWithSameSize}, {"test_ziplistReplaceWithDifferentSize", test_ziplistReplaceWithDifferentSize}, {"test_ziplistRegressionTestForOver255ByteStrings", test_ziplistRegressionTestForOver255ByteStrings}, {"test_ziplistRegressionTestDeleteNextToLastEntries", test_ziplistRegressionTestDeleteNextToLastEntries}, {"test_ziplistCreateLongListAndCheckIndices", test_ziplistCreateLongListAndCheckIndices}, {"test_ziplistCompareStringWithZiplistEntries", test_ziplistCompareStringWithZiplistEntries}, {"test_ziplistMergeTest", test_ziplistMergeTest}, {"test_ziplistStressWithRandomPayloadsOfDifferentEncoding", test_ziplistStressWithRandomPayloadsOfDifferentEncoding}, {"test_ziplistCascadeUpdateEdgeCases", test_ziplistCascadeUpdateEdgeCases}, {"test_ziplistInsertEdgeCase", test_ziplistInsertEdgeCase}, {"test_ziplistStressWithVariableSize", test_ziplistStressWithVariableSize}, {"test_BenchmarkziplistFind", test_BenchmarkziplistFind}, {"test_BenchmarkziplistIndex", test_BenchmarkziplistIndex}, {"test_BenchmarkziplistValidateIntegrity", test_BenchmarkziplistValidateIntegrity}, {"test_BenchmarkziplistCompareWithString", test_BenchmarkziplistCompareWithString}, {"test_BenchmarkziplistCompareWithNumber", test_BenchmarkziplistCompareWithNumber}, {"test_ziplistStress__ziplistCascadeUpdate", test_ziplistStress__ziplistCascadeUpdate}, {NULL, NULL}};
|
||||||
unitTest __test_zipmap_c[] = {{"test_zipmapIterateWithLargeKey", test_zipmapIterateWithLargeKey}, {"test_zipmapIterateThroughElements", test_zipmapIterateThroughElements}, {NULL, NULL}};
|
unitTest __test_zipmap_c[] = {{"test_zipmapIterateWithLargeKey", test_zipmapIterateWithLargeKey}, {"test_zipmapIterateThroughElements", test_zipmapIterateThroughElements}, {NULL, NULL}};
|
||||||
unitTest __test_zmalloc_c[] = {{"test_zmallocAllocReallocCallocAndFree", test_zmallocAllocReallocCallocAndFree}, {"test_zmallocAllocZeroByteAndFree", test_zmallocAllocZeroByteAndFree}, {NULL, NULL}};
|
unitTest __test_zmalloc_c[] = {{"test_zmallocAllocReallocCallocAndFree", test_zmallocAllocReallocCallocAndFree}, {"test_zmallocAllocZeroByteAndFree", test_zmallocAllocZeroByteAndFree}, {NULL, NULL}};
|
||||||
|
|
@ -288,6 +297,7 @@ struct unitTestSuite {
|
||||||
{"test_util.c", __test_util_c},
|
{"test_util.c", __test_util_c},
|
||||||
{"test_valkey_strtod.c", __test_valkey_strtod_c},
|
{"test_valkey_strtod.c", __test_valkey_strtod_c},
|
||||||
{"test_vector.c", __test_vector_c},
|
{"test_vector.c", __test_vector_c},
|
||||||
|
{"test_vset.c", __test_vset_c},
|
||||||
{"test_ziplist.c", __test_ziplist_c},
|
{"test_ziplist.c", __test_ziplist_c},
|
||||||
{"test_zipmap.c", __test_zipmap_c},
|
{"test_zipmap.c", __test_zipmap_c},
|
||||||
{"test_zmalloc.c", __test_zmalloc_c},
|
{"test_zmalloc.c", __test_zmalloc_c},
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,518 @@
|
||||||
|
#include "../vset.h"
|
||||||
|
#include "../entry.h"
|
||||||
|
#include "test_help.h"
|
||||||
|
#include "../zmalloc.h"
|
||||||
|
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <limits.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/wait.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <time.h>
|
||||||
|
|
||||||
|
typedef entry mock_entry;
|
||||||
|
|
||||||
|
static mock_entry *mockCreateEntry(const char *keystr, long long expiry) {
|
||||||
|
sds field = sdsnew(keystr);
|
||||||
|
mock_entry *e = entryCreate(field, sdsnew("value"), expiry);
|
||||||
|
sdsfree(field);
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mockFreeEntry(void *entry) {
|
||||||
|
// printf("mockFreeEntry: %p\n", entry);
|
||||||
|
entryFree(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
static mock_entry *mockEntryUpdate(mock_entry *entry, long long expiry) {
|
||||||
|
mock_entry *new_entry = entryCreate(entryGetField(entry), sdsdup(entryGetValue(entry)), expiry);
|
||||||
|
entryFree(entry);
|
||||||
|
return new_entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
static long long mockGetExpiry(const void *entry) {
|
||||||
|
return entryGetExpiry(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
int test_vset_add_and_iterate(int argc, char **argv, int flags) {
|
||||||
|
(void)argc;
|
||||||
|
(void)argv;
|
||||||
|
(void)flags;
|
||||||
|
|
||||||
|
vset set;
|
||||||
|
vsetInit(&set);
|
||||||
|
|
||||||
|
mock_entry *e1 = mockCreateEntry("item1", 123);
|
||||||
|
mock_entry *e2 = mockCreateEntry("item2", 456);
|
||||||
|
|
||||||
|
TEST_ASSERT(vsetAddEntry(&set, mockGetExpiry, e1));
|
||||||
|
TEST_ASSERT(vsetAddEntry(&set, mockGetExpiry, e2));
|
||||||
|
|
||||||
|
TEST_ASSERT(!vsetIsEmpty(&set));
|
||||||
|
|
||||||
|
vsetIterator it;
|
||||||
|
vsetInitIterator(&set, &it);
|
||||||
|
|
||||||
|
void *entry;
|
||||||
|
int count = 0;
|
||||||
|
while (vsetNext(&it, &entry)) {
|
||||||
|
TEST_EXPECT(entry != NULL);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_ASSERT(count == 2);
|
||||||
|
|
||||||
|
vsetResetIterator(&it);
|
||||||
|
vsetRelease(&set);
|
||||||
|
mockFreeEntry(e1);
|
||||||
|
mockFreeEntry(e2);
|
||||||
|
|
||||||
|
TEST_PRINT_INFO("Test passed with %d expects", failed_expects);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int test_vset_large_batch_same_expiry(int argc, char **argv, int flags) {
|
||||||
|
(void)argc;
|
||||||
|
(void)argv;
|
||||||
|
(void)flags;
|
||||||
|
|
||||||
|
vset set;
|
||||||
|
vsetInit(&set);
|
||||||
|
|
||||||
|
const long long expiry_time = 1000LL;
|
||||||
|
const int total_entries = 200;
|
||||||
|
|
||||||
|
// Allocate and add 200 entries with same expiry
|
||||||
|
mock_entry **entries = zmalloc(sizeof(mock_entry *) * total_entries);
|
||||||
|
TEST_ASSERT(entries != NULL);
|
||||||
|
|
||||||
|
for (int i = 0; i < total_entries; i++) {
|
||||||
|
char key_buf[32];
|
||||||
|
snprintf(key_buf, sizeof(key_buf), "entry_%d", i);
|
||||||
|
entries[i] = mockCreateEntry(key_buf, expiry_time);
|
||||||
|
TEST_ASSERT(vsetAddEntry(&set, mockGetExpiry, entries[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify set is not empty
|
||||||
|
TEST_ASSERT(!vsetIsEmpty(&set));
|
||||||
|
|
||||||
|
// Iterate all entries and count them
|
||||||
|
vsetIterator it;
|
||||||
|
vsetInitIterator(&set, &it);
|
||||||
|
|
||||||
|
void *entry;
|
||||||
|
int count = 0;
|
||||||
|
while (vsetNext(&it, &entry)) {
|
||||||
|
TEST_EXPECT(entry != NULL);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
TEST_ASSERT(count == total_entries);
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
vsetResetIterator(&it);
|
||||||
|
vsetRelease(&set);
|
||||||
|
|
||||||
|
for (int i = 0; i < total_entries; i++) {
|
||||||
|
mockFreeEntry(entries[i]);
|
||||||
|
}
|
||||||
|
zfree(entries);
|
||||||
|
|
||||||
|
TEST_PRINT_INFO("Inserted and iterated %d entries with same expiry", total_entries);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int test_vset_large_batch_update_entry_same_expiry(int argc, char **argv, int flags) {
|
||||||
|
(void)argc;
|
||||||
|
(void)argv;
|
||||||
|
(void)flags;
|
||||||
|
|
||||||
|
vset set;
|
||||||
|
vsetInit(&set);
|
||||||
|
|
||||||
|
const long long expiry_time = 1000LL;
|
||||||
|
const unsigned int total_entries = 1000;
|
||||||
|
|
||||||
|
mock_entry *entries[total_entries];
|
||||||
|
|
||||||
|
for (unsigned int i = 0; i < total_entries; i++) {
|
||||||
|
char key_buf[32];
|
||||||
|
snprintf(key_buf, sizeof(key_buf), "entry_%d", i);
|
||||||
|
entries[i] = mockCreateEntry(key_buf, expiry_time);
|
||||||
|
TEST_ASSERT(vsetAddEntry(&set, mockGetExpiry, entries[i]));
|
||||||
|
}
|
||||||
|
// Verify set is not empty
|
||||||
|
TEST_ASSERT(!vsetIsEmpty(&set));
|
||||||
|
|
||||||
|
// Now iterate and replace all entries
|
||||||
|
for (unsigned int i = 0; i < total_entries; i++) {
|
||||||
|
mock_entry *old_entry = entries[i];
|
||||||
|
entries[i] = mockEntryUpdate(entries[i], expiry_time);
|
||||||
|
TEST_ASSERT(vsetUpdateEntry(&set, mockGetExpiry, old_entry, entries[i], expiry_time, expiry_time));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (unsigned int i = 0; i < total_entries; i++) {
|
||||||
|
TEST_ASSERT(vsetRemoveEntry(&set, mockGetExpiry, entries[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify set is empty
|
||||||
|
TEST_ASSERT(vsetIsEmpty(&set));
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
for (unsigned int i = 0; i < total_entries; i++) {
|
||||||
|
mockFreeEntry(entries[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_PRINT_INFO("Inserted, updated and deleted %d entries with same expiry", total_entries);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int test_vset_large_batch_update_entry_multiple_expiries(int argc, char **argv, int flags) {
|
||||||
|
(void)argc;
|
||||||
|
(void)argv;
|
||||||
|
(void)flags;
|
||||||
|
const unsigned int total_entries = 1000;
|
||||||
|
|
||||||
|
vset set;
|
||||||
|
vsetInit(&set);
|
||||||
|
|
||||||
|
// Prepare entries with mixed expiry times, some duplicates
|
||||||
|
mock_entry *entries[total_entries];
|
||||||
|
|
||||||
|
// Initialize keys
|
||||||
|
for (unsigned int i = 0; i < total_entries; i++) {
|
||||||
|
char key_buf[32];
|
||||||
|
snprintf(key_buf, sizeof(key_buf), "entry_%d", i);
|
||||||
|
long long expiry_time = rand() % 10000;
|
||||||
|
entries[i] = mockCreateEntry(key_buf, expiry_time);
|
||||||
|
TEST_ASSERT(vsetAddEntry(&set, mockGetExpiry, entries[i]));
|
||||||
|
}
|
||||||
|
// Verify set is not empty
|
||||||
|
TEST_ASSERT(!vsetIsEmpty(&set));
|
||||||
|
|
||||||
|
// Now iterate and replace all entries
|
||||||
|
for (unsigned int i = 0; i < total_entries; i++) {
|
||||||
|
mock_entry *old_entry = entries[i];
|
||||||
|
long long old_expiry = entryGetExpiry(entries[i]);
|
||||||
|
long long new_expiry = old_expiry + rand() % 100000;
|
||||||
|
entries[i] = mockEntryUpdate(entries[i], new_expiry);
|
||||||
|
TEST_ASSERT(vsetUpdateEntry(&set, mockGetExpiry, old_entry, entries[i], old_expiry, new_expiry));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (unsigned int i = 0; i < total_entries; i++) {
|
||||||
|
TEST_ASSERT(vsetRemoveEntry(&set, mockGetExpiry, entries[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify set is empty
|
||||||
|
TEST_ASSERT(vsetIsEmpty(&set));
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
for (unsigned int i = 0; i < total_entries; i++) {
|
||||||
|
mockFreeEntry(entries[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_PRINT_INFO("Inserted, updated and deleted %d entries with different expiry", total_entries);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int test_vset_iterate_multiple_expiries(int argc, char **argv, int flags) {
|
||||||
|
(void)argc;
|
||||||
|
(void)argv;
|
||||||
|
(void)flags;
|
||||||
|
const unsigned int total_entries = 5;
|
||||||
|
|
||||||
|
vset set;
|
||||||
|
vsetInit(&set);
|
||||||
|
|
||||||
|
// Prepare entries with mixed expiry times, some duplicates
|
||||||
|
mock_entry *entries[total_entries];
|
||||||
|
|
||||||
|
// Initialize keys
|
||||||
|
for (unsigned int i = 0; i < total_entries; i++) {
|
||||||
|
char key_buf[32];
|
||||||
|
snprintf(key_buf, sizeof(key_buf), "entry_%d", i);
|
||||||
|
long long expiry_time = rand() % 10000;
|
||||||
|
entries[i] = mockCreateEntry(key_buf, expiry_time);
|
||||||
|
TEST_ASSERT(vsetAddEntry(&set, mockGetExpiry, entries[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
vsetIterator it;
|
||||||
|
vsetInitIterator(&set, &it);
|
||||||
|
|
||||||
|
int found[5] = {0};
|
||||||
|
int total = 0;
|
||||||
|
|
||||||
|
void *entry;
|
||||||
|
while (vsetNext(&it, &entry)) {
|
||||||
|
TEST_EXPECT(entry != NULL);
|
||||||
|
mock_entry *e = (mock_entry *)entry;
|
||||||
|
|
||||||
|
// Match the entries we inserted
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
if (strcmp(entryGetField(e), entryGetField(entries[i])) == 0) {
|
||||||
|
found[i] = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
total++;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_ASSERT(total == 5);
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
TEST_EXPECT(found[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
vsetResetIterator(&it);
|
||||||
|
vsetRelease(&set);
|
||||||
|
for (int i = 0; i < 5; i++) mockFreeEntry(entries[i]);
|
||||||
|
|
||||||
|
TEST_PRINT_INFO("Iterated all %d mixed expiry entries successfully", total);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int test_vset_add_and_remove_all(int argc, char **argv, int flags) {
|
||||||
|
UNUSED(argc);
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(flags);
|
||||||
|
|
||||||
|
vset set;
|
||||||
|
vsetInit(&set);
|
||||||
|
|
||||||
|
const int total_entries = 130;
|
||||||
|
mock_entry *entries[total_entries];
|
||||||
|
long long expiry = 5000;
|
||||||
|
|
||||||
|
for (int i = 0; i < total_entries; i++) {
|
||||||
|
char key[32];
|
||||||
|
snprintf(key, sizeof(key), "key_%d", i);
|
||||||
|
entries[i] = mockCreateEntry(key, expiry);
|
||||||
|
TEST_ASSERT(vsetAddEntry(&set, mockGetExpiry, entries[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < total_entries; i++) {
|
||||||
|
TEST_ASSERT(vsetRemoveEntry(&set, mockGetExpiry, entries[i]));
|
||||||
|
mockFreeEntry(entries[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_ASSERT(vsetIsEmpty(&set));
|
||||||
|
vsetRelease(&set);
|
||||||
|
|
||||||
|
TEST_PRINT_INFO("Add/remove %d entries, set size now 0", total_entries);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/********************* Fuzzer tests ********************************/
|
||||||
|
|
||||||
|
#define NUM_ITERATIONS 100000
|
||||||
|
#define MAX_ENTRIES 10000
|
||||||
|
#define NUM_DEFRAG_STEPS 100
|
||||||
|
|
||||||
|
/* Global array to simulate a test database */
|
||||||
|
mock_entry *mock_entries[MAX_ENTRIES];
|
||||||
|
int mock_entry_count = 0;
|
||||||
|
|
||||||
|
/* --------- volatileEntryType Callbacks --------- */
|
||||||
|
sds mock_entry_get_key(const void *entry) {
|
||||||
|
return (sds)entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
long long mock_entry_get_expiry(const void *entry) {
|
||||||
|
return mockGetExpiry(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
int mock_entry_expire(void *entry, void *ctx) {
|
||||||
|
mock_entry *e = (mock_entry *)entry;
|
||||||
|
long long now = *(long long *)ctx;
|
||||||
|
TEST_ASSERT(mock_entry_get_expiry(entry) <= now);
|
||||||
|
for (int i = 0; i < mock_entry_count; i++) {
|
||||||
|
if (mock_entries[i] == e) {
|
||||||
|
// printf("expire entry %p with expiry %llu\n", e, mockGetExpiry(e));
|
||||||
|
mockFreeEntry(e);
|
||||||
|
mock_entries[i] = mock_entries[--mock_entry_count];
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* --------- Helper Functions --------- */
|
||||||
|
mock_entry *mock_entry_create(const char *keystr, long long expiry) {
|
||||||
|
return mockCreateEntry(keystr, expiry);
|
||||||
|
}
|
||||||
|
|
||||||
|
int insert_mock_entry(vset *set) {
|
||||||
|
if (mock_entry_count >= MAX_ENTRIES) return 0;
|
||||||
|
char keybuf[32];
|
||||||
|
snprintf(keybuf, sizeof(keybuf), "key_%d", mock_entry_count);
|
||||||
|
|
||||||
|
long long expiry = rand() % 10000 + 100;
|
||||||
|
mock_entry *e = mock_entry_create(keybuf, expiry);
|
||||||
|
// printf("adding entry %p with expiry %llu\n", e, expiry);
|
||||||
|
TEST_ASSERT(vsetAddEntry(set, mockGetExpiry, e));
|
||||||
|
mock_entries[mock_entry_count++] = e;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int insert_mock_entry_with_expiry(vset *set, long long expiry) {
|
||||||
|
if (mock_entry_count >= MAX_ENTRIES) return 0;
|
||||||
|
char keybuf[32];
|
||||||
|
snprintf(keybuf, sizeof(keybuf), "key_%d", mock_entry_count);
|
||||||
|
|
||||||
|
mock_entry *e = mock_entry_create(keybuf, expiry);
|
||||||
|
// printf("adding entry %p with expiry %llu\n", e, expiry);
|
||||||
|
TEST_ASSERT(vsetAddEntry(set, mockGetExpiry, e));
|
||||||
|
mock_entries[mock_entry_count++] = e;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int update_mock_entry(vset *set) {
|
||||||
|
if (mock_entry_count == 0) return 0;
|
||||||
|
int idx = rand() % mock_entry_count;
|
||||||
|
mock_entry *old = mock_entries[idx];
|
||||||
|
long long old_expiry = mockGetExpiry(old);
|
||||||
|
long long new_expiry = old_expiry + (rand() % 500);
|
||||||
|
mock_entry *updated = mockEntryUpdate(old, new_expiry);
|
||||||
|
mock_entries[idx] = updated;
|
||||||
|
// printf("Update entry %p with entry %p with old expiry %llu new expiry %llu\n", old, updated, old_expiry, new_expiry);
|
||||||
|
TEST_ASSERT(vsetUpdateEntry(set, mockGetExpiry, old, updated, old_expiry, new_expiry));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int remove_mock_entry(vset *set) {
|
||||||
|
if (mock_entry_count == 0) return 0;
|
||||||
|
int idx = rand() % mock_entry_count;
|
||||||
|
mock_entry *e = mock_entries[idx];
|
||||||
|
// printf("removing entry %p with expiry %llu\n", e, mockGetExpiry(e));
|
||||||
|
TEST_ASSERT(vsetRemoveEntry(set, mockGetExpiry, e));
|
||||||
|
mockFreeEntry(e);
|
||||||
|
mock_entries[idx] = mock_entries[--mock_entry_count];
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int expire_mock_entries(vset *set, mstime_t now) {
|
||||||
|
// printf("Before expired entries entries: %d\n", mock_entry_count);
|
||||||
|
vsetRemoveExpired(set, mockGetExpiry, mock_entry_expire, now, mock_entry_count, &now);
|
||||||
|
// printf("After expired %zu entries left entries: %d and set is empty: %s\n", count, mock_entry_count, vsetIsEmpty(set) ? "true" : "false");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *mock_defragfn(void *ptr) {
|
||||||
|
size_t size = zmalloc_size(ptr);
|
||||||
|
void *newptr = zmalloc(size);
|
||||||
|
memcpy(newptr, ptr, size);
|
||||||
|
zfree(ptr);
|
||||||
|
return newptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
int mock_defrag_rax_node(raxNode **noderef) {
|
||||||
|
raxNode *newnode = mock_defragfn(*noderef);
|
||||||
|
if (newnode) {
|
||||||
|
*noderef = newnode;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t defrag_vset(vset *set, size_t cursor, size_t steps) {
|
||||||
|
if (steps == 0) steps = ULONG_MAX;
|
||||||
|
do {
|
||||||
|
cursor = vsetScanDefrag(set, cursor, mock_defragfn, mock_defrag_rax_node);
|
||||||
|
steps--;
|
||||||
|
} while (cursor != 0 && steps > 0);
|
||||||
|
return cursor;
|
||||||
|
}
|
||||||
|
|
||||||
|
int free_mock_entries(void) {
|
||||||
|
for (int i = 0; i < mock_entry_count; i++) {
|
||||||
|
mock_entry *e = mock_entries[i];
|
||||||
|
mockFreeEntry(e);
|
||||||
|
}
|
||||||
|
mock_entry_count = 0;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* --------- Defrag Test --------- */
|
||||||
|
int test_vset_defrag(int argc, char **argv, int flags) {
|
||||||
|
UNUSED(argc);
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(flags);
|
||||||
|
srand(time(NULL));
|
||||||
|
|
||||||
|
vset set;
|
||||||
|
vsetInit(&set);
|
||||||
|
|
||||||
|
/* defrag empty set */
|
||||||
|
TEST_ASSERT(defrag_vset(&set, 0, 0) == 0);
|
||||||
|
|
||||||
|
/* defrag when single entry */
|
||||||
|
insert_mock_entry(&set);
|
||||||
|
TEST_ASSERT(defrag_vset(&set, 0, 0) == 0);
|
||||||
|
|
||||||
|
/* defrag when vector */
|
||||||
|
for (int i = 0; i < 127 - 1; i++)
|
||||||
|
insert_mock_entry(&set);
|
||||||
|
TEST_ASSERT(defrag_vset(&set, 0, 0) == 0);
|
||||||
|
|
||||||
|
long long expiry = rand() % 10000 + 100;
|
||||||
|
for (int i = 0; i < 127 * 2; i++) {
|
||||||
|
insert_mock_entry_with_expiry(&set, expiry);
|
||||||
|
}
|
||||||
|
TEST_ASSERT(defrag_vset(&set, 0, 0) == 0);
|
||||||
|
|
||||||
|
size_t cursor = 0;
|
||||||
|
for (int i = 0; i < NUM_ITERATIONS; i++) {
|
||||||
|
if (i % NUM_DEFRAG_STEPS == 0)
|
||||||
|
cursor = defrag_vset(&set, cursor, NUM_DEFRAG_STEPS);
|
||||||
|
insert_mock_entry_with_expiry(&set, expiry);
|
||||||
|
}
|
||||||
|
TEST_ASSERT(defrag_vset(&set, 0, 0) == 0);
|
||||||
|
|
||||||
|
vsetRelease(&set);
|
||||||
|
free_mock_entries();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* --------- Fuzzer Test --------- */
|
||||||
|
int test_vset_fuzzer(int argc, char **argv, int flags) {
|
||||||
|
UNUSED(argc);
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(flags);
|
||||||
|
srand(time(NULL));
|
||||||
|
|
||||||
|
vset set;
|
||||||
|
vsetInit(&set);
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_ITERATIONS; i++) {
|
||||||
|
int op = rand() % 5;
|
||||||
|
switch (op) {
|
||||||
|
case 0:
|
||||||
|
case 1:
|
||||||
|
insert_mock_entry(&set);
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
update_mock_entry(&set);
|
||||||
|
break;
|
||||||
|
case 3:
|
||||||
|
remove_mock_entry(&set);
|
||||||
|
break;
|
||||||
|
case 4:
|
||||||
|
TEST_ASSERT(defrag_vset(&set, 0, 0) == 0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i % 100 == 0) {
|
||||||
|
mstime_t now = rand() % 10000;
|
||||||
|
expire_mock_entries(&set, now);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* now expire all the entries and check that we have no entries left */
|
||||||
|
expire_mock_entries(&set, LONG_LONG_MAX);
|
||||||
|
TEST_ASSERT(vsetIsEmpty(&set) && mock_entry_count == 0);
|
||||||
|
vsetRelease(&set);
|
||||||
|
free_mock_entries(); /* Just in case */
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
@ -1,79 +0,0 @@
|
||||||
#include <string.h>
|
|
||||||
#include "volatile_set.h"
|
|
||||||
#include "zmalloc.h"
|
|
||||||
#include "config.h"
|
|
||||||
#include "endianconv.h"
|
|
||||||
#include "serverassert.h"
|
|
||||||
|
|
||||||
#define EXPIRY_HASH_SIZE 16
|
|
||||||
volatile_set *createVolatileSet(volatileEntryType *type) {
|
|
||||||
volatile_set *set = zmalloc(sizeof(volatile_set));
|
|
||||||
set->etypr = type;
|
|
||||||
set->expiry_buckets = raxNew();
|
|
||||||
return set;
|
|
||||||
}
|
|
||||||
|
|
||||||
void freeVolatileSet(volatile_set *b) {
|
|
||||||
raxFree(b->expiry_buckets);
|
|
||||||
zfree(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
int volatileSetAddEntry(volatile_set *set, void *entry, long long expiry) {
|
|
||||||
unsigned char buf[EXPIRY_HASH_SIZE];
|
|
||||||
expiry = htonu64(expiry);
|
|
||||||
memcpy(buf, &expiry, sizeof(expiry));
|
|
||||||
memcpy(buf + 8, &entry, sizeof(entry));
|
|
||||||
if (sizeof(entry) == 4) memset(buf + 12, 0, 4); /* Zero padding for 32bit target. */
|
|
||||||
return raxTryInsert(set->expiry_buckets, buf, sizeof(buf), NULL, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
int volatileSetRemoveEntry(volatile_set *set, void *entry, long long expiry) {
|
|
||||||
unsigned char buf[EXPIRY_HASH_SIZE];
|
|
||||||
expiry = htonu64(expiry);
|
|
||||||
memcpy(buf, &expiry, sizeof(expiry));
|
|
||||||
memcpy(buf + 8, &entry, sizeof(entry));
|
|
||||||
if (sizeof(entry) == 4) memset(buf + 12, 0, 4); /* Zero padding for 32bit target. */
|
|
||||||
return raxRemove(set->expiry_buckets, buf, sizeof(buf), NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
int volatileSetUpdateEntry(volatile_set *set, void *old_entry, void *new_entry, long long old_expiry, long long new_expiry) {
|
|
||||||
if (old_entry == new_entry && old_expiry == new_expiry) return 1;
|
|
||||||
|
|
||||||
if (old_entry && old_expiry != -1) {
|
|
||||||
assert(volatileSetRemoveEntry(set, old_entry, old_expiry));
|
|
||||||
}
|
|
||||||
if (new_entry && new_expiry != -1) {
|
|
||||||
assert(volatileSetAddEntry(set, new_entry, new_expiry));
|
|
||||||
}
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int volatileSetExpireEntry(volatile_set *set, void *entry) {
|
|
||||||
volatileSetRemoveEntry(set, entry, set->etypr->getExpiry(entry));
|
|
||||||
if (set->etypr->expire) {
|
|
||||||
set->etypr->expire(entry);
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t volatileSetNumEntries(volatile_set *set) {
|
|
||||||
assert(set && set->expiry_buckets);
|
|
||||||
return set->expiry_buckets->numele;
|
|
||||||
}
|
|
||||||
|
|
||||||
void volatileSetStart(volatile_set *set, volatileSetIterator *it) {
|
|
||||||
raxStart(&it->bucket, set->expiry_buckets);
|
|
||||||
}
|
|
||||||
|
|
||||||
int volatileSetNext(volatileSetIterator *it, void **entryptr) {
|
|
||||||
if (raxNext(&it->bucket)) {
|
|
||||||
assert(it->bucket.key_len == EXPIRY_HASH_SIZE);
|
|
||||||
memcpy(entryptr, it->bucket.key + sizeof(long long), sizeof(*entryptr));
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
void volatileSetReset(volatileSetIterator *it) {
|
|
||||||
raxStop(&it->bucket);
|
|
||||||
}
|
|
||||||
|
|
@ -1,40 +0,0 @@
|
||||||
#ifndef VOLATILESET_H
|
|
||||||
#define VOLATILESET_H
|
|
||||||
|
|
||||||
#include <stddef.h>
|
|
||||||
#include "rax.h"
|
|
||||||
#include "sds.h"
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
sds (*entryGetKey)(const void *entry);
|
|
||||||
|
|
||||||
long long (*getExpiry)(const void *entry);
|
|
||||||
|
|
||||||
int (*expire)(void *entry);
|
|
||||||
|
|
||||||
} volatileEntryType;
|
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
volatileEntryType *etypr;
|
|
||||||
rax *expiry_buckets;
|
|
||||||
} volatile_set;
|
|
||||||
|
|
||||||
typedef struct volatileSetIterator {
|
|
||||||
raxIterator bucket;
|
|
||||||
} volatileSetIterator;
|
|
||||||
|
|
||||||
|
|
||||||
int volatileSetRemoveEntry(volatile_set *set, void *entry, long long expiry);
|
|
||||||
int volatileSetAddEntry(volatile_set *set, void *entry, long long expiry);
|
|
||||||
int volatileSetExpireEntry(volatile_set *set, void *entry);
|
|
||||||
int volatileSetUpdateEntry(volatile_set *set, void *old_entry, void *new_entry, long long old_expiry, long long new_expiry);
|
|
||||||
size_t volatileSetNumEntries(volatile_set *set);
|
|
||||||
void volatileSetStart(volatile_set *set, volatileSetIterator *it);
|
|
||||||
int volatileSetNext(volatileSetIterator *it, void **entryptr);
|
|
||||||
void volatileSetReset(volatileSetIterator *it);
|
|
||||||
|
|
||||||
void freeVolatileSet(volatile_set *b);
|
|
||||||
volatile_set *createVolatileSet(volatileEntryType *type);
|
|
||||||
|
|
||||||
#endif
|
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,97 @@
|
||||||
|
#ifndef VOLATILESET_H
|
||||||
|
#define VOLATILESET_H
|
||||||
|
|
||||||
|
#include <stddef.h>
|
||||||
|
#include <stdbool.h>
|
||||||
|
|
||||||
|
#include "hashtable.h"
|
||||||
|
#include "rax.h"
|
||||||
|
#include "sds.h"
|
||||||
|
#include "monotonic.h" /* for mstime_t*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
*-----------------------------------------------------------------------------
|
||||||
|
* Volatile Set - Adaptive, Expiry-aware Set Structure
|
||||||
|
*-----------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* The `vset` is a dynamic, memory-efficient container for managing
|
||||||
|
* entries with expiry semantics. It is designed to efficiently track entries
|
||||||
|
* that expire at varying times and scales to large sets by adapting its internal
|
||||||
|
* representation as it grows or shrinks.
|
||||||
|
*
|
||||||
|
*-----------------------------------------------------------------------------
|
||||||
|
* Public API
|
||||||
|
*-----------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* Create/Free:
|
||||||
|
* vsetInit(vset *set) - used in order to initialize a new vset.
|
||||||
|
* void vsetClear(vset *set) - used in order to empty all the data in a vset.
|
||||||
|
* void vsetRelease(vset *set) - just like vsetClear, but also release the set itself so it will become unusable.
|
||||||
|
* and will require a new call to vsetInit in order to continue using the set.
|
||||||
|
* Example:
|
||||||
|
* vset set;
|
||||||
|
* vsetInit(&set);
|
||||||
|
* // add some elements to the vset
|
||||||
|
* vsetClear(&set);
|
||||||
|
* // verify the set is empty:
|
||||||
|
* assert(vsetIsEmpty(&set));
|
||||||
|
*
|
||||||
|
* Mutation:
|
||||||
|
* bool vsetAddEntry(vset *set, vsetGetExpiryFunc getExpiry, void *entry) - used in order to insert a new entry into the set.
|
||||||
|
* The API also make use of the provided getExpiry function in order to compare the 'entry' expiration time of the other existing
|
||||||
|
* entries in the set.
|
||||||
|
*
|
||||||
|
* bool vsetRemoveEntry(vset *set, vsetGetExpiryFunc getExpiry, void *entry) - used in order to remove and entry from the set.
|
||||||
|
*
|
||||||
|
* bool vsetUpdateEntry(vset *set, vsetGetExpiryFunc getExpiry, void *old_entry,
|
||||||
|
* void *new_entry, long long old_expiry,
|
||||||
|
* long long new_expiry) - is used in order to update an existing entry in the set.
|
||||||
|
* Note that the implementation assumes the 'old_entry' might not point to a valid memory location, thus it require that the 'old_expiry'
|
||||||
|
* is provided and matches the old entry expiration time.
|
||||||
|
*
|
||||||
|
* Expiry Retrieval/Removal:
|
||||||
|
* long long vsetEstimatedEarliestExpiry(vset *set, vsetGetExpiryFunc getExpiry) - will return an estimation to the lowest expiry time of
|
||||||
|
* the entries which currently exists in the set. Because of the semi-sorted ordering this implementation is using, the returned value MIGHT not be the 'real' minimum
|
||||||
|
* but rather some value which is the maximum among a group of entries which are all close or equal to the 'real' minimum.
|
||||||
|
*
|
||||||
|
* size_t vsetRemoveExpired(vset *set, vsetGetExpiryFunc getExpiry, vsetExpiryFunc expiryFunc, mstime_t now, size_t max_count, void *ctx) - can be used
|
||||||
|
* in order to remove up to max_count entries from the vset. The removed entries will all satisfy the condition that their expiration time is smaller than the provided now.
|
||||||
|
* Note that there are no guarantees about the order to the entries.
|
||||||
|
*
|
||||||
|
* Utilities:
|
||||||
|
* bool vsetIsEmpty(vset *set) - used in order to check if a given set has any entries.
|
||||||
|
*
|
||||||
|
* Iteration:
|
||||||
|
* void vsetInitIterator(vset *set, vsetIterator *it) - used to initialize a new vset iterator.
|
||||||
|
* bool vsetNext(vsetIterator *it, void **entryptr) - used to iterate to the next element. Will return false if there are no more elements.
|
||||||
|
* void vsetResetIterator(vsetIterator *it) - used in order to reset the iterator at the end of the iteration.
|
||||||
|
*
|
||||||
|
* Note that the vset iterator is NOT safe, Meaning you should not change the set while iterating it. Adding entries and/or removing entries
|
||||||
|
* can result in unexpected behavior.! */
|
||||||
|
|
||||||
|
/* Return the absolute expiration time in milliseconds for the provided entry */
|
||||||
|
typedef long long (*vsetGetExpiryFunc)(const void *entry);
|
||||||
|
/* Callback to be optionally provided to vsetPopExpired. when item is removed from the vset this callback will also be applied. */
|
||||||
|
typedef int (*vsetExpiryFunc)(void *entry, void *ctx);
|
||||||
|
// vset is just a pointer to a bucket
|
||||||
|
typedef void *vset;
|
||||||
|
|
||||||
|
typedef uint8_t vsetIterator[560];
|
||||||
|
|
||||||
|
bool vsetAddEntry(vset *set, vsetGetExpiryFunc getExpiry, void *entry);
|
||||||
|
bool vsetRemoveEntry(vset *set, vsetGetExpiryFunc getExpiry, void *entry);
|
||||||
|
bool vsetUpdateEntry(vset *set, vsetGetExpiryFunc getExpiry, void *old_entry, void *new_entry, long long old_expiry, long long new_expiry);
|
||||||
|
bool vsetIsEmpty(vset *set);
|
||||||
|
void vsetInitIterator(vset *set, vsetIterator *it);
|
||||||
|
bool vsetNext(vsetIterator *it, void **entryptr);
|
||||||
|
void vsetResetIterator(vsetIterator *it);
|
||||||
|
void vsetInit(vset *set);
|
||||||
|
void vsetClear(vset *set);
|
||||||
|
void vsetRelease(vset *set);
|
||||||
|
bool vsetIsValid(vset *set);
|
||||||
|
long long vsetEstimatedEarliestExpiry(vset *set, vsetGetExpiryFunc getExpiry);
|
||||||
|
size_t vsetRemoveExpired(vset *set, vsetGetExpiryFunc getExpiry, vsetExpiryFunc expiryFunc, mstime_t now, size_t max_count, void *ctx);
|
||||||
|
size_t vsetMemUsage(vset *set);
|
||||||
|
size_t vsetScanDefrag(vset *set, size_t cursor, void *(*defragfn)(void *), int (*defragRaxNode)(raxNode **));
|
||||||
|
|
||||||
|
#endif
|
||||||
Loading…
Reference in New Issue