Thread-safe filter chains.

This commit is contained in:
Roman Arutyunyan 2025-05-12 09:46:05 +04:00
parent c70457482c
commit 856e73367c
12 changed files with 141 additions and 63 deletions

View File

@ -314,6 +314,10 @@ ngx_http_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
}
}
if (ngx_http_init_filters(cf) != NGX_OK) {
return NGX_CONF_ERROR;
}
if (ngx_http_variables_init_vars(cf) != NGX_OK) {
return NGX_CONF_ERROR;
}

View File

@ -172,6 +172,8 @@ char *ngx_http_merge_types(ngx_conf_t *cf, ngx_array_t **keys,
ngx_int_t ngx_http_set_default_types(ngx_conf_t *cf, ngx_array_t **types,
ngx_str_t *default_type);
char *ngx_http_init_filters(ngx_conf_t *cf);
#if (NGX_HTTP_DEGRADATION)
ngx_uint_t ngx_http_degraded(ngx_http_request_t *);
#endif

View File

@ -1845,6 +1845,8 @@ ngx_http_send_response(ngx_http_request_t *r, ngx_uint_t status,
ngx_int_t
ngx_http_send_header(ngx_http_request_t *r)
{
ngx_http_core_main_conf_t *cmcf;
if (r->post_action) {
return NGX_OK;
}
@ -1860,7 +1862,9 @@ ngx_http_send_header(ngx_http_request_t *r)
r->headers_out.status_line.len = 0;
}
return ngx_http_top_header_filter(r);
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
return cmcf->top_header_filter(r);
}
@ -1900,13 +1904,16 @@ ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
ngx_int_t rc;
ngx_connection_t *c;
ngx_http_core_main_conf_t *cmcf;
c = r->connection;
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http output filter \"%V?%V\"", &r->uri, &r->args);
rc = ngx_http_top_body_filter(r, in);
rc = cmcf->top_body_filter(r, in);
if (rc == NGX_ERROR) {
/* NGX_ERROR may be returned by any filter */
@ -5406,3 +5413,22 @@ ngx_http_core_pool_size(ngx_conf_t *cf, void *post, void *data)
return NGX_CONF_OK;
}
char *
ngx_http_init_filters(ngx_conf_t *cf)
{
ngx_http_core_main_conf_t *cmcf;
cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);
cmcf->top_header_filter = ngx_http_top_header_filter;
cmcf->top_body_filter = ngx_http_top_body_filter;
cmcf->top_request_body_filter = ngx_http_top_request_body_filter;
ngx_http_top_header_filter = NULL;
ngx_http_top_body_filter = NULL;
ngx_http_top_request_body_filter = NULL;
return NGX_CONF_OK;
}

View File

