From 20d33dec96d9b811648eec31a88c0b4de155eb0f Mon Sep 17 00:00:00 2001 From: jjuleslasarte <140852041+jjuleslasarte@users.noreply.github.com> Date: Mon, 24 Nov 2025 12:04:40 -0800 Subject: [PATCH] Initial commit for key blocking Signed-off-by: jjuleslasarte --- cmake/Modules/SourceFiles.cmake | 1 + src/Makefile | 2 +- src/durable_write.c | 993 ++++++++++++++++++++++++++++++++ src/durable_write.h | 116 ++++ src/networking.c | 23 + src/object.c | 10 + src/replication.c | 3 + src/server.c | 15 +- src/server.h | 18 +- 9 files changed, 1178 insertions(+), 3 deletions(-) create mode 100644 src/durable_write.c create mode 100644 src/durable_write.h diff --git a/cmake/Modules/SourceFiles.cmake b/cmake/Modules/SourceFiles.cmake index edc8d6668..e684b12cb 100644 --- a/cmake/Modules/SourceFiles.cmake +++ b/cmake/Modules/SourceFiles.cmake @@ -11,6 +11,7 @@ set(VALKEY_SERVER_SRCS ${CMAKE_SOURCE_DIR}/src/ae.c ${CMAKE_SOURCE_DIR}/src/anet.c ${CMAKE_SOURCE_DIR}/src/dict.c + ${CMAKE_SOURCE_DIR}/src/durable_write.c ${CMAKE_SOURCE_DIR}/src/hashtable.c ${CMAKE_SOURCE_DIR}/src/kvstore.c ${CMAKE_SOURCE_DIR}/src/sds.c diff --git a/src/Makefile b/src/Makefile index 1ce928167..74537c367 100644 --- a/src/Makefile +++ b/src/Makefile @@ -423,7 +423,7 @@ ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) ENGINE_TRACE_OBJ=trace/trace.o trace/trace_commands.o trace/trace_db.o trace/trace_cluster.o trace/trace_server.o trace/trace_rdb.o trace/trace_aof.o -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o cluster_migrateslots.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut9.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o vset.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o cluster_migrateslots.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o 
t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut9.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o durable_write.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o vset.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ) ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o sds.o util.o sha256.o diff --git a/src/durable_write.c b/src/durable_write.c new file mode 100644 index 000000000..5cfe30285 --- /dev/null +++ b/src/durable_write.c @@ -0,0 +1,993 @@ +#include "durable_write.h" +#include "expire.h" +#include "server.h" +#include +#include + +// TODO: handle PSYNC +// TODO: handle durability on/off? +// TODO: handle failovers (clear durability state) +// TODO: remove debug logging +// TODO: handle lua & multi +// TODO: handle blocking commands +// TODO: handle DB level commands (swap flushall etc) +// TODO: handle monitors +// TODO: temetry +/*================================= Global state ============================ */ + +// Track the replication offset prior to executing a command block +// including single command and multi-command transactions +static long long pre_command_replication_offset; + +// Track the replication offset prior to executing a single command in call() +static long long pre_call_replication_offset; + +// Track the number of commands awaiting propagation prior to executing a single command in call() +static int pre_call_num_ops_pending_propagation; + +/*============================ Internal prototypes ========================= */ + +static void resetPreExecutionOffset(struct client *c); +static inline void trackCommandPreExecutionPosition(struct client *c); +static int unblockClientWaitingReplicaAck(struct client *c); +static bool clientEligibleForResponseTracking(client *c); +static inline void unblockFirstResponse(struct client *c); +static inline int isBlockingNeededForOffset(struct client *c, long long offset); +static void blockClientAndMonitorsOnReplOffset(struct client *c, long long blockingReplOffset); +static void populateReplicaOffsets(long long *offsets, const size_t numReplicas); +static inline int offsetSorterDesc(const void* v1, const void* v2); +static unsigned long long getNumberOfUncommittedKeys(void); +static inline int hasUncommittedKeys(void); +static inline void addUncommittedKey(const sds key, const long long offset, rax *uncommittedKeys); +static int isSingleCommandAccessingUncommittedKeys(serverDb *db, struct serverCommand *cmd, robj **argv, int argc); +static int isAccessingUncommittedData(client *c); +static bool shouldRejectCommandWithUncommittedData(client *c); +static long long getSingleCommandBlockingOffsetForReplicatingCommand(client *c); +static long long getSingleCommandBlockingOffsetForNonReplicatingCommand(client *c); +static long long getSingleCommandBlockingOffsetForConsistentWrites(struct client *c); + +/*================================= Utility functions ======================== */ + +/** + * Utility function to determine whether the durability flag has been enabled. + * return 1 if durability is enabled, 0 otherwise. + */ +int isDurabilityEnabled(void) { + // TODO: this should be configurable + // should this have its own flag? 
+    // or general 'durability flag'
+    return true;
+}
+
+int isPrimaryDurabilityEnabled(void) {
+    return isDurabilityEnabled() && iAmPrimary();
+}
+
+/**
+ * TODO: this needs to be replaced by an interface w/ durable replication
+ * to tell us when we've achieved consensus via raft.
+ * Using 2 as default for POC.
+ */
+static inline unsigned replicaAcksForConsensus(void) {
+    return 2;
+}
+
+/*
+ * Utility function to sort offsets in descending order
+ * @v1 Pointer to long long representing the first offset
+ * @v2 Pointer to long long representing the second offset
+ * returns -ve if the first offset > the second offset
+ * zero if both offsets are equal
+ * +ve if the first offset < the second offset
+ */
+static inline int offsetSorterDesc(const void* v1, const void* v2) {
+    const long long *a = v1;
+    const long long *b = v2;
+
+    /* Compare explicitly; narrowing the long long difference to int could
+     * truncate and return the wrong sign for widely separated offsets. */
+    if (*b > *a) return 1;
+    if (*b < *a) return -1;
+    return 0;
+}
+
+static unsigned long long getNumberOfUncommittedKeys(void) {
+    unsigned long long num_uncommitted_keys = 0;
+    for (int i = 0; i < server.dbnum; i++) {
+        if (server.db[i] != NULL) {
+            num_uncommitted_keys += raxSize(server.db[i]->uncommitted_keys);
+        }
+    }
+    return num_uncommitted_keys;
+}
+
+unsigned long long getUncommittedKeysCleanupTimeLimit(unsigned long long num_uncommitted_keys) {
+    // If we have uncommitted keys, then the time limit for the clean-up is proportional
+    // to their count, up to the configured keys_cleanup_time_limit_ms. The upper threshold is
+    // 1 million dirty keys, as that will occupy 30MB+ of memory for a typical key of 10-20 bytes.
+    unsigned long long time_limit_ms = 1;
+    if (num_uncommitted_keys > 0) {
+        time_limit_ms = ceil(server.durability.keys_cleanup_time_limit_ms * MIN(1, (double)(num_uncommitted_keys / 1000000.0)));
+    }
+    return time_limit_ms;
+}
+
+/**
+ * Determine whether there are uncommitted keys in the valkey server.
+ * Returns 1 if there are uncommitted keys, 0 otherwise.
+ */
+static inline int hasUncommittedKeys(void) {
+    for (int i = 0; i < server.dbnum; i++) {
+        if (server.db[i] != NULL) {
+            if (raxSize(server.db[i]->uncommitted_keys) > 0)
+                return 1;
+        }
+    }
+    return 0;
+}
+
+/*================================= Replica offset management =============== */
+
+/*
+ * Populates the offset of each replica. If a replica is offline,
+ * the function places a ZERO in its entry.
+ * @offsets The array that needs to be filled in. The function assumes that proper memory has been allocated for it.
+ * @numReplicas The size of the offsets array that needs to be filled in.
+ */
+static void populateReplicaOffsets(long long *offsets, const size_t numReplicas) {
+    memset(offsets, 0, sizeof(long long) * numReplicas);
+
+    // iterate the replicas to get the offset they have reached.
+    listIter li;
+    listRewind(server.replicas, &li);
+
+    listNode *ln = listNext(&li);
+    for (unsigned i = 0; i < numReplicas && ln != NULL; ln = listNext(&li), i++) {
+        const client *replica = listNodeValue(ln);
+        serverAssert(replica->repl_data);
+        if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE) {
+            offsets[i] = replica->repl_data->repl_ack_off;
+        }
+    }
+}
+
+/**
+ * This function loops through all the replicas' offsets, finding the highest offset that the
+ * required number of replicas has acknowledged. If the required number of replicas exceeds the
+ * number of replicas connected, the function returns -1, indicating that the offset has not been
+ * reached by a sufficient number of replicas.
+ *
+ * @param numAcksNeeded The number of replicas that need to acknowledge the offset.
+ * @returns The offset that the requested number of replicas has reached.
+ *          If no acknowledgements are required, the primary offset is returned.
+ *          If there are not enough replicas connected, -1 is returned.
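As a minimal standalone sketch of the quorum-offset selection described above (the function and variable names here are illustrative, not the patch's API): sort the per-replica ack offsets in descending order and take element [numAcksNeeded - 1], the highest offset that at least that many replicas have acknowledged.

#include <stdlib.h>

/* Descending comparator over long long offsets; explicit comparison avoids
 * narrowing a long long difference to int. */
static int offset_cmp_desc(const void *v1, const void *v2) {
    const long long *a = v1, *b = v2;
    return (*b > *a) - (*b < *a);
}

/* Highest offset acknowledged by at least `acks_needed` replicas, or -1 when
 * fewer replicas than that are connected. E.g. with ack offsets {150, 95, 120}
 * and acks_needed = 2, the sorted order is {150, 120, 95} and element [1] = 120
 * is the highest offset two replicas have both reached. */
static long long consensus_offset(long long *offsets, size_t n, size_t acks_needed) {
    if (acks_needed == 0 || acks_needed > n) return -1; /* simplified edge handling */
    qsort(offsets, n, sizeof(long long), offset_cmp_desc);
    return offsets[acks_needed - 1];
}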
+ */ +long long getConsensusOffset(const unsigned long numAcksNeeded) { + const unsigned long numReplicas = listLength(server.replicas); + if (numAcksNeeded == 0) { + // If no ack is needed, then the consensus offset is the one primary is at. + return server.primary_repl_offset; + } + + // If the number of connected replicas is less than the number of required replicas, + // return -1 because we don't have enough number of replicas for the ACK. + if (numReplicas < numAcksNeeded) { + return -1; + } + + // fetch and sort the replica offsets for all replicas. This way we get the top offsets that have been acknowledged. + // Kth element in the sorted array will give us the offset that has been acknowledged by K replicas. + durable_t *durability = &server.durability; + + // make sure we have enough space for the replicas. Resize only if the required + // replica count is larger. No need to downsize. + if(durability->replica_offsets_size < numReplicas) { + durability->replica_offsets = zrealloc(durability->replica_offsets, numReplicas * sizeof(long long)); + durability->replica_offsets_size = numReplicas; + } + populateReplicaOffsets(durability->replica_offsets, numReplicas); + + // don't bother sorting if there is only one replica. + if (numReplicas > 1) { + qsort(durability->replica_offsets, numReplicas, sizeof(long long), offsetSorterDesc); + } + + // get the Kth element + return durability->replica_offsets[numAcksNeeded - 1]; +} + +/*================================= Client management ======================== */ + +/** + * Reset the pre-execution offset fields. + */ +static void resetPreExecutionOffset(struct client *c) { + c->clientDurabilityInfo.offset.recorded = false; + c->clientDurabilityInfo.offset.reply_block = NULL; + c->clientDurabilityInfo.offset.byte_offset = 0; +} + +/** + * Utility function to track the pre-execution position in the client reply COB. The given client can be either + * a normal client (or TODO: a monitor client) + * For a normal client, this position is the byte position in the COB prior to command execution. The response + * generated from executing the next valkey command comes after this position. + * (For a monitor client, this position is the byte position in the COB prior to command replication. The command + * will be replicated after this position.) + */ +static inline void trackCommandPreExecutionPosition(struct client *c) { + // There can be cases when the client gets blocked by other mechanisms such as slot migration + // after we tracked the command's pre-execution position when the reply buffer is non-empty. + // Later on when the client unblocks, the reply buffer can get flushed to the client so the + // previously tracked pre-execution reply position is no longer valid. In order to address that, + // here we reset the pre-execution position of the command unconditionally. 
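A rough standalone model of the pre-execution position bookkeeping this function performs (the struct and helper names are invented for illustration and assume a simplified reply chain, not the patch's clientReplyBlock list):

#include <stdbool.h>
#include <stddef.h>

/* Simplified reply block chain standing in for the client reply list. */
typedef struct block {
    size_t used;
    struct block *next;
} block;

typedef struct {
    block *reply_block;   /* NULL: position lies inside the fixed 16KB buffer */
    size_t byte_offset;   /* bytes already present at capture time */
    bool recorded;
} boundary;

/* Capture the pre-execution position: end of the reply list if non-empty,
 * otherwise the current fill level of the fixed buffer. */
static void capture_boundary(boundary *b, block *last_reply, size_t bufpos) {
    b->reply_block = last_reply;
    b->byte_offset = last_reply ? last_reply->used : bufpos;
    b->recorded = true;
}

/* True if bytes were appended after the captured position. */
static bool has_new_output(const boundary *b, const block *reply_head, size_t bufpos) {
    if (b->reply_block == NULL)
        return bufpos > b->byte_offset || reply_head != NULL;
    return b->reply_block->used > b->byte_offset || b->reply_block->next != NULL;
}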
+ resetPreExecutionOffset(c); + list *reply = c->reply; + int bufpos = c->bufpos; + + if (reply != NULL && listLength(reply) > 0) { + listNode *last_reply_block = listLast(reply); + c->clientDurabilityInfo.offset.reply_block = last_reply_block; + c->clientDurabilityInfo.offset.byte_offset = ((clientReplyBlock*)listNodeValue(last_reply_block))->used; + } else if (bufpos > 0) { + // We are only tracking the client reply block and we don't need to + // take ownership of the pointer, so there is no need to free it + c->clientDurabilityInfo.offset.reply_block = NULL; + c->clientDurabilityInfo.offset.byte_offset = bufpos; + } + c->clientDurabilityInfo.offset.recorded = true; +} + +/* If the client is currently waiting for replica acknowledgement, + * mark it unblocked and reset the client flags. + * This involves us removing the client from the clients_waiting_replica_ack list, + * and mark the client as unblocked for durability. + * + * @param c The client + * @return 1 if the client is successfully marked unblocked, 0 otherwise + */ +static int unblockClientWaitingReplicaAck(struct client *c) { + if (c->flag.durable_blocked_client) { + listNode *ln = listSearchKey(server.durability.clients_waiting_replica_ack, c); + if(ln != NULL) { + listDelNode(server.durability.clients_waiting_replica_ack, ln); + c->flag.durable_blocked_client = 0; + return 1; + } + } + return 0; +} + +/** + * Initialize the durable write client attributes when client is created + */ +void durableClientInit(struct client *c) { + if (!isDurabilityEnabled()) { + return; + } + if (c->clientDurabilityInfo.blocked_responses == NULL) { + c->clientDurabilityInfo.blocked_responses = listCreate(); + listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree); + resetPreExecutionOffset(c); + c->clientDurabilityInfo.current_command_repl_offset = -1; + } +} + +/** + * Reset the client durable write attributes during a client clean-up. + * This method is invoked when a client is freed. + */ +void durableClientReset(struct client *c) { + // Free this client from the clients_waiting_replica_ack list and emit a metric on + // how many clients are disconnected before the response gets flushed/unblocked. + unblockClientWaitingReplicaAck(c); + + if(c->clientDurabilityInfo.blocked_responses != NULL) { + listRelease(c->clientDurabilityInfo.blocked_responses); + c->clientDurabilityInfo.blocked_responses = NULL; + } + + resetPreExecutionOffset(c); + c->clientDurabilityInfo.current_command_repl_offset = -1; +} + +/* + * Returns true if the client is eligible for keyspace tracking + * on a primary node. + */ +static bool clientEligibleForResponseTracking(client *c) { + serverAssert(iAmPrimary()); + + if(c->cmd == NULL) return false; + + // should we do info? + // i.e: keyspace, does it include dirty keys? + // Administrative commands that are not keyspace informational nor + // write commands are not eligible for response tracking/blocking. 
+ if ((c->cmd->flags & CMD_ADMIN) && !(c->cmd->flags & CMD_WRITE)) { + return false; + } + + return ((c->cmd->flags & (CMD_WRITE | CMD_READONLY))); // Read or write command + // TODO: transactions, lua, scripts, etc + // TODO: functions +} + +/* Check if we only allow client to receive up to a certain + * position in the client reply buffer + */ +inline bool isClientReplyBufferLimited(struct client *c) { + return c->clientDurabilityInfo.blocked_responses != NULL && + listLength(c->clientDurabilityInfo.blocked_responses) > 0; +} + +/*================================= Response blocking ======================= */ + +/** + * Block the last response if it exists in the client output buffer + * @param c The client to block the last response in the COB + * @param blocked_offset The replication offset to block on + */ +void blockLastResponseIfExist(struct client *c, long long blocked_offset) { + // We must have called the pre-hook to track COB position + serverAssert(c->clientDurabilityInfo.offset.recorded); + + // Flag to indicate whether there is new response added in client output + // buffer + bool has_new_response = false; + struct listNode *disallowed_reply_block = + c->clientDurabilityInfo.offset.reply_block; + size_t disallowed_byte_offset = + c->clientDurabilityInfo.offset.byte_offset; + + // Track the starting position of the blocked response in the client COB + if (disallowed_reply_block == NULL) { + // The end of last response was in the initial 16KB buffer + if((size_t)c->bufpos > disallowed_byte_offset) { + // We are not at the end of the 16KB initial buffer + has_new_response = true; + } else if (listLength(c->reply) > 0) { + // We were at the end of the 16KB initial buffer and need to spill + // over to start our response from the first byte at the reply block + has_new_response = true; + disallowed_byte_offset = 0; + disallowed_reply_block = listFirst(c->reply); + } + } else { + // The end of the previous response is in the client reply list + clientReplyBlock *last_reply_block = (clientReplyBlock*)listNodeValue(disallowed_reply_block); + if (last_reply_block->used > disallowed_byte_offset) { + // More data comes after the last reply in the same reply block + has_new_response = true; + } else if(disallowed_reply_block->next != NULL) { + // No more data comes after the last reply and we start from the next reply block + has_new_response = true; + disallowed_byte_offset = 0; + disallowed_reply_block = disallowed_reply_block->next; + } + } + + // If the command outputs new response, create blockedResponse object and + // add it into linkedlist to block it. 
+ if (has_new_response) { + blockedResponse *new_block = zcalloc(sizeof(blockedResponse)); + new_block->primary_repl_offset = blocked_offset; + new_block->disallowed_byte_offset = disallowed_byte_offset; + new_block->disallowed_reply_block = disallowed_reply_block; + listAddNodeTail(c->clientDurabilityInfo.blocked_responses, new_block); + } +} + +/* Unblock the first response for the client */ +static inline void unblockFirstResponse(struct client *c) { + serverAssert(c->clientDurabilityInfo.blocked_responses != NULL); + if (listLength(c->clientDurabilityInfo.blocked_responses) > 0) { + listNode *first = listFirst(c->clientDurabilityInfo.blocked_responses); + listDelNode(c->clientDurabilityInfo.blocked_responses, first); + } +} + +/* Determines if we need to block on a given replication offset for a given client + * @param c Client + * @param offset The replication offset we are checking + * @return 0 if we don't need to block at the specified offset, and 1 if we do. + */ +static inline int isBlockingNeededForOffset(struct client *c, long long offset) { + // If the blocking offset is -1 or no replica is needed to ACK, don't block. + if (offset == -1 || replicaAcksForConsensus() == 0) { + return 0; + } + + // If there are no blocked responses previously, we always want to block + if (listLength(c->clientDurabilityInfo.blocked_responses) == 0) + return 1; + + listNode *last_response = listLast(c->clientDurabilityInfo.blocked_responses); + long long previous_offset = ((blockedResponse*)listNodeValue(last_response))->primary_repl_offset; + return previous_offset < offset; +} + +/** + * Block a given client on the specified replication offset if applicable. + * And clears the client's pre-execution byte offset fields so it won't carry + * forward to the next command. + * + * @param c Client + * @param blockingReplOffset The replication offset to block the client on + */ +void blockClientOnReplOffset(struct client *c, long long blockingReplOffset) { + serverAssert(isPrimaryDurabilityEnabled()); + + /* If needed, we block the client and put it into our list of clients + * waiting for ack from slaves. */ + if (isBlockingNeededForOffset(c, blockingReplOffset)) { + serverLog(LOG_DEBUG, "client should be blocked at offset %lld,", blockingReplOffset); + blockLastResponseIfExist(c, blockingReplOffset); + if (!c->flag.durable_blocked_client) { + listAddNodeTail(server.durability.clients_waiting_replica_ack,c); + c->flag.durable_blocked_client = 1; + } + } + + // Now we have processed the client blocking information and tracked it, + // we can reset the client durability attributes we are tracking for + // the current command. + resetPreExecutionOffset(c); +} + +/** + * Process the pending dirty keys/databases if needed, and block a give client as well as all + * connected MONITOR clients on the specified replication offset. + * Regarding the command issued by the given client, its response to the given client will be blocked, + * and replication of such command to the monitors will be also blocked. 
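A compact standalone sketch of the per-client blocked-response queue that blockLastResponseIfExist and isBlockingNeededForOffset build up and that unblockResponsesWithAckOffset later drains; it is array-backed purely for illustration and is not the patch's list-based implementation:

#include <stdbool.h>

#define MAX_BLOCKED 16

typedef struct {
    long long repl_offset;    /* offset this response waits for */
} blocked_resp;

typedef struct {
    blocked_resp q[MAX_BLOCKED];
    int len;
} blocked_queue;

/* Queue a response only if it needs a higher offset than what is already queued. */
static bool enqueue_if_needed(blocked_queue *bq, long long offset) {
    if (offset == -1) return false;                                   /* nothing to wait for */
    if (bq->len > 0 && bq->q[bq->len - 1].repl_offset >= offset) return false;
    if (bq->len == MAX_BLOCKED) return false;                         /* sketch-only bound */
    bq->q[bq->len++].repl_offset = offset;
    return true;
}

/* Release every leading response whose offset the replica quorum has acknowledged.
 * Returns how many responses can now be flushed to the client. */
static int release_acked(blocked_queue *bq, long long acked_offset) {
    int released = 0;
    while (released < bq->len && bq->q[released].repl_offset <= acked_offset) released++;
    for (int i = released; i < bq->len; i++) bq->q[i - released] = bq->q[i]; /* compact */
    bq->len -= released;
    return released;
}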
+ * + * @param c Client + * @param blockingReplOffset The replication offset to block the client and the monitors on + */ +static void blockClientAndMonitorsOnReplOffset(struct client *c, long long blockingReplOffset) { + // Block the client that issues the command on the replication offset + blockClientOnReplOffset(c, blockingReplOffset); + + //TODO: handle monitors +} + +/** + * Unblock responses and tasks of all blocked clients with a given consensus acked offset + * This function traverses through all the clients that wait for replica ack, and unblock + * all responses and tasks that has the required offset that is acknowledged by replicas. + * If the max repl offset is acked, all blocked responses will be flushed. + * + * @param durability Durability object of the current primary + * @param consensus_ack_offset Repl offset that have been acked by the required number of replicas + */ +void unblockResponsesWithAckOffset(struct durable_t *durability, long long consensus_ack_offset) { + + serverLog(LOG_DEBUG, "unblocking clients for consensus offset %lld,", consensus_ack_offset); + // Traverses through all the clients that wait for replica ack + listIter li, li_response; + listNode *ln, *ln_response; + listRewind(durability->clients_waiting_replica_ack, &li); + blockedResponse *br = NULL; + while((ln = listNext(&li))) { + client *c = ln->value; + + // For each client blocked, we go through all its blocked responses, + // and unblock all the responses whose replication offset are + // ACK'ed by the required number of replicas + // If the max repl offset is acked, all blocked responses will be unblocked + serverAssert(c->clientDurabilityInfo.blocked_responses != NULL); + listRewind(c->clientDurabilityInfo.blocked_responses, &li_response); + bool unblocked_responses = false; + + while((ln_response = listNext(&li_response))) { + br = listNodeValue(ln_response); + if(br->primary_repl_offset <= consensus_ack_offset) { + unblockFirstResponse(c); + if (unblocked_responses == false) { + unblocked_responses = true; + } + } else { + // As soon as we encounter a client response that has the + // required reply offset greater than the replicas ACK'ed offset, + // we can break out of this loop because all replies that follows + // has replication offset that is greater and can't be unblocked + break; + } + } + + // If there are no more blocked responses for the client, we can safely + // mark it unblocked entirely + if (listLength(c->clientDurabilityInfo.blocked_responses) == 0) { + if (unblockClientWaitingReplicaAck(c)) { + } + } + // Put client in pending write queue so responses can be flushed + // to client if we have unblocked at least 1 response objects. 
+ if (unblocked_responses) { + putClientInPendingWriteQueue(c); + } + } +} + +/* Check if there are clients blocked that can be unblocked since + * we received enough ACKs from replicas */ +void postReplicaAck(void) { + serverLog(LOG_DEBUG, "postReplicaAck hook entered"); + if (!isPrimaryDurabilityEnabled()) { + return; + } + + struct durable_t *durability = &server.durability; + long long consensus_ack_offset = getConsensusOffset(replicaAcksForConsensus()); + if (consensus_ack_offset <= durability->previous_acked_offset) { + return; + } + + // Update the previous acknowledged offset for durability + durability->previous_acked_offset = consensus_ack_offset; + + // Unblock responses and keyspace notifications with consensus acked offset + unblockResponsesWithAckOffset(durability, consensus_ack_offset); +} + +/*================================= Key management ============================ */ + +/** + * Mark a key as uncommitted at a particular replication offset for acknowledgement. + * + * @param key The name of the uncommitted key to mark + * @param offset The replication offset to wait on the uncommitted key + * @param uncommittedKeys The set of uncommitted keys + */ +static inline void addUncommittedKey(const sds key, const long long offset, rax *uncommittedKeys) { + // The value in the uncomittedKeys is the replication offset in long long format + int retval = raxInsert(uncommittedKeys, (unsigned char*)key, sdslen(key), (void *)offset, NULL); + serverAssert(retval == 1 || errno == 0); +} + +/** + * Retrieve the uncommitted replication offset for a given key, purge the given + * key from uncommitted keys set if the replication offset has been committed. + * Pre-condition: valkey is currently a primary + * @param key The key to retrieve the uncommitted replication offset + * @param db The serverDB object + * @return the ACK offset of the key if key is uncommitted, returns -1 otherwise. + */ +long long durablePurgeAndGetUncommittedKeyOffset(const sds key, serverDb *db) { + serverAssert(iAmPrimary()); + void *result; + if (!raxFind(db->uncommitted_keys, (unsigned char*)key, sdslen(key), &result)) { + return -1; + } + + long long key_offset = (long long) result; + + /** + * If the replication offset of key has been properly acked by replicas, + * then purge the key from the uncommitted keys set, and return -1 + * indicating the key has been committed. + */ + if (key_offset <= server.durability.previous_acked_offset) { + raxRemove(db->uncommitted_keys, (unsigned char*)key, sdslen(key), NULL); + return -1; + } + + return key_offset; +} + +/** + * Utility method to handle a dirty key for a given client. + * @param c The calling client. NULL if the key becomes dirty outside of a client command (i.e. expiry/eviction) + * @param key + * @param db + */ +void handleUncommittedKeyForClient(client *c, robj *key, serverDb *db) { + // If we are in the context of a MULTI/EXEC transaction (or TODO: a script) + // we mark the dirty key + // pending so it can be properly recorded later on with the final replication offset. + if ((c != NULL) && ((c->flag.multi))) { + // TODO: handle multi + } else { + // Otherwise, the key is updated outside of a transaction or a script, simply mark the key + // dirty at the current primary_repl_offset + addUncommittedKey(key->ptr, server.primary_repl_offset, db->uncommitted_keys); + } +} + +/** + * Clear all uncommitted DBs and keys that are properly acknowledged by + * sufficient number of replicas and mark them no longer dirty. 
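The key-level bookkeeping above amounts to a map from key name to the replication offset its latest write still needs acknowledged. A minimal standalone sketch, using a linear array in place of the rax the patch uses (all names are illustrative):

#include <string.h>

#define MAX_UNCOMMITTED 64

typedef struct {
    char key[64];
    long long offset;   /* replication offset the key's last write waits for */
} uncommitted_entry;

typedef struct {
    uncommitted_entry e[MAX_UNCOMMITTED];
    int len;
} uncommitted_map;

/* Record (or bump) the offset a key becomes durable at. */
static void mark_uncommitted(uncommitted_map *m, const char *key, long long offset) {
    for (int i = 0; i < m->len; i++) {
        if (strcmp(m->e[i].key, key) == 0) { m->e[i].offset = offset; return; }
    }
    if (m->len < MAX_UNCOMMITTED) {
        strncpy(m->e[m->len].key, key, sizeof(m->e[m->len].key) - 1);
        m->e[m->len].key[sizeof(m->e[m->len].key) - 1] = '\0';
        m->e[m->len].offset = offset;
        m->len++;
    }
}

/* Return the offset a reader of `key` must wait for, purging the entry (and
 * returning -1) once the quorum has already acknowledged it. */
static long long purge_and_get(uncommitted_map *m, const char *key, long long acked_offset) {
    for (int i = 0; i < m->len; i++) {
        if (strcmp(m->e[i].key, key) != 0) continue;
        if (m->e[i].offset <= acked_offset) {
            m->e[i] = m->e[m->len - 1];   /* drop the committed entry */
            m->len--;
            return -1;
        }
        return m->e[i].offset;
    }
    return -1;
}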
+ * + * This method iterates through all the valkey databases and checks the + * DB and all items tracked by the uncommitted_keys set for each, and + * removes keys that are acknowledged by sufficient number of replicas. + * It is applicable only to primary. + */ +void clearUncommittedKeysAcknowledged(void) { + if (!isPrimaryDurabilityEnabled()) { + return; + } + + durable_t *durability = &server.durability; + const int TIME_CHECK_INTERVAL = 100; + unsigned long long scan_count = 0; + + // Determine the number of uncommitted keys. Return if there is none + unsigned long long num_uncommitted_keys = getNumberOfUncommittedKeys(); + if (num_uncommitted_keys == 0) return; + + unsigned long long time_limit_ms = getUncommittedKeysCleanupTimeLimit(num_uncommitted_keys); + unsigned long long start_time_ms = mstime(); + while(durability->curr_db_scan_idx < server.dbnum) { + serverDb *db = server.db[durability->curr_db_scan_idx]; + if (db != NULL) { + raxIterator *iter = &db->next_scan_iter; + + // Clear the database's dirty replication offset if it is acknowledged by replicas + if (db->dirty_repl_offset <= server.durability.previous_acked_offset) { + db->dirty_repl_offset = -1; + } + + // In a time-bound fashion, clear the uncommitted keys if the required replication + // offset has been acknowledged by replicas. + if(raxSize(db->uncommitted_keys) > 0) { + if (!db->scan_in_progress) { + raxStart(iter, db->uncommitted_keys); + raxSeek(iter, "^", NULL, 0); + db->scan_in_progress = 1; + } else { + raxSeek(iter, ">=", iter->key, iter->key_len); + } + + while (raxNext(iter)) { + // Use scan_count % TIME_CHECK_INTERVAL to reduce the number of calling mstime + // method, it can make sure to scan some keys if time_limit_ms + // is very small. + if ((time_limit_ms > 0) && (scan_count > 0) + && (scan_count % TIME_CHECK_INTERVAL == 0)) { + unsigned long long cur_time_ms = mstime(); + if (cur_time_ms - start_time_ms > time_limit_ms) { + // Stop the current scan, continue to do in the next run + return; + } + } + + long long dirty_key_offset = (long long)iter->data; + if (dirty_key_offset <= server.durability.previous_acked_offset) { + raxRemove(db->uncommitted_keys, iter->key, iter->key_len, NULL); + raxSeek(iter, ">", iter->key, iter->key_len); + } + scan_count++; + } + } + + // Finish to DB scan. + if(db->scan_in_progress) { + db->scan_in_progress = 0; + raxStop(iter); + } + } + durability->curr_db_scan_idx++; + } + + // If all databases have been scanned, reset curr_db_scan_idx to 0, and + // exit the keys cleanup procedure. + if(durability->curr_db_scan_idx == server.dbnum) { + durability->curr_db_scan_idx = 0; + } +} + +/*========================== Command access validation ====================== */ + +/** + * Determines if a single valkey command is trying to access an uncommitted key. + * Returns 1 if so, 0 otherwise. 
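A standalone sketch of the time-bounded, resumable cleanup pass performed by clearUncommittedKeysAcknowledged above; the flat offsets array and cursor stand in for the per-DB rax iterator, and the names are illustrative:

#include <stdbool.h>
#include <time.h>

#define TIME_CHECK_INTERVAL 100

static long long now_ms(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (long long)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/* Remove acknowledged entries from offsets[], spending roughly time_limit_ms per
 * call. *cursor persists between calls so the next run resumes where this one
 * stopped. Returns true when the whole array has been scanned. */
static bool cleanup_pass(long long *offsets, int n, long long acked_offset,
                         long long time_limit_ms, int *cursor) {
    long long start = now_ms();
    for (int scanned = 0; *cursor < n; (*cursor)++, scanned++) {
        /* Consult the clock only every TIME_CHECK_INTERVAL entries so that a very
         * small budget still makes some progress. */
        if (scanned > 0 && scanned % TIME_CHECK_INTERVAL == 0 &&
            now_ms() - start > time_limit_ms)
            return false;                      /* out of budget; resume next run */
        if (offsets[*cursor] != -1 && offsets[*cursor] <= acked_offset)
            offsets[*cursor] = -1;             /* "remove" the committed entry */
    }
    *cursor = 0;                               /* full pass done; start over next time */
    return true;
}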
+ */ +static int isSingleCommandAccessingUncommittedKeys(serverDb *db, struct serverCommand *cmd, robj **argv, int argc) { + // If the database has no uncommitted keys, return 0 + if (raxSize(db->uncommitted_keys) == 0) return 0; + + getKeysResult keysResult; + initGetKeysResult(&keysResult); + int numkeys = getKeysFromCommand(cmd, argv, argc, &keysResult); + keyReference *keys = keysResult.keys; + + for (int i = 0; i < numkeys; i++) { + sds keystr = argv[keys[i].pos]->ptr; + // Check if we are trying to access an uncommitted key + void *result; + if (raxFind(db->uncommitted_keys, (unsigned char*)keystr, sdslen(keystr), &result)) { + getKeysFreeResult(&keysResult); + return 1; + } + } + + // Free the keys after done processing them + getKeysFreeResult(&keysResult); + return 0; +} + +/** + * Determine if a client is trying to access uncommitted keys or function store. + * Returns 1 if so, 0 otherwise. + */ +static int isAccessingUncommittedData(client *c) { + if (hasUncommittedKeys()) { + return 1; + } + + // Single command handling + if (isSingleCommandAccessingUncommittedKeys(c->db, c->cmd, c->argv, c->argc)) { + return 1; + } + + return 0; +} + +static bool shouldRejectCommandWithUncommittedData(client *c) { + if(c->cmd == NULL // command is null + || ((c->cmd->flags & CMD_ADMIN)) + || c->flag.primary) { + return false; + } + + // If we are operating as a replica (after a failover) + // trying to access dirty items. + if ((!iAmPrimary()) && isAccessingUncommittedData(c)) { + return true; + } + + return false; +} + +/*========================== Command offset calculation ===================== */ + +/** + * Process a single replicating command for consistent write blocking. + * + * @param c Client + * @return The blocking replication offset or -1 if we're in a nested call and the replication + * offset has not been updated yet. + */ +static long long getSingleCommandBlockingOffsetForReplicatingCommand(client *c) { + + if (!(c->cmd->flags & CMD_WRITE)) { + return -1; + } + + // If the command executed generated replication data, then this means the server data changed. + // We need to mark the modified data as dirty and block the response to the client until the + // replica's replication offset is caught up to the current global offset. + // todo handle functions + getKeysResult result; + initGetKeysResult(&result); + int numkeys = getKeysFromCommand(c->cmd, c->argv, c->argc, &result); + keyReference *keys = result.keys; + if (numkeys > 0) { + if (c->cmd->proc == moveCommand) { + // TODO: support MOVE command: we need to mark the key as dirty in the destination DB + // dont block for now + return -1; + } else if (c->cmd->proc == copyCommand) { + // TODO: handle copy command + // handle the dirty keys in the destination db + // dont block for now + return -1; + } + + // Mark all the keys updated by the current command as dirty in the current DB + for (int i = 0; i < numkeys; i++) { + handleUncommittedKeyForClient(c, c->argv[keys[i].pos], c->db); + } + } + getKeysFreeResult(&result); + + + return server.primary_repl_offset; +} + +/** + * Process a single non-replicating command for consistent write blocking. + * + * @param c Client + * @return The blocking replication offset or -1 if no blocking need. 
+ */ +static long long getSingleCommandBlockingOffsetForNonReplicatingCommand(client *c) { + long long blocking_repl_offset = -1; + // TODO: handle function, module, etc + if (c->cmd->flags & (CMD_READONLY | CMD_WRITE)) { + // For read/write commands that didn't generate replication data, we would block + // on the highest offset of all accessed uncommitted keys and the valkey DBs itself. + // Note some commands categorized as writes can perform read only operations + // therefore they should undergo the same checks as read-only commands. + blocking_repl_offset = c->db->dirty_repl_offset; + getKeysResult result; + initGetKeysResult(&result); + int numkeys = getKeysFromCommand(c->cmd, c->argv, c->argc, &result); + keyReference *keys = result.keys; + + for (int i = 0; i < numkeys; i++) { + sds keystr = c->argv[keys[i].pos]->ptr; + // If we try to access an uncommitted key, then block the client + // until all prior updates on this key have been acknowledged. + // So here we essentially need to track the biggest offset amongst + // all the uncommitted keys accessed by the command. + long long offset = durablePurgeAndGetUncommittedKeyOffset(keystr, c->db); + if(offset > blocking_repl_offset) { + blocking_repl_offset = offset; + } + } + getKeysFreeResult(&result); + } + + return blocking_repl_offset; +} + +/** + * Process a single command for consistent write blocking. + * + * @param c Client + * @return The replication offset we need to use for blocking this client for replica ack. + * Returns -1 if blocking is not required. Replication offset of 0 can lead to blocking + * behavior because if the primary has no replicas, and it is configured to require replica + * to ACK write, then it needs to block writes. + */ +static long long getSingleCommandBlockingOffsetForConsistentWrites(struct client *c) { + serverAssert(isPrimaryDurabilityEnabled()); + + // If no replicas are required for ACK, then return -1 (no need to block) + if (replicaAcksForConsensus() == 0) + return -1; + + long long blocking_repl_offset = -1; + + if ((server.primary_repl_offset > pre_call_replication_offset) || (server.also_propagate.numops > pre_call_num_ops_pending_propagation)) { + blocking_repl_offset = getSingleCommandBlockingOffsetForReplicatingCommand(c); + } else { + blocking_repl_offset = getSingleCommandBlockingOffsetForNonReplicatingCommand(c); + } + + // If the blocking offset is already acknowledged by replicas, + // then we don't need to block the response at all. + if (blocking_repl_offset <= server.durability.previous_acked_offset) { + blocking_repl_offset = -1; + } + + return blocking_repl_offset; +} + +/*=========================== Command hook functions ======================= */ + +/** + * For synchronous replication, we need to record the starting replication offset of the command + * about to be executed. 
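The offset selection implemented by the functions above can be restated as one small standalone helper (parameter names are hypothetical, not the patch's API):

/* Pick the replication offset a client response must wait for, or -1 for none. */
static long long blocking_offset_for_command(int produced_replication,
                                             long long primary_repl_offset,
                                             long long db_dirty_offset,
                                             const long long *touched_key_offsets, int nkeys,
                                             long long previous_acked_offset) {
    long long off;
    if (produced_replication) {
        /* Write path: the response waits until replicas ack the offset the
         * write was propagated at. */
        off = primary_repl_offset;
    } else {
        /* Read path: wait for the newest uncommitted state the command touched,
         * i.e. the max of the DB-level dirty offset and the per-key offsets. */
        off = db_dirty_offset;
        for (int i = 0; i < nkeys; i++)
            if (touched_key_offsets[i] > off) off = touched_key_offsets[i];
    }
    /* Already acknowledged by the required number of replicas: no blocking. */
    return (off <= previous_acked_offset) ? -1 : off;
}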
+ */ +void preCall(void) { + serverLog(LOG_DEBUG, "preCall hook entered"); + if (!isPrimaryDurabilityEnabled()) return; + + pre_call_replication_offset = server.primary_repl_offset; + pre_call_num_ops_pending_propagation = server.also_propagate.numops; + serverLog(LOG_DEBUG, "preCall hook: pre_call_replication_offset=%lld, pre_call_num_ops_pending_propagation=%d", + pre_call_replication_offset, pre_call_num_ops_pending_propagation); +} + +/** + * For synchronous replication, after we finish processing a valkey command which can either be a stand-alone + * command, or in a multi-command block such as MULTI/EXEC transaction or a Lua script, we need to + * track the replication offset for the command and update the replication offset post-execution + * for the entire command block. Later on, after the command block execution completes, we can determine + * whether to block the client response for replica acknowledgement or not. + * + * Note: we need to track the final replication offset on the primary for all the keys and databases + * that become dirty due to the command or transaction/script. + * + * @param c The client executing the valkey command + */ +void postCall(struct client *c) { + // log debug tracing + serverLog(LOG_DEBUG, "Call hook entered for command '%s'", c->cmd->declared_name); + if (!isPrimaryDurabilityEnabled() || (c->flag.blocked)) + return; + + // Determine the blocking replication offset for the current command + long long current_cmd_blocking_offset = getSingleCommandBlockingOffsetForConsistentWrites(c); + + // Here we need to track the replication offset of the command executed + // by the calling client somewhere. This is usually tracked in the calling + // client itself. But for the case of scripts, the script caller client is + // different from the fake client created to execute each script command + struct client *tracking_client = server.current_client? server.current_client : c; + + if (current_cmd_blocking_offset > tracking_client->clientDurabilityInfo.current_command_repl_offset) { + tracking_client->clientDurabilityInfo.current_command_repl_offset = current_cmd_blocking_offset; + } + + // TODO: handle db level modifications like FLUSHALL, SWAPDB +} + +/** + * Perform pre-processing before command execution for a given client. + * + * For non-administrative commands that is either read or write, we will track the pre-execution positions + * in the reply COB of the client and all the connected monitors. + */ +int preCommandExec(struct client *c) { + serverLog(LOG_DEBUG, "preCommandExec hook entered for command '%s'", + c->cmd ? c->cmd->declared_name : "NULL"); + if (!isDurabilityEnabled()) { + serverLog(LOG_DEBUG, "preCommandExec hook: durability not enabled, allowing"); + return CMD_FILTER_ALLOW; + } + + // Reset the client current command replication offset + c->clientDurabilityInfo.current_command_repl_offset = -1; + + if (shouldRejectCommandWithUncommittedData(c)) { + serverAssert(!(c->cmd->flags & CMD_WRITE)); + flagTransaction(c); + addReplyError(c, DURABLE_ACCESSED_DATA_UNAVAILABLE); + return CMD_FILTER_REJECT; + } + + // If we are operating as a primary, then apply the regular synchronous replication + // logic of blocking command response post execution if needed. 
+ if (iAmPrimary() && clientEligibleForResponseTracking(c)) { + + // Track the pre-execution position in the client reply COB + trackCommandPreExecutionPosition(c); + + // todo: handle monitors + } + + pre_command_replication_offset = server.primary_repl_offset; + return CMD_FILTER_ALLOW; +} + +/** + * Perform post-processing after command execution for a given client. + * + * For write operation, we insert the updated keys into the uncommitted keys + * map and wait for replica acknowledgement at that particular replication + * byte offset. + * + * For read operation that need to access uncommitted keys, we need to block the client + * response until all the dependent keys are properly acknowledged by replicas. + * + * @param c client + */ +void postCommandExec(struct client *c) { + if (!isPrimaryDurabilityEnabled()) { + return; + } + serverLog(LOG_DEBUG, "postCommandExec hook entered for command '%s'", + c->cmd ? c->cmd->declared_name : "NULL"); + // If the command is NULL or is in a MULTI/EXEC block, then we skip + // TODO: handle multi + if(c->cmd == NULL || c->flag.multi) { + return; + } + + // Block the client (TODO:monitors) based on the required replication offset + // for the current command. + long long blocking_repl_offset = c->clientDurabilityInfo.current_command_repl_offset; + + // If the client needs to block, we need to enforce that it is eligible for response tracking. + // Otherwise we will try to block the response without tracking the command's pre-execution + // position in the client reply buffer, which wouldn't work. If this assert fails, then we + // need to fix clientEligibleForResponseTracking() to handle the problematic command. + if (blocking_repl_offset > pre_command_replication_offset) { + serverAssert(clientEligibleForResponseTracking(c)); + } + + blockClientAndMonitorsOnReplOffset(c, blocking_repl_offset); +} + +/** + * Function used to initialize the durability datastructures. + * TODO: exit clean up? + */ +void durableInit(void) { + // Initialize synchronous replication + server.durability.replica_offsets_size = 0; + server.durability.replica_offsets = NULL; + server.durability.previous_acked_offset = -1; + server.durability.curr_db_scan_idx = 0; + server.durability.clients_waiting_replica_ack = listCreate(); +} + diff --git a/src/durable_write.h b/src/durable_write.h new file mode 100644 index 000000000..353484673 --- /dev/null +++ b/src/durable_write.h @@ -0,0 +1,116 @@ +#ifndef DURABLE_WRITE_H +#define DURABLE_WRITE_H + +#include +#include +#include +#include "expire.h" +#include "sds.h" + +#define DURABLE_ACCESSED_DATA_UNAVAILABLE "Accessed data unavailable to be served" +/* Command filter codes that are used in pre execution stage of a command. */ +#define CMD_FILTER_ALLOW 0 +#define CMD_FILTER_REJECT 1 + +struct client; +struct serverObject; +struct serverDb; +struct list; +struct listNode; + +typedef long long mstime_t; + +/** + * Durability container to house all the durability related fields. + */ +typedef struct durable_t { + /* Uncommitted keys cleanup configuration time limit in milliseconds */ + unsigned int keys_cleanup_time_limit_ms; + /* The current scanning database index, starting from 0 */ + int curr_db_scan_idx; + + /* Number of replicas to ack for an update to be considered committed */ + long long num_replicas_to_ack; + + /* clients waiting for offset ack/quorum*/ + struct list *clients_waiting_replica_ack; + + // cached allocation of replica offsets to prevent allocation per cmd. 
+ unsigned long replica_offsets_size; + long long *replica_offsets; + // Previously acknowledged replication offset by replicas + long long previous_acked_offset; +} durable_t; + +// Blocked response structure used by client to mark +// the blocking information associated with each response +typedef struct blockedResponse { + // Pointer to the client's reply node where the blocked response starts. + // NULL if the blocked response starts from the 16KB initial buffer + // Here we don't take ownership of this pointer so we never + // release the memory pointed to by this block. + struct listNode *disallowed_reply_block; + // The boundary in the reply buffer where the blocked response starts. + // We don't write data from this point onwards to the client socket + size_t disallowed_byte_offset; + // The replication offset to wait for ACK from replicas + long long primary_repl_offset; +} blockedResponse; + +// Describes a pre-execution COB offset for a client +typedef struct preExecutionOffsetPosition { + // True if the pre execution offset/reply block are initialized + bool recorded; + // Track initial client COB position for client blocking + // Pointer to the pre-execution reply node, NULL for initial buffer + struct listNode *reply_block; + // Byte position boundary within the pre-execution reply block + size_t byte_offset; +} preExecutionOffsetPosition; + +typedef struct clientDurabilityInfo { + // Blocked client responses list for consistency + struct list *blocked_responses; + + /* Pre-execution data recorded before a command is executed + * to record the boundaries of the COB. */ + preExecutionOffsetPosition offset; + + // Replication offset to block this current command response + long long current_command_repl_offset; +} clientDurableInfo; + +/** + * Init + */ +void durableInit(void); +void durableClientInit(struct client *c); +void durableClientReset(struct client *c); +/* + Command processing hooks for offset and cob tracking +*/ +void preCall(void); +void postCall(struct client *c); +int preCommandExec(struct client *c); +void postCommandExec(struct client *c); +void postReplicaAck(void); + +/* + Utils +*/ +int isPrimaryDurabilityEnabled(void); +bool isClientReplyBufferLimited(struct client *c); +long long durablePurgeAndGetUncommittedKeyOffset(const sds key, struct serverDb *db); +// TODO: naming of these flags. +int isDurabilityEnabled(void); +void clearUncommittedKeysAcknowledged(void); +// TODO: +// preReplyToBlockedClient +// for streams and timeounts, when a blocked client is being unblocked +// before a reply is added, the command will not be reprocessed via processCommand() +// we should hook this to get pre-execution offsets + + + + +#endif /* DURABLE_WRITE_H */ diff --git a/src/networking.c b/src/networking.c index 4bcf0370e..d8c2b5278 100644 --- a/src/networking.c +++ b/src/networking.c @@ -27,6 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. 
*/ +#include "durable_write.h" #include "server.h" #include "cluster.h" #include "cluster_slot_stats.h" @@ -367,6 +368,14 @@ client *createClient(connection *conn) { c->io_last_written.buf = NULL; c->io_last_written.bufpos = 0; c->io_last_written.data_len = 0; + + // init durability info like + // key blocking on primary + // TODO: this probably doesn't need to be a separate function + // just makes it a bit easier to review the POC with all related functionality + // together + durableClientInit(c); + return c; } @@ -1668,6 +1677,18 @@ void copyReplicaOutputBuffer(client *dst, client *src) { /* Return true if the specified client has pending reply buffers to write to * the socket. */ int clientHasPendingReplies(client *c) { + if (isClientReplyBufferLimited(c)) { + // Check if our first allowed reply boundary is in a position that comes + // after the current position that valkey has written up to in the COB. + const blockedResponse *n = listNodeValue(listFirst(c->clientDurabilityInfo.blocked_responses)); + if ((c->bufpos && n->disallowed_reply_block == NULL) || + (c->bufpos == 0 && n->disallowed_reply_block != NULL && listFirst(c->reply) == n->disallowed_reply_block)) { + // Both positions are pointing both at the initial 16KB buffer or the + // first reply block, compare the sentlen with the last allowed byte offset + return c->io_last_written.data_len < n->disallowed_byte_offset; + } + } + if (getClientType(c) == CLIENT_TYPE_REPLICA) { /* Replicas use global shared replication buffer instead of * private output buffer. */ @@ -1866,6 +1887,8 @@ void unlinkClient(client *c) { /* Wait for IO operations to be done before unlinking the client. */ waitForClientIO(c); + durableClientReset(c); + /* If this is marked as current client unset it. */ if (c->conn && server.current_client == c) server.current_client = NULL; diff --git a/src/object.c b/src/object.c index 13efeb502..94bc65c7b 100644 --- a/src/object.c +++ b/src/object.c @@ -1102,6 +1102,16 @@ int getPositiveLongFromObjectOrReply(client *c, robj *o, long *target, const cha } } +int getIntFromObject(robj *o, int *target) { + long long value; + + if (getLongLongFromObject(o, &value) != C_OK) return C_ERR; + if (value < INT_MIN || value > INT_MAX) return C_ERR; + + *target = value; + return C_OK; +} + int getIntFromObjectOrReply(client *c, robj *o, int *target, const char *msg) { long value; diff --git a/src/replication.c b/src/replication.c index 631c0d234..41745f60e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1421,6 +1421,9 @@ void replconfCommand(client *c) { if (c->repl_data->repl_state == REPLICA_STATE_BG_RDB_LOAD) { replicaPutOnline(c); } + + // Process all clients waiting ACK from a quorum + postReplicaAck(); /* Note: this command does not reply anything! */ return; } else if (!strcasecmp(c->argv[j]->ptr, "getack")) { diff --git a/src/server.c b/src/server.c index ab0719398..e1dbb8f73 100644 --- a/src/server.c +++ b/src/server.c @@ -33,6 +33,7 @@ */ #include "server.h" #include "connection.h" +#include "durable_write.h" #include "monotonic.h" #include "cluster.h" #include "cluster_slot_stats.h" @@ -1694,6 +1695,7 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa run_with_period(100) modulesCron(); } + run_with_period(1000) clearUncommittedKeysAcknowledged(); /* Fire the cron loop modules event. 
*/ ValkeyModuleCronLoopV1 ei = {VALKEYMODULE_CRON_LOOP_VERSION, server.hz}; moduleFireServerEvent(VALKEYMODULE_EVENT_CRON_LOOP, 0, &ei); @@ -2815,6 +2817,10 @@ serverDb *createDatabase(int id) { db->ready_keys = dictCreate(&objectKeyPointerValueDictType); db->watched_keys = dictCreate(&keylistDictType); db->id = id; + + db->uncommitted_keys = raxNew(); + db->dirty_repl_offset = -1; + db->scan_in_progress = 0; resetDbExpiryState(db); return db; } @@ -3035,7 +3041,7 @@ void initServer(void) { commandlogInit(); latencyMonitorInit(); initSharedQueryBuf(); - + durableInit(); /* Initialize ACL default password if it exists */ ACLUpdateDefaultUserPassword(server.requirepass); @@ -3785,6 +3791,7 @@ void call(client *c, int flags) { struct ClientFlags client_old_flags = c->flag; struct serverCommand *real_cmd = c->realcmd; + preCall(); client *prev_client = server.executing_client; server.executing_client = c; @@ -3982,7 +3989,9 @@ void call(client *c, int flags) { if (zmalloc_used > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used; /* Do some maintenance job and cleanup */ + // TODO: should blocking postCall could be moved into afterCommand? afterCommand(c); + postCall(c); /* Remember the replication offset of the client, right after its last * command that resulted in propagation. */ @@ -4516,8 +4525,12 @@ int processCommand(client *c) { queueMultiCommand(c, cmd_flags); addReply(c, shared.queued); } else { + if (preCommandExec(c) == CMD_FILTER_REJECT) { + return C_OK; + } int flags = CMD_CALL_FULL; call(c, flags); + postCommandExec(c); if (listLength(server.ready_keys) && !isInsideYieldingLongCommand()) handleClientsBlockedOnKeys(); } return C_OK; diff --git a/src/server.h b/src/server.h index 1413712b0..388f05f32 100644 --- a/src/server.h +++ b/src/server.h @@ -37,6 +37,7 @@ #include "commands.h" #include "allocator_defrag.h" +#include #include #include #include @@ -52,7 +53,7 @@ #include #include #include - +#include "durable_write.h" #ifdef HAVE_LIBSYSTEMD #include #endif @@ -868,6 +869,7 @@ typedef struct replBufBlock { char buf[]; } replBufBlock; + /* Database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ @@ -886,6 +888,13 @@ typedef struct serverDb { long long avg_ttl; /* Average TTL, just for stats */ unsigned long cursor; /* Cursor of the active expire cycle. */ } expiry[ACTIVE_EXPIRY_TYPE_COUNT]; + + /* fields related to dirty key tracking + * for consistent writes with durability */ + rax *uncommitted_keys; /* Map of dirty keys to the offset required by replica acknowledgement */ + long long dirty_repl_offset; /* Replication offset for a dirty DB */ + raxIterator next_scan_iter; /* The next iterator for db scan */ + int scan_in_progress; /* Flag of showing whether db is in scan or not */ } serverDb; /* forward declaration for functions ctx */ @@ -1168,6 +1177,8 @@ typedef struct ClientFlags { or client::buf. */ uint64_t keyspace_notified : 1; /* Indicates that a keyspace notification was triggered during the execution of the current command. */ + uint64_t durable_blocked_client: 1; /* This is a durable blocked client that is waiting for the server to + * acknowledge the write of the command that caused it to be blocked. 
*/ } ClientFlags; typedef struct ClientPubSubData { @@ -1366,6 +1377,7 @@ typedef struct client { #ifdef LOG_REQ_RES clientReqResInfo reqres; #endif + struct clientDurabilityInfo clientDurabilityInfo; } client; /* When a command generates a lot of discrete elements to the client output buffer, it is much faster to @@ -1665,6 +1677,7 @@ typedef enum childInfoType { } childInfoType; struct valkeyServer { + durable_t durability; /* General */ pid_t pid; /* Main process pid. */ pthread_t main_thread_id; /* Main thread id */ @@ -2927,6 +2940,9 @@ int processIOThreadsWriteDone(void); void releaseReplyReferences(client *c); void resetLastWrittenBuf(client *c); +//TODO:jules move this elsewhere +int getIntFromObject(robj *o, int *target); + int parseExtendedCommandArgumentsOrReply(client *c, int *flags, int *unit, robj **expire, robj **compare_val, int command_type, int max_args); /* logreqres.c - logging of requests and responses */
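To close the loop, a small self-contained simulation of the flow this patch wires together, with made-up offsets and simplified helpers rather than the patch's API: a write parks its response behind the offset it was propagated at, replica acks arrive, and the response is released once the 2-ack consensus offset covers it.

#include <stdio.h>
#include <stdlib.h>

static int cmp_desc(const void *a, const void *b) {
    long long x = *(const long long *)a, y = *(const long long *)b;
    return (y > x) - (y < x);
}

/* Highest offset acknowledged by at least `needed` replicas (-1 if impossible). */
static long long consensus(long long *acks, size_t n, size_t needed) {
    if (needed == 0 || needed > n) return -1;
    qsort(acks, n, sizeof(long long), cmp_desc);
    return acks[needed - 1];
}

int main(void) {
    long long blocked_at = 110;      /* e.g. a SET propagated at offset 110 */
    long long acks1[] = {100, 90};   /* replica ack offsets before catching up */
    long long acks2[] = {115, 110};  /* after both replicas caught up */

    printf("release now? %s\n", consensus(acks1, 2, 2) >= blocked_at ? "yes" : "no"); /* no  */
    printf("release now? %s\n", consensus(acks2, 2, 2) >= blocked_at ? "yes" : "no"); /* yes */
    return 0;
}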