mirror of https://github.com/valkey-io/valkey
Merge 6e953bec5d into 8ab0152bef
This commit is contained in:
commit
c2668d3099
100
src/t_stream.c
100
src/t_stream.c
|
|
@ -64,7 +64,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c,
|
|||
streamConsumer *consumer);
|
||||
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given);
|
||||
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
|
||||
|
||||
static long long streamEstimateDistance(stream *s, streamCG *cg, streamID *next_id);
|
||||
/* -----------------------------------------------------------------------
|
||||
* Low level stream encoding: a radix tree of listpacks.
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
|
@ -1389,11 +1389,6 @@ int streamIDEqZero(streamID *id) {
|
|||
int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
|
||||
streamID start_id, end_id;
|
||||
|
||||
if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) {
|
||||
/* The stream is empty or has no tombstones. */
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (start) {
|
||||
start_id = *start;
|
||||
} else {
|
||||
|
|
@ -1418,39 +1413,56 @@ int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
/* Replies with a consumer group's current lag, that is the number of messages
|
||||
* in the stream that are yet to be delivered. In case that the lag isn't
|
||||
* available due to fragmentation, the reply to the client is a null. */
|
||||
void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
|
||||
/* Replies with a consumer group's current lag, which is the number of messages in the stream
|
||||
* that are yet to be delivered. Additionally, it includes an entries-read field that indicates
|
||||
* the number of messages currently read. In case that the lag or entries-read isn't available
|
||||
* due to fragmentation, the reply to the client is null. */
|
||||
void streamReplyWithCGLagAndEntriesRead(client *c, stream *s, streamCG *cg) {
|
||||
int valid = 0;
|
||||
long long lag = 0;
|
||||
|
||||
if (!s->entries_added) {
|
||||
/* The lag of a newly-initialized stream is 0. */
|
||||
lag = 0;
|
||||
/* Attempt to retrieve the group's last ID logical read counter. */
|
||||
long long entries_read = streamEstimateDistance(s, cg, &cg->last_id);
|
||||
if (entries_read != SCG_INVALID_ENTRIES_READ) {
|
||||
/* A valid counter was obtained. */
|
||||
lag = (long long)s->entries_added - entries_read;
|
||||
valid = 1;
|
||||
} else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s, &cg->last_id, NULL)) {
|
||||
/* No fragmentation ahead means that the group's logical reads counter
|
||||
* is valid for performing the lag calculation. */
|
||||
lag = (long long)s->entries_added - cg->entries_read;
|
||||
valid = 1;
|
||||
} else {
|
||||
/* Attempt to retrieve the group's last ID logical read counter. */
|
||||
long long entries_read = streamEstimateDistanceFromFirstEverEntry(s, &cg->last_id);
|
||||
if (entries_read != SCG_INVALID_ENTRIES_READ) {
|
||||
/* A valid counter was obtained. */
|
||||
lag = (long long)s->entries_added - entries_read;
|
||||
valid = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (valid) {
|
||||
/* Read counter of the last delivered ID */
|
||||
addReplyBulkCString(c, "entries-read");
|
||||
addReplyLongLong(c, entries_read);
|
||||
/* Group lag */
|
||||
addReplyBulkCString(c, "lag");
|
||||
addReplyLongLong(c, lag);
|
||||
} else {
|
||||
addReplyBulkCString(c, "entries-read");
|
||||
addReplyNull(c);
|
||||
addReplyBulkCString(c, "lag");
|
||||
addReplyNull(c);
|
||||
}
|
||||
}
|
||||
|
||||
/* The function returns the logical read counter corresponding to next_id
|
||||
* based on the information of the group.
|
||||
*/
|
||||
static long long streamEstimateDistance(stream *s, streamCG *cg, streamID *next_id) {
|
||||
/* If the values of next_id and last_id are the same,
|
||||
* it is considered that only the current value needs to be returned,
|
||||
* otherwise it is considered to be the calculated value.
|
||||
* This is used to align with the streamEstimateDistanceFromFirstEverEntry method.
|
||||
*/
|
||||
long long step = streamCompareID(&cg->last_id, next_id) == 0 ? 0 : 1;
|
||||
if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s, &cg->last_id, NULL)) {
|
||||
/* A valid counter and no future tombstones mean we can
|
||||
* increment the read counter to keep tracking the group's
|
||||
* progress. */
|
||||
return cg->entries_read + step;
|
||||
}
|
||||
return streamEstimateDistanceFromFirstEverEntry(s, next_id);
|
||||
}
|
||||
|
||||
/* This function returns a value that is the ID's logical read counter, or its
|
||||
* distance (the number of entries) from the first entry ever to have been added
|
||||
* to the stream.
|
||||
|
|
@ -1485,11 +1497,6 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
|
|||
return s->entries_added;
|
||||
}
|
||||
|
||||
if (!streamIDEqZero(id) && streamCompareID(id, &s->max_deleted_entry_id) < 0) {
|
||||
/* The ID is before the last tombstone, so the counter is unknown. */
|
||||
return SCG_INVALID_ENTRIES_READ;
|
||||
}
|
||||
|
||||
int cmp_last = streamCompareID(id, &s->last_id);
|
||||
if (cmp_last == 0) {
|
||||
/* Return the exact counter of the last entry in the stream. */
|
||||
|
|
@ -1678,16 +1685,7 @@ size_t streamReplyWithRange(client *c,
|
|||
while (streamIteratorGetID(&si, &id, &numfields)) {
|
||||
/* Update the group last_id if needed. */
|
||||
if (group && streamCompareID(&id, &group->last_id) > 0) {
|
||||
if (group->entries_read != SCG_INVALID_ENTRIES_READ &&
|
||||
streamCompareID(&group->last_id, &s->first_id) >= 0 &&
|
||||
!streamRangeHasTombstones(s, &group->last_id, NULL)) {
|
||||
/* A valid counter and no tombstones in the group's last-delivered-id and the stream's last-generated-id,
|
||||
* we can increment the read counter to keep tracking the group's progress. */
|
||||
group->entries_read++;
|
||||
} else if (s->entries_added) {
|
||||
/* The group's counter may be invalid, so we try to obtain it. */
|
||||
group->entries_read = streamEstimateDistanceFromFirstEverEntry(s, &id);
|
||||
}
|
||||
group->entries_read = streamEstimateDistance(s, group, &id);
|
||||
group->last_id = id;
|
||||
/* In the past, we would only set it when NOACK was specified. And in
|
||||
* #9127, XCLAIM did not propagate entries_read in ACK, which would
|
||||
|
|
@ -3691,16 +3689,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
|
|||
addReplyStreamID(c, &cg->last_id);
|
||||
|
||||
/* Read counter of the last delivered ID */
|
||||
addReplyBulkCString(c, "entries-read");
|
||||
if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
|
||||
addReplyLongLong(c, cg->entries_read);
|
||||
} else {
|
||||
addReplyNull(c);
|
||||
}
|
||||
|
||||
/* Group lag */
|
||||
addReplyBulkCString(c, "lag");
|
||||
streamReplyWithCGLag(c, s, cg);
|
||||
streamReplyWithCGLagAndEntriesRead(c, s, cg);
|
||||
|
||||
/* Group PEL count */
|
||||
addReplyBulkCString(c, "pel-count");
|
||||
|
|
@ -3887,14 +3876,7 @@ void xinfoCommand(client *c) {
|
|||
addReplyLongLong(c, raxSize(cg->pel));
|
||||
addReplyBulkCString(c, "last-delivered-id");
|
||||
addReplyStreamID(c, &cg->last_id);
|
||||
addReplyBulkCString(c, "entries-read");
|
||||
if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
|
||||
addReplyLongLong(c, cg->entries_read);
|
||||
} else {
|
||||
addReplyNull(c);
|
||||
}
|
||||
addReplyBulkCString(c, "lag");
|
||||
streamReplyWithCGLag(c, s, cg);
|
||||
streamReplyWithCGLagAndEntriesRead(c, s, cg);
|
||||
}
|
||||
raxStop(&ri);
|
||||
} else if (!strcasecmp(opt, "STREAM")) {
|
||||
|
|
|
|||
|
|
@ -1065,7 +1065,7 @@ start_server {
|
|||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $reply max-deleted-entry-id] "0-0"
|
||||
assert_equal [dict get $reply entries-added] 0
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group entries-read] 0
|
||||
assert_equal [dict get $group lag] 0
|
||||
|
||||
r XADD x 1-0 data a
|
||||
|
|
@ -1075,7 +1075,7 @@ start_server {
|
|||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $reply max-deleted-entry-id] "1-0"
|
||||
assert_equal [dict get $reply entries-added] 1
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group entries-read] 1
|
||||
assert_equal [dict get $group lag] 0
|
||||
}
|
||||
|
||||
|
|
@ -1090,7 +1090,7 @@ start_server {
|
|||
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group entries-read] 0
|
||||
assert_equal [dict get $group lag] 5
|
||||
|
||||
r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
|
||||
|
|
@ -1121,7 +1121,7 @@ start_server {
|
|||
# so the lag can't be calculated
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 8
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group lag] {}
|
||||
|
||||
r XREADGROUP GROUP g1 c12 COUNT 1 STREAMS x >
|
||||
|
|
@ -1183,7 +1183,7 @@ start_server {
|
|||
assert_equal [dict get $group entries-read] 5
|
||||
assert_equal [dict get $group lag] 1
|
||||
set group [lindex [dict get $reply groups] 1]
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group entries-read] 3
|
||||
assert_equal [dict get $group lag] 3
|
||||
|
||||
r XTRIM x MINID = 5-0
|
||||
|
|
@ -1192,7 +1192,7 @@ start_server {
|
|||
assert_equal [dict get $group entries-read] 5
|
||||
assert_equal [dict get $group lag] 1
|
||||
set group [lindex [dict get $reply groups] 1]
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group entries-read] 4
|
||||
assert_equal [dict get $group lag] 2
|
||||
}
|
||||
|
||||
|
|
@ -1249,17 +1249,17 @@ start_server {
|
|||
assert_equal [dict get $group entries-read] 5
|
||||
assert_equal [dict get $group lag] 2
|
||||
set group [lindex [dict get $reply groups] 1]
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group entries-read] 6
|
||||
assert_equal [dict get $group lag] 1
|
||||
|
||||
r XREADGROUP GROUP g1 c11 STREAMS x >
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 7
|
||||
assert_equal [dict get $group lag] 0
|
||||
assert_equal [dict get $group entries-read] 6
|
||||
assert_equal [dict get $group lag] 1
|
||||
}
|
||||
|
||||
test {Consumer group lag with XADD trimming} {
|
||||
test {Consumer group lag with XADD trimming} {
|
||||
r DEL x
|
||||
r XADD x 1-0 data a
|
||||
r XADD x 2-0 data b
|
||||
|
|
@ -1305,6 +1305,9 @@ start_server {
|
|||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 5
|
||||
assert_equal [dict get $group lag] 2
|
||||
set group [lindex [dict get $reply groups] 1]
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group lag] {}
|
||||
|
||||
r XADD x MINID = 7-0 8-0 data h
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
|
|
@ -1312,16 +1315,96 @@ start_server {
|
|||
assert_equal [dict get $group entries-read] 5
|
||||
assert_equal [dict get $group lag] 3
|
||||
set group [lindex [dict get $reply groups] 1]
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group entries-read] 6
|
||||
assert_equal [dict get $group lag] 2
|
||||
|
||||
r XREADGROUP GROUP g1 c11 STREAMS x >
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 8
|
||||
assert_equal [dict get $group entries-read] 7
|
||||
assert_equal [dict get $group lag] 1
|
||||
}
|
||||
|
||||
test {Consumer group lag with with tombstone} {
|
||||
r DEL x
|
||||
r XGROUP CREATE x processing $ MKSTREAM
|
||||
r XADD x 0-1 name Mercury
|
||||
r XREADGROUP GROUP processing alice STREAMS x >
|
||||
r XADD x 0-2 name Venus
|
||||
r XADD x 0-3 name Earth
|
||||
r XADD x 0-4 name Jupiter
|
||||
r XADD x 0-5 name Jupiter
|
||||
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 1
|
||||
assert_equal [dict get $group lag] 4
|
||||
|
||||
r XDEL x 0-1
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 1
|
||||
assert_equal [dict get $group lag] 4
|
||||
|
||||
r XDEL x 0-2
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 2
|
||||
assert_equal [dict get $group lag] 3
|
||||
|
||||
r XDEL x 0-3
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 3
|
||||
assert_equal [dict get $group lag] 2
|
||||
|
||||
r XDEL x 0-4
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 4
|
||||
assert_equal [dict get $group lag] 1
|
||||
|
||||
r XDEL x 0-5
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 5
|
||||
assert_equal [dict get $group lag] 0
|
||||
}
|
||||
|
||||
test {Consumer group check lag and entries-read consistency} {
|
||||
r DEL x
|
||||
r XGROUP CREATE x processing $ MKSTREAM
|
||||
r XGROUP CREATE x processing1 $ MKSTREAM
|
||||
r XADD x 0-1 name Mercury
|
||||
r XADD x 0-2 name Venus
|
||||
r XADD x 0-3 name Earth
|
||||
r XADD x 0-4 name Jupiter
|
||||
|
||||
r XREADGROUP GROUP processing alice COUNT 2 STREAMS x >
|
||||
r XDEL x 0-3
|
||||
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group lag] {}
|
||||
|
||||
r DEL x
|
||||
r XGROUP CREATE x processing $ MKSTREAM
|
||||
r XGROUP CREATE x processing1 $ MKSTREAM
|
||||
r XADD x 0-1 name Mercury
|
||||
r XADD x 0-2 name Venus
|
||||
r XADD x 0-3 name Earth
|
||||
r XADD x 0-4 name Jupiter
|
||||
|
||||
r XDEL x 0-3
|
||||
r XREADGROUP GROUP processing alice COUNT 2 STREAMS x >
|
||||
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] {}
|
||||
assert_equal [dict get $group lag] {}
|
||||
}
|
||||
|
||||
test {Loading from legacy (Redis <= v6.2.x, rdb_ver < 10) persistence} {
|
||||
# The payload was DUMPed from a v5 instance after:
|
||||
# XADD x 1-0 data a
|
||||
|
|
|
|||
Loading…
Reference in New Issue