Cluster: Optimize slot bitmap iteration from per-bit to 64-bit word scan (#2781)

In functions `clusterSendFailoverAuthIfNeeded` and
`clusterProcessPacket`, we iterate through **every slot bit**
sequentially in the form of `for (int j = 0; j < CLUSTER_SLOTS; j++)`,
performing 16384 checks even when only a few bits were set, and thus
causing unnecessary loop overhead.

This is particularly wasteful in function
`clusterSendFailoverAuthIfNeeded` where we need to ensure the sender's
claimed slots all have up-to-date config epoch. Usually healthy senders
would meet such condition, and thus we normally need to exhaust the for
loop of 16384 checks.

The proposed new implementation loads 64 bits (8 byte word) at a time
and skips empty words completely, therefore only performing 256 checks.

---------

Signed-off-by: Zhijun <dszhijun@gmail.com>
This commit is contained in:
Zhijun Liao 2025-11-26 21:52:17 +08:00 committed by GitHub
parent 56ab3c4a81
commit faac14ab9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 56 additions and 30 deletions

View File

@ -181,6 +181,9 @@ static inline char *clusterLinkGetHumanNodeName(clusterLink *link) {
#define isSlotUnclaimed(slot) \
(server.cluster->slots[slot] == NULL || bitmapTestBit(server.cluster->owner_not_claiming_slot, slot))
/* Treating slot bitmaps as 8-byte words to speedup iteration */
#define CLUSTER_SLOT_WORDS (CLUSTER_SLOTS / 64)
#define SLOT_WORD_OFFSET(w) ((w) << 3)
#define RCVBUF_INIT_LEN 1024
#define RCVBUF_MIN_READ_LEN 14
@ -3619,6 +3622,16 @@ int clusterIsValidPacket(clusterLink *link) {
return 1;
}
/* When iterating through the slot bitmap, group every 64 bits as
* a word to speedup. */
static inline int clusterExtractSlotFromWord(uint64_t *slot_word, size_t slot_word_index) {
/* Get the index of the least-significant set bit, in this 64-bit word */
const unsigned bit = (unsigned)__builtin_ctzll(*slot_word);
const int slot = (int)((slot_word_index << 6) | bit);
*slot_word &= *slot_word - 1; /* clear that bit */
return slot;
}
/* When this function is called, there is a packet to process starting
* at link->rcvbuf. Releasing the buffer is up to the caller, so this
* function should just handle the higher level stuff of processing the
@ -4139,20 +4152,27 @@ int clusterProcessPacket(clusterLink *link) {
* new configuration, so other nodes that have an updated table must
* do it. In this way A will stop to act as a primary (or can try to
* failover if there are the conditions to win the election). */
for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(hdr->myslots, j)) {
if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) {
bool found_new_owner = false;
for (size_t w = 0; w < CLUSTER_SLOT_WORDS && !found_new_owner; w++) {
uint64_t word;
memcpy(&word, hdr->myslots + SLOT_WORD_OFFSET(w), sizeof(word));
while (word) {
const int slot = clusterExtractSlotFromWord(&word, w);
clusterNode *slot_owner = server.cluster->slots[slot];
if (slot_owner == sender || isSlotUnclaimed(slot)) continue;
if (slot_owner->configEpoch > sender_claimed_config_epoch) {
serverLog(LL_VERBOSE,
"Node %.40s (%s) has old slots configuration, sending "
"an UPDATE message about %.40s (%s)",
sender->name, sender->human_nodename,
server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename);
clusterSendUpdate(sender->link, server.cluster->slots[j]);
slot_owner->name, slot_owner->human_nodename);
clusterSendUpdate(sender->link, slot_owner);
/* TODO: instead of exiting the loop send every other
* UPDATE packet for other nodes that are the new owner
* of sender's slots. */
found_new_owner = true;
break;
}
}
@ -5050,7 +5070,8 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
unsigned char *claimed_slots = request->myslots;
int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
int j;
int slot;
clusterNode *slot_owner;
/* IF we are not a primary serving at least 1 slot, we don't have the
* right to vote, as the cluster size is the number
@ -5096,18 +5117,23 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
/* The replica requesting the vote must have a configEpoch for the claimed
* slots that is >= the one of the primaries currently serving the same
* slots in the current configuration. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(claimed_slots, j) == 0) continue;
if (isSlotUnclaimed(j) || server.cluster->slots[j]->configEpoch <= requestConfigEpoch) {
for (size_t w = 0; w < CLUSTER_SLOT_WORDS; w++) {
uint64_t word;
memcpy(&word, claimed_slots + SLOT_WORD_OFFSET(w), sizeof(word));
while (word) {
slot = clusterExtractSlotFromWord(&word, w);
if (isSlotUnclaimed(slot) || server.cluster->slots[slot]->configEpoch <= requestConfigEpoch) {
continue;
}
slot_owner = server.cluster->slots[slot];
/* If we reached this point we found a slot that in our current slots
* is served by a primary with a greater configEpoch than the one claimed
* by the replica requesting our vote. Refuse to vote for this replica. */
serverLog(LL_WARNING,
"Failover auth denied to %.40s (%s): "
"slot %d epoch (%llu) > reqConfigEpoch (%llu)",
node->name, node->human_nodename, j, (unsigned long long)server.cluster->slots[j]->configEpoch,
node->name, node->human_nodename, slot, (unsigned long long)slot_owner->configEpoch,
(unsigned long long)requestConfigEpoch);
/* Send an UPDATE message to the replica. After receiving the UPDATE message,
@ -5116,11 +5142,11 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
serverLog(LL_VERBOSE,
"Node %.40s (%s) has old slots configuration, sending "
"an UPDATE message about %.40s (%s)",
node->name, node->human_nodename,
server.cluster->slots[j]->name, server.cluster->slots[j]->human_nodename);
clusterSendUpdate(node->link, server.cluster->slots[j]);
node->name, node->human_nodename, slot_owner->name, slot_owner->human_nodename);
clusterSendUpdate(node->link, slot_owner);
return;
}
}
/* We can vote for this replica. */
server.cluster->lastVoteEpoch = server.cluster->currentEpoch;