@ -64,6 +64,13 @@ typedef struct ngx_http_location_tree_node_s ngx_http_location_tree_node_t;
typedef struct ngx_http_core_loc_conf_s ngx_http_core_loc_conf_t;
typedef ngx_int_t (*ngx_http_output_header_filter_pt)(ngx_http_request_t *r);
typedef ngx_int_t (*ngx_http_output_body_filter_pt)
(ngx_http_request_t *r, ngx_chain_t *chain);
typedef ngx_int_t (*ngx_http_request_body_filter_pt)
(ngx_http_request_t *r, ngx_chain_t *chain);
typedef struct {
struct sockaddr *sockaddr;
socklen_t socklen;
@ -175,6 +182,10 @@ typedef struct {
ngx_array_t *ports;
ngx_http_phase_t phases[NGX_HTTP_LOG_PHASE + 1];
ngx_http_output_header_filter_pt top_header_filter;
ngx_http_output_body_filter_pt top_body_filter;
ngx_http_request_body_filter_pt top_request_body_filter;
} ngx_http_core_main_conf_t;
@ -524,13 +535,6 @@ ngx_int_t ngx_http_named_location(ngx_http_request_t *r, ngx_str_t *name);
ngx_http_cleanup_t *ngx_http_cleanup_add(ngx_http_request_t *r, size_t size);
typedef ngx_int_t (*ngx_http_output_header_filter_pt)(ngx_http_request_t *r);
typedef ngx_int_t (*ngx_http_output_body_filter_pt)
(ngx_http_request_t *r, ngx_chain_t *chain);
typedef ngx_int_t (*ngx_http_request_body_filter_pt)
(ngx_http_request_t *r, ngx_chain_t *chain);
ngx_int_t ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *chain);
ngx_int_t ngx_http_write_filter(ngx_http_request_t *r, ngx_chain_t *chain);
ngx_int_t ngx_http_request_body_save_filter(ngx_http_request_t *r,

View File

@ -1005,6 +1005,7 @@ ngx_http_request_body_length_filter(ngx_http_request_t *r, ngx_chain_t *in)
ngx_buf_t *b;
ngx_chain_t *cl, *tl, *out, **ll;
ngx_http_request_body_t *rb;
ngx_http_core_main_conf_t *cmcf;
rb = r->request_body;
@ -1075,7 +1076,9 @@ ngx_http_request_body_length_filter(ngx_http_request_t *r, ngx_chain_t *in)
ll = &tl->next;
}
rc = ngx_http_top_request_body_filter(r, out);
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
rc = cmcf->top_request_body_filter(r, out);
ngx_chain_update_chains(r->pool, &rb->free, &rb->busy, &out,
(ngx_buf_tag_t) &ngx_http_read_client_request_body);
@ -1094,6 +1097,7 @@ ngx_http_request_body_chunked_filter(ngx_http_request_t *r, ngx_chain_t *in)
ngx_http_request_body_t *rb;
ngx_http_core_loc_conf_t *clcf;
ngx_http_core_srv_conf_t *cscf;
ngx_http_core_main_conf_t *cmcf;
rb = r->request_body;
@ -1259,7 +1263,9 @@ ngx_http_request_body_chunked_filter(ngx_http_request_t *r, ngx_chain_t *in)
}
}
rc = ngx_http_top_request_body_filter(r, out);
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
rc = cmcf->top_request_body_filter(r, out);
ngx_chain_update_chains(r->pool, &rb->free, &rb->busy, &out,
(ngx_buf_tag_t) &ngx_http_read_client_request_body);

View File

@ -3907,6 +3907,7 @@ ngx_http_v2_read_request_body(ngx_http_request_t *r)
ngx_http_request_body_t *rb;
ngx_http_core_loc_conf_t *clcf;
ngx_http_v2_connection_t *h2c;
ngx_http_core_main_conf_t *cmcf;
stream = r->stream;
rb = r->request_body;
@ -3921,7 +3922,9 @@ ngx_http_v2_read_request_body(ngx_http_request_t *r)
/* set rb->filter_need_buffering */
rc = ngx_http_top_request_body_filter(r, NULL);
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
rc = cmcf->top_request_body_filter(r, NULL);
if (rc != NGX_OK) {
stream->skip_data = 1;
@ -4180,6 +4183,7 @@ ngx_http_v2_filter_request_body(ngx_http_request_t *r)
ngx_chain_t *cl;
ngx_http_request_body_t *rb;
ngx_http_core_loc_conf_t *clcf;
ngx_http_core_main_conf_t *cmcf;
rb = r->request_body;
buf = rb->buf;
@ -4255,7 +4259,9 @@ ngx_http_v2_filter_request_body(ngx_http_request_t *r)
update:
rc = ngx_http_top_request_body_filter(r, cl);
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
rc = cmcf->top_request_body_filter(r, cl);
ngx_chain_update_chains(r->pool, &rb->free, &rb->busy, &cl,
(ngx_buf_tag_t) &ngx_http_v2_filter_request_body);

View File

@ -1506,6 +1506,7 @@ ngx_http_v3_request_body_filter(ngx_http_request_t *r, ngx_chain_t *in)
ngx_http_core_loc_conf_t *clcf;
ngx_http_core_srv_conf_t *cscf;
ngx_http_v3_parse_data_t *st;
ngx_http_core_main_conf_t *cmcf;
rb = r->request_body;
st = &r->v3_parse->body;
@ -1723,7 +1724,9 @@ done:
rb->rest = (off_t) cscf->large_client_header_buffers.size;
}
rc = ngx_http_top_request_body_filter(r, out);
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);
rc = cmcf->top_request_body_filter(r, out);
ngx_chain_update_chains(r->pool, &rb->free, &rb->busy, &out,
(ngx_buf_tag_t) &ngx_http_read_client_request_body);

View File

@ -257,6 +257,10 @@ ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
}
}
if (ngx_stream_init_filters(cf) != NGX_OK) {
return NGX_CONF_ERROR;
}
if (ngx_stream_variables_init_vars(cf) != NGX_OK) {
return NGX_CONF_ERROR;
}

View File

@ -97,6 +97,8 @@ typedef ngx_int_t (*ngx_stream_phase_handler_pt)(ngx_stream_session_t *s,
ngx_stream_phase_handler_t *ph);
typedef ngx_int_t (*ngx_stream_handler_pt)(ngx_stream_session_t *s);
typedef void (*ngx_stream_content_handler_pt)(ngx_stream_session_t *s);
typedef ngx_int_t (*ngx_stream_filter_pt)(ngx_stream_session_t *s,
ngx_chain_t *chain, ngx_uint_t from_upstream);
struct ngx_stream_phase_handler_s {
@ -138,6 +140,8 @@ typedef struct {
ngx_array_t *ports;
ngx_stream_phase_t phases[NGX_STREAM_LOG_PHASE + 1];
ngx_stream_filter_pt top_filter;
} ngx_stream_core_main_conf_t;
@ -363,6 +367,8 @@ ngx_int_t ngx_stream_validate_host(ngx_str_t *host, ngx_pool_t *pool,
ngx_int_t ngx_stream_find_virtual_server(ngx_stream_session_t *s,
ngx_str_t *host, ngx_stream_core_srv_conf_t **cscfp);
char *ngx_stream_init_filters(ngx_conf_t *cf);
void ngx_stream_init_connection(ngx_connection_t *c);
void ngx_stream_session_handler(ngx_event_t *rev);
void ngx_stream_finalize_session(ngx_stream_session_t *s, ngx_uint_t rc);
@ -373,10 +379,6 @@ extern ngx_uint_t ngx_stream_max_module;
extern ngx_module_t ngx_stream_core_module;
typedef ngx_int_t (*ngx_stream_filter_pt)(ngx_stream_session_t *s,
ngx_chain_t *chain, ngx_uint_t from_upstream);
extern ngx_stream_filter_pt ngx_stream_top_filter;

View File

@ -1502,3 +1502,18 @@ ngx_stream_core_resolver(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
return NGX_CONF_OK;
}
char *
ngx_stream_init_filters(ngx_conf_t *cf)
{
ngx_stream_core_main_conf_t *cmcf;
cmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_core_module);
cmcf->top_filter = ngx_stream_top_filter;
ngx_stream_top_filter = NULL;
return NGX_CONF_OK;
}

View File

@ -1729,6 +1729,7 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
ngx_log_handler_pt handler;
ngx_stream_upstream_t *u;
ngx_stream_proxy_srv_conf_t *pscf;
ngx_stream_core_main_conf_t *cmcf;
u = s->upstream;
@ -1777,6 +1778,8 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
send_action = "proxying and sending to upstream";
}
cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
for ( ;; ) {
if (do_write && dst) {
@ -1784,7 +1787,7 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
if (*out || *busy || dst->buffered) {
c->log->action = send_action;
rc = ngx_stream_top_filter(s, *out, from_upstream);
rc = cmcf->top_filter(s, *out, from_upstream);
if (rc == NGX_ERROR) {
ngx_stream_proxy_finalize(s, NGX_STREAM_OK);

View File

@ -136,6 +136,7 @@ ngx_stream_return_write_handler(ngx_event_t *ev)
ngx_connection_t *c;
ngx_stream_session_t *s;
ngx_stream_return_ctx_t *ctx;
ngx_stream_core_main_conf_t *cmcf;
c = ev->data;
s = c->data;
@ -146,9 +147,11 @@ ngx_stream_return_write_handler(ngx_event_t *ev)
return;
}
cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
ctx = ngx_stream_get_module_ctx(s, ngx_stream_return_module);
if (ngx_stream_top_filter(s, ctx->out, 1) == NGX_ERROR) {
if (cmcf->top_filter(s, ctx->out, 1) == NGX_ERROR) {
ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
return;
}