mirror of https://github.com/valkey-io/valkey
FUNCTION FLUSH re-create lua VM, fix flush not gc, fix flush async + load crash (#1826)
There will be two issues in this test:
```
test {FUNCTION - test function flush} {
for {set i 0} {$i < 10000} {incr i} {
r function load [get_function_code LUA test_$i test_$i {return 'hello'}]
}
set before_flush_memory [s used_memory_vm_functions]
r function flush sync
set after_flush_memory [s used_memory_vm_functions]
puts "flush sync, before_flush_memory: $before_flush_memory, after_flush_memory: $after_flush_memory"
for {set i 0} {$i < 10000} {incr i} {
r function load [get_function_code LUA test_$i test_$i {return 'hello'}]
}
set before_flush_memory [s used_memory_vm_functions]
r function flush async
set after_flush_memory [s used_memory_vm_functions]
puts "flush async, before_flush_memory: $before_flush_memory, after_flush_memory: $after_flush_memory"
for {set i 0} {$i < 10000} {incr i} {
r function load [get_function_code LUA test_$i test_$i {return 'hello'}]
}
puts "Test done"
}
```
The first one is the test output, we can see that after executing
FUNCTION FLUSH,
used_memory_vm_functions has not changed at all:
```
flush sync, before_flush_memory: 2962432, after_flush_memory: 2962432
flush async, before_flush_memory: 4504576, after_flush_memory: 4504576
```
The second one is there is a crash when loading the functions during the
async
flush:
```
=== VALKEY BUG REPORT START: Cut & paste starting from here ===
# valkey 255.255.255 crashed by signal: 11, si_code: 2
# Accessing address: 0xe0429b7100000a3c
# Crashed running the instruction at: 0x102e0b09c
------ STACK TRACE ------
EIP:
0 valkey-server 0x0000000102e0b09c luaH_getstr + 52
Backtrace:
0 libsystem_platform.dylib 0x000000018b066584 _sigtramp + 56
1 valkey-server 0x0000000102e01054 luaD_precall + 96
2 valkey-server 0x0000000102e01b10 luaD_call + 104
3 valkey-server 0x0000000102e00d1c luaD_rawrunprotected + 76
4 valkey-server 0x0000000102e01e3c luaD_pcall + 60
5 valkey-server 0x0000000102dfc630 lua_pcall + 300
6 valkey-server 0x0000000102f77770 luaEngineCompileCode + 708
7 valkey-server 0x0000000102f71f50 scriptingEngineCallCompileCode + 104
8 valkey-server 0x0000000102f700b0 functionsCreateWithLibraryCtx + 2088
9 valkey-server 0x0000000102f70898 functionLoadCommand + 312
10 valkey-server 0x0000000102e3978c call + 416
11 valkey-server 0x0000000102e3b5b8 processCommand + 3340
12 valkey-server 0x0000000102e563cc processInputBuffer + 520
13 valkey-server 0x0000000102e55808 readQueryFromClient + 92
14 valkey-server 0x0000000102f696e0 connSocketEventHandler + 180
15 valkey-server 0x0000000102e20480 aeProcessEvents + 372
16 valkey-server 0x0000000102e4aad0 main + 26412
17 dyld 0x000000018acab154 start + 2476
------ STACK TRACE DONE ------
```
The reason is that, in the old implementation (introduced in 7.0),
FUNCTION FLUSH
use lua_unref to remove the script from lua VM. lua_unref does not
trigger the gc,
it causes us to not be able to effectively reclaim memory after the
FUNCTION FLUSH.
The other issue is that, since we don't re-create the lua VM in FUNCTION
FLUSH,
loading the functions during a FUNCTION FLUSH ASYNC will result a crash
because
lua engine state is not thread-safe.
The correct solution is to re-create a new Lua VM to use, just like
SCRIPT FLUSH.
---------
Signed-off-by: Binbin <binloveplay1314@qq.com>
Signed-off-by: Ricardo Dias <ricardo.dias@percona.com>
Co-authored-by: Ricardo Dias <ricardo.dias@percona.com>
This commit is contained in:
parent
95154feaa1
commit
b4c93cc9c2
2
src/db.c
2
src/db.c
|
|
@ -687,7 +687,7 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) {
|
|||
if (with_functions) {
|
||||
serverAssert(dbnum == -1);
|
||||
/* TODO: fix this callback incompatibility. The arg is not used. */
|
||||
functionsLibCtxClearCurrent(async, (void (*)(dict *))callback);
|
||||
functionReset(async, (void (*)(dict *))callback);
|
||||
}
|
||||
|
||||
/* Also fire the end event. Note that this event will fire almost
|
||||
|
|
|
|||
|
|
@ -146,9 +146,9 @@ void freeEvalScripts(dict *scripts, list *scripts_lru_list, list *engine_callbac
|
|||
listIter *iter = listGetIterator(engine_callbacks, 0);
|
||||
listNode *node = NULL;
|
||||
while ((node = listNext(iter)) != NULL) {
|
||||
callableLazyEvalReset *callback = listNodeValue(node);
|
||||
callableLazyEnvReset *callback = listNodeValue(node);
|
||||
if (callback != NULL) {
|
||||
callback->engineLazyEvalResetCallback(callback->context);
|
||||
callback->engineLazyEnvResetCallback(callback->context);
|
||||
zfree(callback);
|
||||
}
|
||||
}
|
||||
|
|
@ -159,7 +159,7 @@ void freeEvalScripts(dict *scripts, list *scripts_lru_list, list *engine_callbac
|
|||
|
||||
static void resetEngineEvalEnvCallback(scriptingEngine *engine, void *context) {
|
||||
int async = context != NULL;
|
||||
callableLazyEvalReset *callback = scriptingEngineCallResetEvalEnvFunc(engine, async);
|
||||
callableLazyEnvReset *callback = scriptingEngineCallResetEnvFunc(engine, VMSE_EVAL, async);
|
||||
|
||||
if (async) {
|
||||
list *callbacks = context;
|
||||
|
|
@ -174,7 +174,6 @@ void evalRelease(int async) {
|
|||
list *engine_callbacks = listCreate();
|
||||
scriptingEngineManagerForEachEngine(resetEngineEvalEnvCallback, engine_callbacks);
|
||||
freeEvalScriptsAsync(evalCtx.scripts, evalCtx.scripts_lru_list, engine_callbacks);
|
||||
|
||||
} else {
|
||||
freeEvalScripts(evalCtx.scripts, evalCtx.scripts_lru_list, NULL);
|
||||
scriptingEngineManagerForEachEngine(resetEngineEvalEnvCallback, NULL);
|
||||
|
|
|
|||
|
|
@ -165,32 +165,62 @@ void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)) {
|
|||
lib_ctx->cache_memory = 0;
|
||||
}
|
||||
|
||||
void functionsLibCtxClearCurrent(int async, void(callback)(dict *)) {
|
||||
static void resetEngineOrCollectResetCallbacks(scriptingEngine *engine, void *context) {
|
||||
int async = context != NULL;
|
||||
callableLazyEnvReset *callback = scriptingEngineCallResetEnvFunc(engine, VMSE_FUNCTION, async);
|
||||
|
||||
if (async) {
|
||||
functionsLibCtx *old_l_ctx = curr_functions_lib_ctx;
|
||||
curr_functions_lib_ctx = functionsLibCtxCreate();
|
||||
freeFunctionsAsync(old_l_ctx);
|
||||
list *callbacks = context;
|
||||
listAddNodeTail(callbacks, callback);
|
||||
}
|
||||
}
|
||||
|
||||
void functionsLibCtxReleaseCurrent(int async, void(callback)(dict *)) {
|
||||
if (async) {
|
||||
list *engine_callbacks = listCreate();
|
||||
scriptingEngineManagerForEachEngine(resetEngineOrCollectResetCallbacks, engine_callbacks);
|
||||
freeFunctionsAsync(curr_functions_lib_ctx, engine_callbacks);
|
||||
} else {
|
||||
functionsLibCtxClear(curr_functions_lib_ctx, callback);
|
||||
functionsLibCtxFree(curr_functions_lib_ctx, callback, NULL);
|
||||
scriptingEngineManagerForEachEngine(resetEngineOrCollectResetCallbacks, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/* Free the given functions ctx */
|
||||
static void functionsLibCtxFreeGeneric(functionsLibCtx *functions_lib_ctx, int async) {
|
||||
if (async) {
|
||||
freeFunctionsAsync(functions_lib_ctx);
|
||||
freeFunctionsAsync(functions_lib_ctx, NULL);
|
||||
} else {
|
||||
functionsLibCtxFree(functions_lib_ctx);
|
||||
functionsLibCtxFree(functions_lib_ctx, NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
void functionReset(int async, void(callback)(dict *)) {
|
||||
functionsLibCtxReleaseCurrent(async, callback);
|
||||
functionsInit();
|
||||
}
|
||||
|
||||
/* Free the given functions ctx */
|
||||
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) {
|
||||
functionsLibCtxClear(functions_lib_ctx, NULL);
|
||||
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx, void(callback)(dict *), list *engine_callbacks) {
|
||||
functionsLibCtxClear(functions_lib_ctx, callback);
|
||||
dictRelease(functions_lib_ctx->functions);
|
||||
dictRelease(functions_lib_ctx->libraries);
|
||||
dictRelease(functions_lib_ctx->engines_stats);
|
||||
zfree(functions_lib_ctx);
|
||||
|
||||
if (engine_callbacks) {
|
||||
listIter *iter = listGetIterator(engine_callbacks, 0);
|
||||
listNode *node = NULL;
|
||||
while ((node = listNext(iter)) != NULL) {
|
||||
callableLazyEnvReset *engine_callback = listNodeValue(node);
|
||||
if (engine_callback != NULL) {
|
||||
engine_callback->engineLazyEnvResetCallback(engine_callback->context);
|
||||
zfree(engine_callback);
|
||||
}
|
||||
}
|
||||
listReleaseIterator(iter);
|
||||
listRelease(engine_callbacks);
|
||||
}
|
||||
}
|
||||
|
||||
/* Swap the current functions ctx with the given one.
|
||||
|
|
@ -824,7 +854,7 @@ void functionFlushCommand(client *c) {
|
|||
return;
|
||||
}
|
||||
|
||||
functionsLibCtxClearCurrent(async, NULL);
|
||||
functionReset(async, NULL);
|
||||
|
||||
/* Indicate that the command changed the data so it will be replicated and
|
||||
* counted as a data change (for persistence configuration) */
|
||||
|
|
|
|||
|
|
@ -90,10 +90,10 @@ dict *functionsLibGet(void);
|
|||
size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx);
|
||||
functionsLibCtx *functionsLibCtxGetCurrent(void);
|
||||
functionsLibCtx *functionsLibCtxCreate(void);
|
||||
void functionsLibCtxClearCurrent(int async, void(callback)(dict *));
|
||||
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx);
|
||||
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx, void(callback)(dict *), list *engine_callbacks);
|
||||
void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *));
|
||||
void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async);
|
||||
void functionReset(int async, void(callback)(dict *));
|
||||
|
||||
void functionsRemoveLibFromEngine(scriptingEngine *engine);
|
||||
|
||||
|
|
|
|||
|
|
@ -66,8 +66,9 @@ void lazyFreeEvalScripts(void *args[]) {
|
|||
/* Release the functions ctx. */
|
||||
void lazyFreeFunctionsCtx(void *args[]) {
|
||||
functionsLibCtx *functions_lib_ctx = args[0];
|
||||
list *engine_callbacks = args[1];
|
||||
size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx);
|
||||
functionsLibCtxFree(functions_lib_ctx);
|
||||
functionsLibCtxFree(functions_lib_ctx, NULL, engine_callbacks);
|
||||
atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed);
|
||||
atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed);
|
||||
}
|
||||
|
|
@ -239,13 +240,13 @@ void freeEvalScriptsAsync(dict *scripts, list *scripts_lru_list, list *engine_ca
|
|||
}
|
||||
|
||||
/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
|
||||
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) {
|
||||
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, list *engine_callbacks) {
|
||||
if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
|
||||
atomic_fetch_add_explicit(&lazyfree_objects, functionsLibCtxFunctionsLen(functions_lib_ctx),
|
||||
memory_order_relaxed);
|
||||
bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 1, functions_lib_ctx);
|
||||
bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 2, functions_lib_ctx, engine_callbacks);
|
||||
} else {
|
||||
functionsLibCtxFree(functions_lib_ctx);
|
||||
functionsLibCtxFree(functions_lib_ctx, NULL, engine_callbacks);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,11 +14,6 @@
|
|||
#define LUA_ENGINE_NAME "LUA"
|
||||
#define REGISTRY_ERROR_HANDLER_NAME "__ERROR_HANDLER__"
|
||||
|
||||
typedef struct luaFunction {
|
||||
lua_State *lua; /* Pointer to the lua context where this function was created. Only used in EVAL context. */
|
||||
int function_ref; /* Special ID that allows getting the Lua function object from the Lua registry */
|
||||
} luaFunction;
|
||||
|
||||
typedef struct luaEngineCtx {
|
||||
lua_State *eval_lua; /* The Lua interpreter for EVAL commands. We use just one for all EVAL calls */
|
||||
lua_State *function_lua; /* The Lua interpreter for FCALL commands. We use just one for all FCALL calls */
|
||||
|
|
@ -260,17 +255,9 @@ static void luaEngineFunctionCall(ValkeyModuleCtx *module_ctx,
|
|||
serverAssert(module_ctx == NULL);
|
||||
|
||||
luaEngineCtx *lua_engine_ctx = (luaEngineCtx *)engine_ctx;
|
||||
lua_State *lua = NULL;
|
||||
int lua_function_ref = -1;
|
||||
|
||||
if (type == VMSE_EVAL) {
|
||||
lua = lua_engine_ctx->eval_lua;
|
||||
luaFunction *script = compiled_function->function;
|
||||
lua_function_ref = script->function_ref;
|
||||
} else {
|
||||
lua = lua_engine_ctx->function_lua;
|
||||
lua_function_ref = luaFunctionGetLuaFunctionRef(compiled_function);
|
||||
}
|
||||
lua_State *lua = type == VMSE_EVAL ? lua_engine_ctx->eval_lua : lua_engine_ctx->function_lua;
|
||||
luaFunction *script = compiled_function->function;
|
||||
int lua_function_ref = script->function_ref;
|
||||
|
||||
/* Push the pcall error handler function on the stack. */
|
||||
lua_pushstring(lua, REGISTRY_ERROR_HANDLER_NAME);
|
||||
|
|
@ -290,10 +277,10 @@ static void luaEngineFunctionCall(ValkeyModuleCtx *module_ctx,
|
|||
lua_pop(lua, 1); /* Remove the error handler. */
|
||||
}
|
||||
|
||||
static void resetEvalContext(void *context) {
|
||||
lua_State *eval_lua = context;
|
||||
lua_gc(eval_lua, LUA_GCCOLLECT, 0);
|
||||
lua_close(eval_lua);
|
||||
static void resetLuaContext(void *context) {
|
||||
lua_State *lua = context;
|
||||
lua_gc(lua, LUA_GCCOLLECT, 0);
|
||||
lua_close(lua);
|
||||
|
||||
#if !defined(USE_LIBC)
|
||||
/* The lua interpreter may hold a lot of memory internally, and lua is
|
||||
|
|
@ -308,27 +295,30 @@ static void resetEvalContext(void *context) {
|
|||
#endif
|
||||
}
|
||||
|
||||
static callableLazyEvalReset *luaEngineResetEvalEnv(ValkeyModuleCtx *module_ctx,
|
||||
engineCtx *engine_ctx,
|
||||
int async) {
|
||||
static callableLazyEnvReset *luaEngineResetEvalEnv(ValkeyModuleCtx *module_ctx,
|
||||
engineCtx *engine_ctx,
|
||||
subsystemType type,
|
||||
int async) {
|
||||
/* The lua engine is implemented in the core, and not in a Valkey Module */
|
||||
serverAssert(module_ctx == NULL);
|
||||
|
||||
luaEngineCtx *lua_engine_ctx = (luaEngineCtx *)engine_ctx;
|
||||
serverAssert(lua_engine_ctx->eval_lua);
|
||||
callableLazyEvalReset *callback = NULL;
|
||||
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);
|
||||
lua_State *lua = type == VMSE_EVAL ? lua_engine_ctx->eval_lua : lua_engine_ctx->function_lua;
|
||||
serverAssert(lua);
|
||||
callableLazyEnvReset *callback = NULL;
|
||||
|
||||
if (async) {
|
||||
callback = zcalloc(sizeof(*callback));
|
||||
*callback = (callableLazyEvalReset){
|
||||
.context = lua_engine_ctx->eval_lua,
|
||||
.engineLazyEvalResetCallback = resetEvalContext,
|
||||
*callback = (callableLazyEnvReset){
|
||||
.context = lua,
|
||||
.engineLazyEnvResetCallback = resetLuaContext,
|
||||
};
|
||||
} else {
|
||||
resetEvalContext(lua_engine_ctx->eval_lua);
|
||||
resetLuaContext(lua);
|
||||
}
|
||||
|
||||
initializeLuaState(lua_engine_ctx, VMSE_EVAL);
|
||||
initializeLuaState(lua_engine_ctx, type);
|
||||
|
||||
return callback;
|
||||
}
|
||||
|
|
@ -350,21 +340,21 @@ static void luaEngineFreeFunction(ValkeyModuleCtx *module_ctx,
|
|||
compiledFunction *compiled_function) {
|
||||
/* The lua engine is implemented in the core, and not in a Valkey Module */
|
||||
serverAssert(module_ctx == NULL);
|
||||
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);
|
||||
|
||||
luaEngineCtx *lua_engine_ctx = engine_ctx;
|
||||
if (type == VMSE_EVAL) {
|
||||
luaFunction *script = (luaFunction *)compiled_function->function;
|
||||
if (lua_engine_ctx->eval_lua == script->lua) {
|
||||
/* The lua context is still the same, which means that we're not
|
||||
* resetting the whole eval context, and therefore, we need to
|
||||
* delete the function from the lua context.
|
||||
*/
|
||||
lua_unref(lua_engine_ctx->eval_lua, script->function_ref);
|
||||
}
|
||||
zfree(script);
|
||||
} else {
|
||||
luaFunctionFreeFunction(lua_engine_ctx->function_lua, compiled_function->function);
|
||||
lua_State *lua = type == VMSE_EVAL ? lua_engine_ctx->eval_lua : lua_engine_ctx->function_lua;
|
||||
serverAssert(lua);
|
||||
|
||||
luaFunction *script = (luaFunction *)compiled_function->function;
|
||||
if (lua == script->lua) {
|
||||
/* The lua context is still the same, which means that we're not
|
||||
* resetting the whole eval context, and therefore, we need to
|
||||
* delete the function from the lua context.
|
||||
*/
|
||||
lua_unref(lua, script->function_ref);
|
||||
}
|
||||
zfree(script);
|
||||
|
||||
if (compiled_function->name) {
|
||||
decrRefCount(compiled_function->name);
|
||||
|
|
@ -379,11 +369,12 @@ int luaEngineInitEngine(void) {
|
|||
ldbInit();
|
||||
|
||||
engineMethods methods = {
|
||||
.version = VALKEYMODULE_SCRIPTING_ENGINE_ABI_VERSION,
|
||||
.compile_code = luaEngineCompileCode,
|
||||
.free_function = luaEngineFreeFunction,
|
||||
.call_function = luaEngineFunctionCall,
|
||||
.get_function_memory_overhead = luaEngineFunctionMemoryOverhead,
|
||||
.reset_eval_env = luaEngineResetEvalEnv,
|
||||
.reset_env = luaEngineResetEvalEnv,
|
||||
.get_memory_info = luaEngineGetMemoryInfo,
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,11 @@
|
|||
#include "../scripting_engine.h"
|
||||
#include <lua.h>
|
||||
|
||||
typedef struct luaFunction {
|
||||
lua_State *lua; /* Pointer to the lua context where this function was created. Only used in EVAL context. */
|
||||
int function_ref; /* Special ID that allows getting the Lua function object from the Lua registry */
|
||||
} luaFunction;
|
||||
|
||||
int luaEngineInitEngine(void);
|
||||
|
||||
#endif /* _ENGINE_LUA_ */
|
||||
|
|
|
|||
|
|
@ -54,12 +54,6 @@
|
|||
#define LIBRARY_API_NAME "__LIBRARY_API__"
|
||||
#define GLOBALS_API_NAME "__GLOBALS_API__"
|
||||
|
||||
/* Lua function ctx */
|
||||
typedef struct luaFunctionCtx {
|
||||
/* Special ID that allows getting the Lua function object from the Lua registry */
|
||||
int lua_function_ref;
|
||||
} luaFunctionCtx;
|
||||
|
||||
typedef struct loadCtx {
|
||||
list *functions;
|
||||
monotime start_time;
|
||||
|
|
@ -183,20 +177,15 @@ done:
|
|||
return compiled_functions;
|
||||
}
|
||||
|
||||
int luaFunctionGetLuaFunctionRef(compiledFunction *compiled_function) {
|
||||
luaFunctionCtx *f = compiled_function->function;
|
||||
return f->lua_function_ref;
|
||||
}
|
||||
|
||||
static void luaRegisterFunctionArgsInitialize(compiledFunction *func,
|
||||
robj *name,
|
||||
robj *desc,
|
||||
luaFunctionCtx *lua_f_ctx,
|
||||
luaFunction *script,
|
||||
uint64_t flags) {
|
||||
*func = (compiledFunction){
|
||||
.name = name,
|
||||
.desc = desc,
|
||||
.function = lua_f_ctx,
|
||||
.function = script,
|
||||
.f_flags = flags,
|
||||
};
|
||||
}
|
||||
|
|
@ -250,7 +239,7 @@ static int luaRegisterFunctionReadNamedArgs(lua_State *lua,
|
|||
char *err = NULL;
|
||||
robj *name = NULL;
|
||||
robj *desc = NULL;
|
||||
luaFunctionCtx *lua_f_ctx = NULL;
|
||||
luaFunction *script = NULL;
|
||||
uint64_t flags = 0;
|
||||
if (!lua_istable(lua, 1)) {
|
||||
err = "calling server.register_function with a single argument is only applicable to Lua table (representing "
|
||||
|
|
@ -285,8 +274,8 @@ static int luaRegisterFunctionReadNamedArgs(lua_State *lua,
|
|||
}
|
||||
int lua_function_ref = luaL_ref(lua, LUA_REGISTRYINDEX);
|
||||
|
||||
lua_f_ctx = zmalloc(sizeof(*lua_f_ctx));
|
||||
lua_f_ctx->lua_function_ref = lua_function_ref;
|
||||
script = zmalloc(sizeof(*script));
|
||||
script->function_ref = lua_function_ref;
|
||||
continue; /* value was already popped, so no need to pop it out. */
|
||||
} else if (!strcasecmp(key, "flags")) {
|
||||
if (!lua_istable(lua, -1)) {
|
||||
|
|
@ -310,7 +299,7 @@ static int luaRegisterFunctionReadNamedArgs(lua_State *lua,
|
|||
goto error;
|
||||
}
|
||||
|
||||
if (!lua_f_ctx) {
|
||||
if (!script) {
|
||||
err = "server.register_function must get a callback argument";
|
||||
goto error;
|
||||
}
|
||||
|
|
@ -318,7 +307,7 @@ static int luaRegisterFunctionReadNamedArgs(lua_State *lua,
|
|||
luaRegisterFunctionArgsInitialize(func,
|
||||
name,
|
||||
desc,
|
||||
lua_f_ctx,
|
||||
script,
|
||||
flags);
|
||||
|
||||
return C_OK;
|
||||
|
|
@ -326,9 +315,9 @@ static int luaRegisterFunctionReadNamedArgs(lua_State *lua,
|
|||
error:
|
||||
if (name) decrRefCount(name);
|
||||
if (desc) decrRefCount(desc);
|
||||
if (lua_f_ctx) {
|
||||
lua_unref(lua, lua_f_ctx->lua_function_ref);
|
||||
zfree(lua_f_ctx);
|
||||
if (script) {
|
||||
lua_unref(lua, script->function_ref);
|
||||
zfree(script);
|
||||
}
|
||||
luaPushError(lua, err);
|
||||
return C_ERR;
|
||||
|
|
@ -338,7 +327,7 @@ static int luaRegisterFunctionReadPositionalArgs(lua_State *lua,
|
|||
compiledFunction *func) {
|
||||
char *err = NULL;
|
||||
robj *name = NULL;
|
||||
luaFunctionCtx *lua_f_ctx = NULL;
|
||||
luaFunction *script = NULL;
|
||||
if (!(name = luaGetStringObject(lua, 1))) {
|
||||
err = "first argument to server.register_function must be a string";
|
||||
goto error;
|
||||
|
|
@ -351,10 +340,10 @@ static int luaRegisterFunctionReadPositionalArgs(lua_State *lua,
|
|||
|
||||
int lua_function_ref = luaL_ref(lua, LUA_REGISTRYINDEX);
|
||||
|
||||
lua_f_ctx = zmalloc(sizeof(*lua_f_ctx));
|
||||
lua_f_ctx->lua_function_ref = lua_function_ref;
|
||||
script = zmalloc(sizeof(*script));
|
||||
script->function_ref = lua_function_ref;
|
||||
|
||||
luaRegisterFunctionArgsInitialize(func, name, NULL, lua_f_ctx, 0);
|
||||
luaRegisterFunctionArgsInitialize(func, name, NULL, script, 0);
|
||||
|
||||
return C_OK;
|
||||
|
||||
|
|
@ -440,7 +429,7 @@ void luaFunctionInitializeLuaState(lua_State *lua) {
|
|||
}
|
||||
|
||||
void luaFunctionFreeFunction(lua_State *lua, void *function) {
|
||||
luaFunctionCtx *funcCtx = function;
|
||||
lua_unref(lua, funcCtx->lua_function_ref);
|
||||
luaFunction *script = function;
|
||||
lua_unref(lua, script->function_ref);
|
||||
zfree(function);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,8 +11,6 @@ compiledFunction **luaFunctionLibraryCreate(lua_State *lua,
|
|||
size_t *out_num_compiled_functions,
|
||||
robj **err);
|
||||
|
||||
int luaFunctionGetLuaFunctionRef(compiledFunction *compiled_function);
|
||||
|
||||
void luaFunctionFreeFunction(lua_State *lua, void *function);
|
||||
|
||||
#endif /* _FUNCTION_LUA_H_ */
|
||||
|
|
|
|||
|
|
@ -2185,7 +2185,7 @@ functionsLibCtx *disklessLoadFunctionsLibCtxCreate(void) {
|
|||
/* Helper function to discard our temp function lib context
|
||||
* when the loading succeeded or failed. */
|
||||
void disklessLoadDiscardFunctionsLibCtx(functionsLibCtx *temp_functions_lib_ctx) {
|
||||
freeFunctionsAsync(temp_functions_lib_ctx);
|
||||
freeFunctionsAsync(temp_functions_lib_ctx, NULL);
|
||||
}
|
||||
|
||||
/* If we know we got an entirely different data set from our primary
|
||||
|
|
|
|||
|
|
@ -330,13 +330,38 @@ size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine,
|
|||
return mem;
|
||||
}
|
||||
|
||||
callableLazyEvalReset *scriptingEngineCallResetEvalEnvFunc(scriptingEngine *engine,
|
||||
int async) {
|
||||
callableLazyEnvReset *scriptingEngineCallResetEnvFunc(scriptingEngine *engine,
|
||||
subsystemType type,
|
||||
int async) {
|
||||
ValkeyModuleCtx *module_ctx = engineSetupModuleCtx(COMMON_MODULE_CTX_INDEX, engine, NULL);
|
||||
callableLazyEvalReset *callback = engine->impl.methods.reset_eval_env(
|
||||
module_ctx,
|
||||
engine->impl.ctx,
|
||||
async);
|
||||
callableLazyEnvReset *callback = NULL;
|
||||
|
||||
if (engine->impl.methods.version <= 2) {
|
||||
/* For backward compatibility with scripting engine modules that
|
||||
* implement version 1 or 2 of the scripting engine ABI, we call the
|
||||
* reset_eval_env_v1 function, which is only implemented for resetting
|
||||
* the EVAL environment.
|
||||
*/
|
||||
if (type == VMSE_EVAL) {
|
||||
callback = engine->impl.methods.reset_eval_env_v2(
|
||||
module_ctx,
|
||||
engine->impl.ctx,
|
||||
async);
|
||||
} else {
|
||||
/* For FUNCTION scripts, the reset_env function is not implemented
|
||||
* in version 1 or 2 of the scripting engine ABI, so we just return
|
||||
* NULL.
|
||||
*/
|
||||
callback = NULL;
|
||||
}
|
||||
} else {
|
||||
callback = engine->impl.methods.reset_env(
|
||||
module_ctx,
|
||||
engine->impl.ctx,
|
||||
type,
|
||||
async);
|
||||
}
|
||||
|
||||
engineTeardownModuleCtx(COMMON_MODULE_CTX_INDEX, engine);
|
||||
return callback;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ typedef ValkeyModuleScriptingEngineServerRuntimeCtx serverRuntimeCtx;
|
|||
typedef ValkeyModuleScriptingEngineCompiledFunction compiledFunction;
|
||||
typedef ValkeyModuleScriptingEngineSubsystemType subsystemType;
|
||||
typedef ValkeyModuleScriptingEngineMemoryInfo engineMemoryInfo;
|
||||
typedef ValkeyModuleScriptingEngineCallableLazyEvalReset callableLazyEvalReset;
|
||||
typedef ValkeyModuleScriptingEngineCallableLazyEnvReset callableLazyEnvReset;
|
||||
typedef ValkeyModuleScriptingEngineMethods engineMethods;
|
||||
|
||||
/*
|
||||
|
|
@ -77,8 +77,9 @@ void scriptingEngineCallFunction(scriptingEngine *engine,
|
|||
size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine,
|
||||
compiledFunction *compiled_function);
|
||||
|
||||
callableLazyEvalReset *scriptingEngineCallResetEvalEnvFunc(scriptingEngine *engine,
|
||||
int async);
|
||||
callableLazyEnvReset *scriptingEngineCallResetEnvFunc(scriptingEngine *engine,
|
||||
subsystemType type,
|
||||
int async);
|
||||
|
||||
engineMemoryInfo scriptingEngineCallGetMemoryInfo(scriptingEngine *engine,
|
||||
subsystemType type);
|
||||
|
|
|
|||
|
|
@ -3696,7 +3696,7 @@ int redis_check_aof_main(int argc, char **argv);
|
|||
/* Scripting */
|
||||
void freeEvalScripts(dict *scripts, list *scripts_lru_list, list *engine_callbacks);
|
||||
void freeEvalScriptsAsync(dict *scripts, list *scripts_lru_list, list *engine_callbacks);
|
||||
void freeFunctionsAsync(functionsLibCtx *lib_ctx);
|
||||
void freeFunctionsAsync(functionsLibCtx *lib_ctx, list *engine_callbacks);
|
||||
void sha1hex(char *digest, char *script, size_t len);
|
||||
unsigned long evalMemory(void);
|
||||
dict *evalScriptsDict(void);
|
||||
|
|
|
|||
|
|
@ -905,20 +905,20 @@ typedef enum ValkeyModuleScriptingEngineExecutionState {
|
|||
VMSE_STATE_KILLED,
|
||||
} ValkeyModuleScriptingEngineExecutionState;
|
||||
|
||||
typedef struct ValkeyModuleScriptingEngineCallableLazyEvalReset {
|
||||
typedef struct ValkeyModuleScriptingEngineCallableLazyEnvReset {
|
||||
void *context;
|
||||
|
||||
/*
|
||||
* Callback function used for resetting the EVAL context implemented by an
|
||||
* Callback function used for resetting the EVAL/FUNCTION context implemented by an
|
||||
* engine. This callback will be called by a background thread when it's
|
||||
* ready for resetting the context.
|
||||
*
|
||||
* - `context`: a generic pointer to a context object, stored in the
|
||||
* callableLazyEvalReset struct.
|
||||
* callableLazyEnvReset struct.
|
||||
*
|
||||
*/
|
||||
void (*engineLazyEvalResetCallback)(void *context);
|
||||
} ValkeyModuleScriptingEngineCallableLazyEvalReset;
|
||||
void (*engineLazyEnvResetCallback)(void *context);
|
||||
} ValkeyModuleScriptingEngineCallableLazyEnvReset;
|
||||
|
||||
/* The callback function called when either `EVAL`, `SCRIPT LOAD`, or
|
||||
* `FUNCTION LOAD` command is called to compile the code.
|
||||
|
|
@ -1031,6 +1031,8 @@ typedef size_t (*ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc)(
|
|||
|
||||
/* The callback function called when `SCRIPT FLUSH` command is called. The
|
||||
* engine should reset the runtime environment used for EVAL scripts.
|
||||
* This callback has been replaced by `ValkeyModuleScriptingEngineResetEnvFunc`
|
||||
* callback in ABI version 3.
|
||||
*
|
||||
* - `module_ctx`: the module runtime context.
|
||||
*
|
||||
|
|
@ -1039,11 +1041,29 @@ typedef size_t (*ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc)(
|
|||
* - `async`: if has value 1 then the reset is done asynchronously through
|
||||
* the callback structure returned by this function.
|
||||
*/
|
||||
typedef ValkeyModuleScriptingEngineCallableLazyEvalReset *(*ValkeyModuleScriptingEngineResetEvalEnvFunc)(
|
||||
typedef ValkeyModuleScriptingEngineCallableLazyEnvReset *(*ValkeyModuleScriptingEngineResetEvalFuncV2)(
|
||||
ValkeyModuleCtx *module_ctx,
|
||||
ValkeyModuleScriptingEngineCtx *engine_ctx,
|
||||
int async);
|
||||
|
||||
/* The callback function called when `SCRIPT FLUSH` or `FUNCTION FLUSH` command is called.
|
||||
* The engine should reset the runtime environment used for EVAL scripts or FUNCTION scripts.
|
||||
*
|
||||
* - `module_ctx`: the module runtime context.
|
||||
*
|
||||
* - `engine_ctx`: the scripting engine runtime context.
|
||||
*
|
||||
* - `type`: the subsystem type.
|
||||
*
|
||||
* - `async`: if has value 1 then the reset is done asynchronously through
|
||||
* the callback structure returned by this function.
|
||||
*/
|
||||
typedef ValkeyModuleScriptingEngineCallableLazyEnvReset *(*ValkeyModuleScriptingEngineResetEnvFunc)(
|
||||
ValkeyModuleCtx *module_ctx,
|
||||
ValkeyModuleScriptingEngineCtx *engine_ctx,
|
||||
ValkeyModuleScriptingEngineSubsystemType type,
|
||||
int async);
|
||||
|
||||
/* Return the current used memory by the engine.
|
||||
*
|
||||
* - `module_ctx`: the module runtime context.
|
||||
|
|
@ -1058,7 +1078,13 @@ typedef ValkeyModuleScriptingEngineMemoryInfo (*ValkeyModuleScriptingEngineGetMe
|
|||
ValkeyModuleScriptingEngineSubsystemType type);
|
||||
|
||||
/* Current ABI version for scripting engine modules. */
|
||||
#define VALKEYMODULE_SCRIPTING_ENGINE_ABI_VERSION 2UL
|
||||
/* Version Changelog:
|
||||
* 1. Initial version.
|
||||
* 2. Changed the `compile_code` callback to support binary data in the source code.
|
||||
* 3. Renamed reset_eval_env callback to reset_env and added a type parameter to be
|
||||
* able to reset both EVAL or FUNCTION scripts env.
|
||||
*/
|
||||
#define VALKEYMODULE_SCRIPTING_ENGINE_ABI_VERSION 3L
|
||||
|
||||
typedef struct ValkeyModuleScriptingEngineMethods {
|
||||
uint64_t version; /* Version of this structure for ABI compat. */
|
||||
|
|
@ -1081,8 +1107,11 @@ typedef struct ValkeyModuleScriptingEngineMethods {
|
|||
ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead;
|
||||
|
||||
/* The callback function used to reset the runtime environment used
|
||||
* by the scripting engine for EVAL scripts. */
|
||||
ValkeyModuleScriptingEngineResetEvalEnvFunc reset_eval_env;
|
||||
* by the scripting engine for EVAL scripts or FUNCTION scripts. */
|
||||
union {
|
||||
ValkeyModuleScriptingEngineResetEvalFuncV2 reset_eval_env_v2;
|
||||
ValkeyModuleScriptingEngineResetEnvFunc reset_env;
|
||||
};
|
||||
|
||||
/* Function callback to get the used memory by the engine. */
|
||||
ValkeyModuleScriptingEngineGetMemoryInfoFunc get_memory_info;
|
||||
|
|
|
|||
|
|
@ -437,37 +437,75 @@ callHelloLangFunction(ValkeyModuleCtx *module_ctx,
|
|||
ValkeyModule_ReplyWithLongLong(module_ctx, result);
|
||||
}
|
||||
|
||||
static ValkeyModuleScriptingEngineCallableLazyEvalReset *helloResetEvalEnv(ValkeyModuleCtx *module_ctx,
|
||||
ValkeyModuleScriptingEngineCtx *engine_ctx,
|
||||
int async) {
|
||||
static ValkeyModuleScriptingEngineCallableLazyEnvReset *helloResetEvalEnv(ValkeyModuleCtx *module_ctx,
|
||||
ValkeyModuleScriptingEngineCtx *engine_ctx,
|
||||
int async) {
|
||||
VALKEYMODULE_NOT_USED(module_ctx);
|
||||
VALKEYMODULE_NOT_USED(engine_ctx);
|
||||
VALKEYMODULE_NOT_USED(async);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static ValkeyModuleScriptingEngineCallableLazyEnvReset *helloResetEnv(ValkeyModuleCtx *module_ctx,
|
||||
ValkeyModuleScriptingEngineCtx *engine_ctx,
|
||||
ValkeyModuleScriptingEngineSubsystemType type,
|
||||
int async) {
|
||||
VALKEYMODULE_NOT_USED(module_ctx);
|
||||
VALKEYMODULE_NOT_USED(engine_ctx);
|
||||
VALKEYMODULE_NOT_USED(type);
|
||||
VALKEYMODULE_NOT_USED(async);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx,
|
||||
ValkeyModuleString **argv,
|
||||
int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
VALKEYMODULE_NOT_USED(argc);
|
||||
|
||||
if (ValkeyModule_Init(ctx, "helloengine", 1, VALKEYMODULE_APIVER_1) ==
|
||||
VALKEYMODULE_ERR)
|
||||
if (ValkeyModule_Init(ctx, "helloengine", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
unsigned long long abi_version = VALKEYMODULE_SCRIPTING_ENGINE_ABI_VERSION;
|
||||
if (argc > 0) {
|
||||
if (ValkeyModule_StringToULongLong(argv[0], &abi_version) == VALKEYMODULE_ERR) {
|
||||
const char *arg_str = ValkeyModule_StringPtrLen(argv[0], NULL);
|
||||
ValkeyModule_Log(ctx, "error", "Invalid ABI version: %s", arg_str);
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
else {
|
||||
const char *arg_str = ValkeyModule_StringPtrLen(argv[0], NULL);
|
||||
ValkeyModule_Log(ctx, "info", "initializing Hello scripting engine with ABI version: %s", arg_str);
|
||||
}
|
||||
}
|
||||
|
||||
hello_ctx = ValkeyModule_Alloc(sizeof(HelloLangCtx));
|
||||
hello_ctx->program = NULL;
|
||||
|
||||
ValkeyModuleScriptingEngineMethods methods = {
|
||||
.version = VALKEYMODULE_SCRIPTING_ENGINE_ABI_VERSION,
|
||||
.compile_code = createHelloLangEngine,
|
||||
.free_function = engineFreeFunction,
|
||||
.call_function = callHelloLangFunction,
|
||||
.get_function_memory_overhead = engineFunctionMemoryOverhead,
|
||||
.reset_eval_env = helloResetEvalEnv,
|
||||
.get_memory_info = engineGetMemoryInfo,
|
||||
};
|
||||
ValkeyModuleScriptingEngineMethods methods;
|
||||
|
||||
if (abi_version <= 2) {
|
||||
methods = (ValkeyModuleScriptingEngineMethods) {
|
||||
.version = abi_version,
|
||||
.compile_code = createHelloLangEngine,
|
||||
.free_function = engineFreeFunction,
|
||||
.call_function = callHelloLangFunction,
|
||||
.get_function_memory_overhead = engineFunctionMemoryOverhead,
|
||||
.reset_eval_env_v2 = helloResetEvalEnv,
|
||||
.get_memory_info = engineGetMemoryInfo,
|
||||
};
|
||||
} else {
|
||||
methods = (ValkeyModuleScriptingEngineMethods) {
|
||||
.version = abi_version,
|
||||
.compile_code = createHelloLangEngine,
|
||||
.free_function = engineFreeFunction,
|
||||
.call_function = callHelloLangFunction,
|
||||
.get_function_memory_overhead = engineFunctionMemoryOverhead,
|
||||
.reset_env = helloResetEnv,
|
||||
.get_memory_info = engineGetMemoryInfo,
|
||||
};
|
||||
}
|
||||
|
||||
ValkeyModule_RegisterScriptingEngine(ctx,
|
||||
"HELLO",
|
||||
|
|
|
|||
|
|
@ -300,6 +300,41 @@ start_server {tags {"scripting"}} {
|
|||
assert_match {} [r function list]
|
||||
}
|
||||
|
||||
test {FUNCTION - test function flush will re-create the lua engine} {
|
||||
for {set i 0} {$i < 60} {incr i} {
|
||||
r function load [get_function_code lua test_$i test_$i {local a = 1 while true do a = a + 1 end}]
|
||||
}
|
||||
set before_flush_memory [s used_memory_vm_functions]
|
||||
r function flush sync
|
||||
set after_flush_memory [s used_memory_vm_functions]
|
||||
assert_lessthan $after_flush_memory $before_flush_memory
|
||||
|
||||
for {set i 0} {$i < 100} {incr i} {
|
||||
r function load [get_function_code lua test_$i test_$i {local a = 1 while true do a = a + 1 end}]
|
||||
}
|
||||
set before_flush_memory [s used_memory_vm_functions]
|
||||
r function flush async
|
||||
set after_flush_memory [s used_memory_vm_functions]
|
||||
assert_lessthan $after_flush_memory $before_flush_memory
|
||||
}
|
||||
|
||||
test {FUNCTION - test loading function during the flush async} {
|
||||
for {set i 0} {$i < 10000} {incr i} {
|
||||
r function load [get_function_code LUA test_$i test_$i {return 'hello'}]
|
||||
}
|
||||
r function flush sync
|
||||
|
||||
for {set i 0} {$i < 10000} {incr i} {
|
||||
r function load [get_function_code LUA test_$i test_$i {return 'hello'}]
|
||||
}
|
||||
r function flush async
|
||||
|
||||
for {set i 0} {$i < 10000} {incr i} {
|
||||
r function load [get_function_code LUA test_$i test_$i {return 'hello'}]
|
||||
}
|
||||
r function flush
|
||||
}
|
||||
|
||||
test {FUNCTION - test function wrong argument} {
|
||||
catch {r function flush bad_arg} e
|
||||
assert_match {*only supports SYNC|ASYNC*} $e
|
||||
|
|
|
|||
|
|
@ -180,4 +180,14 @@ start_server {tags {"modules"}} {
|
|||
set result [r module unload helloengine]
|
||||
assert_equal $result "OK"
|
||||
}
|
||||
|
||||
test {Load scripting engine in older version} {
|
||||
r module load $testmodule 2
|
||||
r function load $HELLO_PROGRAM
|
||||
set result [r fcall foo 0 123]
|
||||
assert_equal $result 123
|
||||
set result [r function flush async]
|
||||
assert_equal $result {OK}
|
||||
assert_error {ERR Function not found} {r fcall foo 0 123}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue