mirror of https://github.com/valkey-io/valkey
Merge dee4e7b5b6 into 6b60e6bfc7
This commit is contained in:
commit
57e0d16235
|
|
@ -401,7 +401,7 @@ void migrateCloseSocket(robj *host, robj *port) {
|
|||
sdsfree(name);
|
||||
}
|
||||
|
||||
void migrateCloseTimedoutSockets(void) {
|
||||
void migrateCloseTimedOutSockets(void) {
|
||||
dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
|
||||
dictEntry *de;
|
||||
|
||||
|
|
|
|||
|
|
@ -136,7 +136,7 @@ int clusterSlotByCommand(struct serverCommand *cmd, robj **argv, int argc, int *
|
|||
clusterNode *getNodeByQuery(client *c, int *error_code);
|
||||
int clusterRedirectBlockedClientIfNeeded(client *c);
|
||||
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
|
||||
void migrateCloseTimedoutSockets(void);
|
||||
void migrateCloseTimedOutSockets(void);
|
||||
unsigned int keyHashSlot(char *key, int keylen);
|
||||
int patternHashSlot(char *pattern, int length);
|
||||
int isValidAuxString(char *s, unsigned int length);
|
||||
|
|
|
|||
|
|
@ -3087,7 +3087,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
|
|||
/* A primary reason why we are here is likely due to my primary crashing during the
|
||||
* slot finalization process, leading me to become the new primary without
|
||||
* inheriting the slot ownership, while the source shard continued and relinquished
|
||||
* theslot to its old primary. Under such circumstances, the node would undergo
|
||||
* the slot to its old primary. Under such circumstances, the node would undergo
|
||||
* an election and have its config epoch increased with consensus. That said, we
|
||||
* will still explicitly bump the config epoch here to be consistent with the
|
||||
* existing practice.
|
||||
|
|
|
|||
|
|
@ -165,7 +165,7 @@ int geohashBoundingBox(GeoShape *shape, double *bounds) {
|
|||
|
||||
/* Calculate a set of areas (center + 8) that are able to cover a range query
|
||||
* for the specified position and shape (see geohash.h GeoShape).
|
||||
* the bounding box saved in shaple.bounds */
|
||||
* the bounding box saved in shape.bounds */
|
||||
GeoHashRadius geohashCalculateAreasByShapeWGS84(GeoShape *shape) {
|
||||
GeoHashRange long_range, lat_range;
|
||||
GeoHashRadius radius;
|
||||
|
|
|
|||
|
|
@ -629,7 +629,7 @@ void latencyCommandReplyWithLatestEvents(client *c) {
|
|||
}
|
||||
|
||||
#define LATENCY_GRAPH_COLS 80
|
||||
sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts) {
|
||||
sds latencyCommandGenSparkline(char *event, struct latencyTimeSeries *ts) {
|
||||
int j;
|
||||
struct sequence *seq = createSparklineSequence();
|
||||
sds graph = sdsempty();
|
||||
|
|
@ -703,7 +703,7 @@ void latencyCommand(client *c) {
|
|||
ts = dictGetVal(de);
|
||||
event = dictGetKey(de);
|
||||
|
||||
graph = latencyCommandGenSparkeline(event, ts);
|
||||
graph = latencyCommandGenSparkline(event, ts);
|
||||
addReplyVerbatim(c, graph, sdslen(graph), "txt");
|
||||
sdsfree(graph);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr, "latest") && c->argc == 2) {
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ static void freeCompiledFunc(lua_State *lua,
|
|||
|
||||
/*
|
||||
* Compile a given script code by generating a set of compiled functions. These
|
||||
* functions are also saved into the the registry of the Lua environment.
|
||||
* functions are also saved into the registry of the Lua environment.
|
||||
*
|
||||
* Returns an array of compiled functions. The `compileFunction` struct stores a
|
||||
* Lua ref that allows to later retrieve the function from the registry.
|
||||
|
|
|
|||
|
|
@ -112,7 +112,7 @@ int HelloACL_Reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
|
|||
int HelloACL_Timeout(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
VALKEYMODULE_NOT_USED(argc);
|
||||
return ValkeyModule_ReplyWithSimpleString(ctx, "Request timedout");
|
||||
return ValkeyModule_ReplyWithSimpleString(ctx, "Request timed out");
|
||||
}
|
||||
|
||||
/* Private data frees data for HELLOACL.AUTHASYNC command. */
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ int HelloBlock_Reply(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc)
|
|||
int HelloBlock_Timeout(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
VALKEYMODULE_NOT_USED(argc);
|
||||
return ValkeyModule_ReplyWithSimpleString(ctx, "Request timedout");
|
||||
return ValkeyModule_ReplyWithSimpleString(ctx, "Request timed out");
|
||||
}
|
||||
|
||||
/* Private data freeing callback for HELLO.BLOCK command. */
|
||||
|
|
|
|||
|
|
@ -5292,7 +5292,7 @@ void clientUnblockCommand(client *c) {
|
|||
/* Note that we never try to unblock a client blocked on a module command,
|
||||
* or a client blocked by CLIENT PAUSE or some other blocking type which
|
||||
* doesn't have a timeout callback (even in the case of UNBLOCK ERROR).
|
||||
* The reason is that we assume that if a command doesn't expect to be timedout,
|
||||
* The reason is that we assume that if a command doesn't expect to be timed out,
|
||||
* it also doesn't expect to be unblocked by CLIENT UNBLOCK */
|
||||
if (target && target->flag.blocked && blockedClientMayTimeout(target)) {
|
||||
if (unblock_error)
|
||||
|
|
|
|||
|
|
@ -838,7 +838,7 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **
|
|||
size_t oldalloc = rax_ptr_alloc_size(h);
|
||||
|
||||
/* If this node is going to have a single child, and there
|
||||
* are other characters, so that that would result in a chain
|
||||
* are other characters, so that would result in a chain
|
||||
* of single-childed nodes, turn it into a compressed node. */
|
||||
if (h->size == 0 && len - i > 1) {
|
||||
debugf("Inserting compressed node\n");
|
||||
|
|
@ -1045,7 +1045,7 @@ int raxRemove(rax *rax, unsigned char *s, size_t len, void **old) {
|
|||
rax_free(child);
|
||||
rax->numnodes--;
|
||||
h = raxStackPop(&ts);
|
||||
/* If this node has more then one child, or actually holds
|
||||
/* If this node has more than one child, or actually holds
|
||||
* a key, stop here. */
|
||||
if (h->iskey || (!h->iscompr && h->size != 1)) break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5231,7 +5231,7 @@ void replicationCron(void) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Disconnect timedout replicas. */
|
||||
/* Disconnect timed out replicas. */
|
||||
if (listLength(server.replicas)) {
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
|
|
|||
20
src/script.c
20
src/script.c
|
|
@ -45,24 +45,24 @@ scriptFlag scripts_flags_def[] = {
|
|||
/* On script invocation, holding the current run context */
|
||||
static scriptRunCtx *curr_run_ctx = NULL;
|
||||
|
||||
static void exitScriptTimedoutMode(scriptRunCtx *run_ctx) {
|
||||
static void exitScriptTimedOutMode(scriptRunCtx *run_ctx) {
|
||||
serverAssert(run_ctx == curr_run_ctx);
|
||||
serverAssert(scriptIsTimedout());
|
||||
serverAssert(scriptIsTimedOut());
|
||||
run_ctx->flags &= ~SCRIPT_TIMEDOUT;
|
||||
blockingOperationEnds();
|
||||
/* if we are a replica and we have an active primary, set it for continue processing */
|
||||
if (server.primary_host && server.primary) queueClientForReprocessing(server.primary);
|
||||
}
|
||||
|
||||
static void enterScriptTimedoutMode(scriptRunCtx *run_ctx) {
|
||||
static void enterScriptTimedOutMode(scriptRunCtx *run_ctx) {
|
||||
serverAssert(run_ctx == curr_run_ctx);
|
||||
serverAssert(!scriptIsTimedout());
|
||||
/* Mark script as timedout */
|
||||
serverAssert(!scriptIsTimedOut());
|
||||
/* Mark script as timed out */
|
||||
run_ctx->flags |= SCRIPT_TIMEDOUT;
|
||||
blockingOperationStarts();
|
||||
}
|
||||
|
||||
int scriptIsTimedout(void) {
|
||||
int scriptIsTimedOut(void) {
|
||||
return scriptIsRunning() && (curr_run_ctx->flags & SCRIPT_TIMEDOUT);
|
||||
}
|
||||
|
||||
|
|
@ -81,7 +81,7 @@ client *scriptGetCaller(void) {
|
|||
* and also check if the run should be terminated. */
|
||||
int scriptInterrupt(scriptRunCtx *run_ctx) {
|
||||
if (run_ctx->flags & SCRIPT_TIMEDOUT) {
|
||||
/* script already timedout
|
||||
/* script already timed out
|
||||
we just need to precess some events and return */
|
||||
processEventsWhileBlocked();
|
||||
return (run_ctx->flags & SCRIPT_KILLED) ? SCRIPT_KILL : SCRIPT_CONTINUE;
|
||||
|
|
@ -97,7 +97,7 @@ int scriptInterrupt(scriptRunCtx *run_ctx) {
|
|||
"You can try killing the script using the %s command. Script name is: %s.",
|
||||
elapsed, (run_ctx->flags & SCRIPT_EVAL_MODE) ? "SCRIPT KILL" : "FUNCTION KILL", run_ctx->funcname);
|
||||
|
||||
enterScriptTimedoutMode(run_ctx);
|
||||
enterScriptTimedOutMode(run_ctx);
|
||||
/* Once the script timeouts we reenter the event loop to permit others
|
||||
* some commands execution. For this reason
|
||||
* we need to mask the client executing the script from the event loop.
|
||||
|
|
@ -267,8 +267,8 @@ void scriptResetRun(scriptRunCtx *run_ctx) {
|
|||
/* After the script done, remove the MULTI state. */
|
||||
run_ctx->c->flag.multi = 0;
|
||||
|
||||
if (scriptIsTimedout()) {
|
||||
exitScriptTimedoutMode(run_ctx);
|
||||
if (scriptIsTimedOut()) {
|
||||
exitScriptTimedOutMode(run_ctx);
|
||||
/* Restore the client that was protected when the script timeout
|
||||
* was detected. */
|
||||
unprotectClient(run_ctx->original_client);
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@
|
|||
|
||||
/* runCtx flags */
|
||||
#define SCRIPT_WRITE_DIRTY (1ULL << 0) /* indicate that the current script already performed a write command */
|
||||
#define SCRIPT_TIMEDOUT (1ULL << 3) /* indicate that the current script timedout */
|
||||
#define SCRIPT_TIMEDOUT (1ULL << 3) /* indicate that the current script timed out */
|
||||
#define SCRIPT_KILLED (1ULL << 4) /* indicate that the current script was marked to be killed */
|
||||
#define SCRIPT_READ_ONLY (1ULL << 5) /* indicate that the current script should only perform read commands */
|
||||
#define SCRIPT_ALLOW_OOM (1ULL << 6) /* indicate to allow any command even if OOM reached */
|
||||
|
|
@ -114,7 +114,7 @@ void scriptKill(client *c, int is_eval);
|
|||
int scriptIsRunning(void);
|
||||
const char *scriptCurrFunction(void);
|
||||
int scriptIsEval(void);
|
||||
int scriptIsTimedout(void);
|
||||
int scriptIsTimedOut(void);
|
||||
client *scriptGetClient(void);
|
||||
client *scriptGetCaller(void);
|
||||
long long scriptRunDuration(void);
|
||||
|
|
|
|||
|
|
@ -273,7 +273,7 @@ struct sentinelState {
|
|||
int tilt; /* Are we in TILT mode? */
|
||||
int total_tilt; /* Number of tilt. */
|
||||
int running_scripts; /* Number of scripts in execution right now. */
|
||||
mstime_t tilt_start_time; /* When TITL started. */
|
||||
mstime_t tilt_start_time; /* When TILT started. */
|
||||
mstime_t previous_time; /* Last time we ran the time handler. */
|
||||
list *scripts_queue; /* Queue of user scripts to execute. */
|
||||
char *announce_ip; /* IP addr that is gossiped to other sentinels if
|
||||
|
|
@ -920,7 +920,7 @@ void sentinelCollectTerminatedScripts(void) {
|
|||
|
||||
/* Kill scripts in timeout, they'll be collected by the
|
||||
* sentinelCollectTerminatedScripts() function. */
|
||||
void sentinelKillTimedoutScripts(void) {
|
||||
void sentinelKillTimedOutScripts(void) {
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
mstime_t now = mstime();
|
||||
|
|
@ -5429,7 +5429,7 @@ void sentinelTimer(void) {
|
|||
sentinelHandleDictOfValkeyInstances(sentinel.primaries);
|
||||
sentinelRunPendingScripts();
|
||||
sentinelCollectTerminatedScripts();
|
||||
sentinelKillTimedoutScripts();
|
||||
sentinelKillTimedOutScripts();
|
||||
|
||||
/* We continuously change the frequency of the server "timer interrupt"
|
||||
* in order to desynchronize every Sentinel from every other.
|
||||
|
|
|
|||
|
|
@ -855,7 +855,7 @@ int isMutuallyExclusiveChildType(int type) {
|
|||
|
||||
/* Returns true when we're inside a long command that yielded to the event loop. */
|
||||
int isInsideYieldingLongCommand(void) {
|
||||
return scriptIsTimedout() || server.busy_module_yield_flags;
|
||||
return scriptIsTimedOut() || server.busy_module_yield_flags;
|
||||
}
|
||||
|
||||
/* Return true if this instance has persistence completely turned off:
|
||||
|
|
@ -1653,7 +1653,7 @@ long long serverCron(struct aeEventLoop *eventLoop, long long id, void *clientDa
|
|||
|
||||
/* Cleanup expired MIGRATE cached sockets. */
|
||||
run_with_period(1000) {
|
||||
migrateCloseTimedoutSockets();
|
||||
migrateCloseTimedOutSockets();
|
||||
}
|
||||
|
||||
/* Resize tracking keys table if needed. This is also done at every
|
||||
|
|
@ -4138,10 +4138,10 @@ void unprepareCommand(client *c) {
|
|||
* other operations can be performed by the caller. Otherwise
|
||||
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
|
||||
int processCommand(client *c) {
|
||||
if (!scriptIsTimedout()) {
|
||||
if (!scriptIsTimedOut()) {
|
||||
/* Both EXEC and scripts call call() directly so there should be
|
||||
* no way in_exec or scriptIsRunning() is 1.
|
||||
* That is unless lua_timedout, in which case client may run
|
||||
* That is unless lua_timed out, in which case client may run
|
||||
* some commands. */
|
||||
serverAssert(!server.in_exec);
|
||||
serverAssert(!scriptIsRunning());
|
||||
|
|
|
|||
|
|
@ -2540,7 +2540,7 @@ typedef int serverGetKeysProc(struct serverCommand *cmd, robj **argv, int argc,
|
|||
* populateCommandLegacyRangeSpec.
|
||||
*
|
||||
* CMD_ALLOW_BUSY: The command can run while another command is running for
|
||||
* a long time (timedout script, module command that yields)
|
||||
* a long time (timed out script, module command that yields)
|
||||
*
|
||||
* CMD_MODULE_GETCHANNELS: Use the modules getchannels interface.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -389,7 +389,7 @@ int streamCompareID(streamID *a, streamID *b) {
|
|||
|
||||
/* Retrieves the ID of the stream edge entry. An edge is either the first or
|
||||
* the last ID in the stream, and may be a tombstone. To filter out tombstones,
|
||||
* set the'skip_tombstones' argument to 1. */
|
||||
* set the 'skip_tombstones' argument to 1. */
|
||||
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id) {
|
||||
streamIterator si;
|
||||
int64_t numfields;
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@
|
|||
|
||||
/* ========================== Clients timeouts ============================= */
|
||||
|
||||
/* Check if this blocked client timedout (does nothing if the client is
|
||||
/* Check if this blocked client timed out (does nothing if the client is
|
||||
* not blocked right now). If so send a reply, unblock it, and return 1.
|
||||
* Otherwise 0 is returned and no operation is performed. */
|
||||
int checkBlockedClientTimeout(client *c, mstime_t now) {
|
||||
|
|
|
|||
|
|
@ -222,7 +222,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
|||
set primary_port [srv 0 port]
|
||||
|
||||
$primary config set repl-diskless-sync yes
|
||||
# Set primary shared replication buffer size to a bit more then the size of
|
||||
# Set primary shared replication buffer size to a bit more than the size of
|
||||
# a replication buffer block.
|
||||
$primary config set client-output-buffer-limit "replica 1100k 0 0"
|
||||
$primary config set dual-channel-replication-enabled $enable
|
||||
|
|
|
|||
|
|
@ -428,7 +428,7 @@ start_cluster 3 3 {tags {external:skip cluster} overrides {cluster-allow-replica
|
|||
# Pause the replica to simulate a failure
|
||||
pause_process [srv -3 pid]
|
||||
|
||||
# Setslot with an explicit 1ms timeoout
|
||||
# Setslot with an explicit 1ms timeout
|
||||
set start_time [clock milliseconds]
|
||||
catch {R 0 CLUSTER SETSLOT 609 MIGRATING $R1_id TIMEOUT 3000} e
|
||||
set end_time [clock milliseconds]
|
||||
|
|
|
|||
|
|
@ -474,8 +474,8 @@ start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-en
|
|||
set slot [R 0 cluster keyslot $channel]
|
||||
set primary [Rn 0]
|
||||
set replica [Rn 1]
|
||||
set replica_subcriber [valkey_deferring_client -1]
|
||||
$replica_subcriber SSUBSCRIBE $channel
|
||||
set replica_subscriber [valkey_deferring_client -1]
|
||||
$replica_subscriber SSUBSCRIBE $channel
|
||||
# *2\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n --> 34 bytes.
|
||||
$primary SPUBLISH $channel hello
|
||||
# *3\r\n$8\r\nspublish\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
|
||||
|
|
|
|||
|
|
@ -1275,7 +1275,7 @@ start_server {tags {"scripting external:skip large-memory"}} {
|
|||
# Start a new server since the last test in this stanza will kill the
|
||||
# instance at all.
|
||||
start_server {tags {"scripting"}} {
|
||||
test {Timedout read-only scripts can be killed by SCRIPT KILL} {
|
||||
test {Timed out read-only scripts can be killed by SCRIPT KILL} {
|
||||
set rd [valkey_deferring_client]
|
||||
r config set lua-time-limit 10
|
||||
run_script_on_connection $rd {while true do end} 0
|
||||
|
|
@ -1288,7 +1288,7 @@ start_server {tags {"scripting"}} {
|
|||
$rd close
|
||||
}
|
||||
|
||||
test {Timedout read-only scripts can be killed by SCRIPT KILL even when use pcall} {
|
||||
test {Timed out read-only scripts can be killed by SCRIPT KILL even when use pcall} {
|
||||
set rd [valkey_deferring_client]
|
||||
r config set lua-time-limit 10
|
||||
run_script_on_connection $rd {local f = function() while 1 do redis.call('ping') end end while 1 do pcall(f) end} 0
|
||||
|
|
@ -1316,7 +1316,7 @@ start_server {tags {"scripting"}} {
|
|||
assert_match {*killed by user*} $res
|
||||
}
|
||||
|
||||
test {Timedout script does not cause a false dead client} {
|
||||
test {Timed out script does not cause a false dead client} {
|
||||
set rd [valkey_deferring_client]
|
||||
r config set lua-time-limit 10
|
||||
|
||||
|
|
@ -1364,13 +1364,13 @@ start_server {tags {"scripting"}} {
|
|||
$rd close
|
||||
}
|
||||
|
||||
test {Timedout script link is still usable after Lua returns} {
|
||||
test {Timed out script link is still usable after Lua returns} {
|
||||
r config set lua-time-limit 10
|
||||
run_script {for i=1,100000 do redis.call('ping') end return 'ok'} 0
|
||||
r ping
|
||||
} {PONG}
|
||||
|
||||
test {Timedout scripts and unblocked command} {
|
||||
test {Timed out scripts and unblocked command} {
|
||||
# make sure a command that's allowed during BUSY doesn't trigger an unblocked command
|
||||
|
||||
# enable AOF to also expose an assertion if the bug would happen
|
||||
|
|
@ -1418,7 +1418,7 @@ start_server {tags {"scripting"}} {
|
|||
r DEBUG set-disable-deny-scripts 0
|
||||
} {OK} {external:skip needs:debug}
|
||||
|
||||
test {Timedout scripts that modified data can't be killed by SCRIPT KILL} {
|
||||
test {Timed out scripts that modified data can't be killed by SCRIPT KILL} {
|
||||
set rd [valkey_deferring_client]
|
||||
r config set lua-time-limit 10
|
||||
run_script_on_connection $rd {redis.call('set',KEYS[1],'y'); while true do end} 1 x
|
||||
|
|
@ -1433,7 +1433,7 @@ start_server {tags {"scripting"}} {
|
|||
|
||||
# Note: keep this test at the end of this server stanza because it
|
||||
# kills the server.
|
||||
test {SHUTDOWN NOSAVE can kill a timedout script anyway} {
|
||||
test {SHUTDOWN NOSAVE can kill a timed out script anyway} {
|
||||
# The server should be still unresponding to normal commands.
|
||||
catch {r ping} e
|
||||
assert_match {BUSY*} $e
|
||||
|
|
|
|||
Loading…
Reference in New Issue