From 3f3953599a2de05f82f09cb0efe442d3d58f3e16 Mon Sep 17 00:00:00 2001 From: Aviram Date: Wed, 24 Apr 2013 10:36:55 +0300 Subject: [PATCH 01/12] Added an optional parameter for ngx.req.set_header and ngx.req.clear_header that determines whether or not to replace underscores with hyphens. Previously, underscores were replaced unconditionally. Currently each of the functions has another boolean argument. If it's false, underscores would not be touched. If it's true, they would. The default value of the argument is true. --- src/ngx_http_lua_headers.c | 40 ++++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/src/ngx_http_lua_headers.c b/src/ngx_http_lua_headers.c index 0f30a78ff8..a91a3c9314 100644 --- a/src/ngx_http_lua_headers.c +++ b/src/ngx_http_lua_headers.c @@ -581,12 +581,26 @@ ngx_http_lua_ngx_header_set(lua_State *L) static int ngx_http_lua_ngx_req_header_clear(lua_State *L) { - if (lua_gettop(L) != 1) { - return luaL_error(L, "expecting one arguments, but seen %d", + ngx_uint_t n; + n = lua_gettop(L); + if ((n != 1) && (n != 2)) { + return luaL_error(L, "expecting one or two arguments, but seen %d", lua_gettop(L)); } - lua_pushnil(L); + if (n == 2) { + u_char *p; + size_t len; + int replace_underscores = 1; + p = (u_char *) luaL_checklstring(L, 1, &len); + replace_underscores = lua_toboolean(L, 2); + lua_pop(L, 2); + lua_pushlstring(L, (char *) p, len); + lua_pushnil(L); + lua_pushboolean(L, replace_underscores); + } else { + lua_pushnil(L); + } return ngx_http_lua_ngx_req_header_set_helper(L); } @@ -595,7 +609,7 @@ ngx_http_lua_ngx_req_header_clear(lua_State *L) static int ngx_http_lua_ngx_req_header_set(lua_State *L) { - if (lua_gettop(L) != 2) { + if ((lua_gettop(L) != 2) && (lua_gettop(L) != 3)) { return luaL_error(L, "expecting two arguments, but seen %d", lua_gettop(L)); } @@ -615,6 +629,7 @@ ngx_http_lua_ngx_req_header_set_helper(lua_State *L) size_t len; ngx_int_t rc; ngx_uint_t n; + int replace_underscores = 1; lua_pushlightuserdata(L, &ngx_http_lua_request_key); lua_rawget(L, LUA_GLOBALSINDEX); @@ -627,17 +642,26 @@ ngx_http_lua_ngx_req_header_set_helper(lua_State *L) ngx_http_lua_check_fake_request(L, r); + n = lua_gettop(L); + if (n >= 3) { + replace_underscores = lua_toboolean(L, 3); + } else { + replace_underscores = 1; + } + p = (u_char *) luaL_checklstring(L, 1, &len); dd("key: %.*s, len %d", (int) len, p, (int) len); + if (replace_underscores) { /* replace "_" with "-" */ - for (i = 0; i < len; i++) { - if (p[i] == '_') { - p[i] = '-'; + for (i = 0; i < len; i++) { + if (p[i] == '_') { + p[i] = '-'; + } } } - + key.data = ngx_palloc(r->pool, len + 1); if (key.data == NULL) { return luaL_error(L, "out of memory"); From 1c5e5b9f0d030f66aec123b4cd22c7e23feaf34e Mon Sep 17 00:00:00 2001 From: aviramc Date: Wed, 8 May 2013 13:23:50 +0300 Subject: [PATCH 02/12] Changed the replace_underscores parameter to a table of parameters (options), in which the only possible option currently is replace_underscores. --- src/ngx_http_lua_headers.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/ngx_http_lua_headers.c b/src/ngx_http_lua_headers.c index a91a3c9314..95df6111c8 100644 --- a/src/ngx_http_lua_headers.c +++ b/src/ngx_http_lua_headers.c @@ -643,8 +643,11 @@ ngx_http_lua_ngx_req_header_set_helper(lua_State *L) ngx_http_lua_check_fake_request(L, r); n = lua_gettop(L); - if (n >= 3) { - replace_underscores = lua_toboolean(L, 3); + if (n == 3) { + luaL_checktype(L, 3, LUA_TTABLE); + lua_getfield(L, 3, "replace_underscores"); + replace_underscores = lua_toboolean(L, -1); + lua_pop(L, 1); } else { replace_underscores = 1; } From f2e849a7efc23f86971596110a819d1057aeee37 Mon Sep 17 00:00:00 2001 From: aviramc Date: Wed, 8 May 2013 13:44:32 +0300 Subject: [PATCH 03/12] Added an options table to clean_header as well. --- src/ngx_http_lua_headers.c | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/ngx_http_lua_headers.c b/src/ngx_http_lua_headers.c index 95df6111c8..36f6a01c5a 100644 --- a/src/ngx_http_lua_headers.c +++ b/src/ngx_http_lua_headers.c @@ -589,15 +589,9 @@ ngx_http_lua_ngx_req_header_clear(lua_State *L) } if (n == 2) { - u_char *p; - size_t len; - int replace_underscores = 1; - p = (u_char *) luaL_checklstring(L, 1, &len); - replace_underscores = lua_toboolean(L, 2); - lua_pop(L, 2); - lua_pushlstring(L, (char *) p, len); lua_pushnil(L); - lua_pushboolean(L, replace_underscores); + /* Top element is now 3, replace it with element 3 */ + lua_insert(L, 2); } else { lua_pushnil(L); } From d4d2988488c5a6758d2e87a47c592d833513329d Mon Sep 17 00:00:00 2001 From: aviram Date: Sun, 29 Sep 2013 10:54:33 +0200 Subject: [PATCH 04/12] Removed unneeded variable. --- src/ngx_http_lua_headers.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ngx_http_lua_headers.c b/src/ngx_http_lua_headers.c index 6960a3c238..3a0226517d 100644 --- a/src/ngx_http_lua_headers.c +++ b/src/ngx_http_lua_headers.c @@ -604,7 +604,6 @@ ngx_http_lua_ngx_req_header_set_helper(lua_State *L) size_t len; ngx_int_t rc; ngx_uint_t n; - int replace_underscores = 1; r = ngx_http_lua_get_req(L); if (r == NULL) { From c1115829d09cc68d7f0e77d7a2dd901334cd6e48 Mon Sep 17 00:00:00 2001 From: aviram Date: Mon, 30 Sep 2013 11:20:04 +0200 Subject: [PATCH 05/12] Added ngx.location.capture_stream, which enables a single subrequest to be performed in a streaming fashion. --- src/ngx_http_lua_capturefilter.c | 45 ++- src/ngx_http_lua_common.h | 18 ++ src/ngx_http_lua_contentby.c | 58 +++- src/ngx_http_lua_subrequest.c | 478 +++++++++++++++++++++++++++++-- src/ngx_http_lua_subrequest.h | 2 + 5 files changed, 570 insertions(+), 31 deletions(-) diff --git a/src/ngx_http_lua_capturefilter.c b/src/ngx_http_lua_capturefilter.c index 9a945ff80d..01243e2c6e 100644 --- a/src/ngx_http_lua_capturefilter.c +++ b/src/ngx_http_lua_capturefilter.c @@ -16,7 +16,7 @@ #include "ngx_http_lua_util.h" #include "ngx_http_lua_exception.h" #include "ngx_http_lua_subrequest.h" - +#include "ngx_http_lua_contentby.h" ngx_http_output_header_filter_pt ngx_http_lua_next_header_filter; ngx_http_output_body_filter_pt ngx_http_lua_next_body_filter; @@ -104,6 +104,15 @@ ngx_http_lua_capture_header_filter(ngx_http_request_t *r) } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static ngx_int_t +_should_store_chain_link(ngx_chain_t *in) +{ + return ((in != NULL) && ((in->buf->pos != in->buf->last))); +} +#endif + + static ngx_int_t ngx_http_lua_capture_body_filter(ngx_http_request_t *r, ngx_chain_t *in) { @@ -162,6 +171,40 @@ ngx_http_lua_capture_body_filter(ngx_http_request_t *r, ngx_chain_t *in) ctx->seen_last_for_subreq = 1; } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + if (pr_ctx->async_capture) { + /* In order to wake the parent up, we should call post and not discard + the buffer */ + pr_ctx->current_subrequest = r; /* Required for wake up (?) */ + pr_ctx->current_subrequest_ctx = ctx; /* Required for the buffer */ + + /* XXX: In some cases, pr_ctx->current_subrequest_buffer is being + cleaned by Nginx and buf gets the value 0x1... */ + if (((pr_ctx->current_subrequest_buffer == NULL) + || (pr_ctx->current_subrequest_buffer->buf == (void *) 1)) + && (_should_store_chain_link(in))) { + pr_ctx->current_subrequest_buffer = in; + } + + /* TODO: Is this line needed? */ + r->parent->write_event_handler = ngx_http_lua_content_wev_handler; + + if (!eof) { + pr_ctx->wakeup_subrequest = 1; + /* On EOF, the post subrequest callback is called, and it handles + the setting of the resume handler. The parent request would be + woken up anyway by Nginx. + */ + if (ngx_http_post_request(r->parent, NULL) != NGX_OK) { + return NGX_ERROR; + } + return NGX_OK; + } else { + pr_ctx->wakeup_subrequest = 0; + } + } +#endif + ngx_http_lua_discard_bufs(r->pool, in); return NGX_OK; diff --git a/src/ngx_http_lua_common.h b/src/ngx_http_lua_common.h index b03d7bb994..121b28e2a6 100644 --- a/src/ngx_http_lua_common.h +++ b/src/ngx_http_lua_common.h @@ -330,6 +330,19 @@ typedef struct ngx_http_lua_ctx_s { ngx_int_t exit_code; +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + ngx_int_t async_capture; + ngx_http_request_t *current_subrequest; + struct ngx_http_lua_ctx_s *current_subrequest_ctx; + ngx_chain_t *current_subrequest_buffer; + ngx_int_t returned_headers; +#endif + + ngx_http_lua_co_ctx_t *calling_coctx; /* co ctx for the caller to location.capture */ + + ngx_http_lua_co_ctx_t *req_body_reader_co_ctx; /* co ctx for the coroutine + reading the request + body */ ngx_http_lua_co_ctx_t *downstream_co_ctx; /* co ctx for the coroutine reading the request body */ @@ -371,6 +384,11 @@ typedef struct ngx_http_lua_ctx_s { unsigned headers_set:1; /* whether the user has set custom response headers */ +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + unsigned wakeup_subrequest:1; + unsigned subrequest_yield:1; +#endif + unsigned entered_rewrite_phase:1; unsigned entered_access_phase:1; unsigned entered_content_phase:1; diff --git a/src/ngx_http_lua_contentby.c b/src/ngx_http_lua_contentby.c index c635ab72cb..1168040ed5 100644 --- a/src/ngx_http_lua_contentby.c +++ b/src/ngx_http_lua_contentby.c @@ -20,6 +20,12 @@ static void ngx_http_lua_content_phase_post_read(ngx_http_request_t *r); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static ngx_int_t _is_chain_valid(ngx_chain_t * cl); +static ngx_int_t _is_last_chain_link(ngx_chain_t * cl); +static ngx_int_t _post_request_if_not_posted(ngx_http_request_t *r, + ngx_http_posted_request_t *pr); +#endif ngx_int_t ngx_http_lua_content_by_chunk(lua_State *L, ngx_http_request_t *r) @@ -115,17 +121,67 @@ ngx_http_lua_content_by_chunk(lua_State *L, ngx_http_request_t *r) } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static ngx_int_t +_post_request_if_not_posted(ngx_http_request_t *r, ngx_http_posted_request_t *pr) +{ + ngx_http_posted_request_t *p; + + /* Search request in the posted requests list, so that it would not be posted twice. */ + for (p = r->main->posted_requests; p; p = p->next) { + if (p->request == r) { + return NGX_OK; + } + } + + return ngx_http_post_request(r, pr); +} + +static ngx_int_t +_is_chain_valid(ngx_chain_t * cl) +{ + /* For some reason, sometimes when cl->buf is cleaned, 1 is assigned to it. */ + return ((cl != NULL) && (cl->buf != NULL) && (cl->buf != (void *) 1)); +} +static ngx_int_t +_is_last_chain_link(ngx_chain_t * cl) +{ + /* last_in_chain is for subrequests. */ + return cl->buf->last_in_chain || cl->buf->last_buf; +} +#endif + void ngx_http_lua_content_wev_handler(ngx_http_request_t *r) { ngx_http_lua_ctx_t *ctx; + ngx_int_t rc; ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { return; } - (void) ctx->resume_handler(r); + rc = ctx->resume_handler(r); + + if (rc == NGX_DONE) { + return; + } + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + if (ctx->current_subrequest && ctx->wakeup_subrequest) { + /* Make sure that the subrequest continues */ + if (_post_request_if_not_posted(ctx->current_subrequest, NULL) != NGX_OK) { + ngx_http_lua_finalize_request(r, NGX_ERROR); + } + /* Don't try to discard the last buffer, as it will cause a NULL dereference... */ + if (_is_chain_valid(ctx->current_subrequest_buffer) && (!_is_last_chain_link(ctx->current_subrequest_buffer))) { + ngx_http_lua_discard_bufs(ctx->current_subrequest->pool, ctx->current_subrequest_buffer); + } + ctx->current_subrequest_buffer = NULL; + ctx->wakeup_subrequest = 0; + } +#endif } diff --git a/src/ngx_http_lua_subrequest.c b/src/ngx_http_lua_subrequest.c index bb10fff4b0..3a3eca01bf 100644 --- a/src/ngx_http_lua_subrequest.c +++ b/src/ngx_http_lua_subrequest.c @@ -70,13 +70,18 @@ static ngx_int_t ngx_http_lua_subrequest_add_extra_vars(ngx_http_request_t *r, static ngx_int_t ngx_http_lua_subrequest(ngx_http_request_t *r, ngx_str_t *uri, ngx_str_t *args, ngx_http_request_t **psr, ngx_http_post_subrequest_t *ps, ngx_uint_t flags); -static ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r); +ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r); static void ngx_http_lua_handle_subreq_responses(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx); static void ngx_http_lua_cancel_subreq(ngx_http_request_t *r); static ngx_int_t ngx_http_post_request_to_head(ngx_http_request_t *r); static ngx_int_t ngx_http_lua_copy_in_file_request_body(ngx_http_request_t *r); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static int ngx_http_lua_ngx_location_capture_stream(lua_State *L); +static int ngx_http_lua_ngx_location_get_subrequest_buffer(lua_State *L); +static ngx_int_t _prepare_subrequest_body_chunk(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx, u_char ** dest_buffer, unsigned long * length); +#endif /* ngx.location.capture is just a thin wrapper around * ngx.location.capture_multi */ @@ -108,6 +113,51 @@ ngx_http_lua_ngx_location_capture(lua_State *L) return ngx_http_lua_ngx_location_capture_multi(L); } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static int +ngx_http_lua_ngx_location_capture_stream(lua_State *L) +{ + int n; + ngx_http_request_t *r; + ngx_http_lua_ctx_t *ctx; + + r = ngx_http_lua_get_req(L); + + if (r == NULL) { + return luaL_error(L, "no request object found"); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return luaL_error(L, "no ctx found"); + } + + n = lua_gettop(L); + + if (n != 1 && n != 2) { + return luaL_error(L, "expecting one or two arguments"); + } + + ctx->async_capture = 1; + ctx->calling_coctx = ctx->cur_co_ctx; + + lua_createtable(L, n, 0); /* uri opts? table */ + lua_insert(L, 1); /* table uri opts? */ + if (n == 1) { /* table uri */ + lua_rawseti(L, 1, 1); /* table */ + + } else { /* table uri opts */ + lua_rawseti(L, 1, 2); /* table uri */ + lua_rawseti(L, 1, 1); /* table */ + } + + lua_createtable(L, 1, 0); /* table table' */ + lua_insert(L, 1); /* table' table */ + lua_rawseti(L, 1, 1); /* table' */ + + return ngx_http_lua_ngx_location_capture_multi(L); +} +#endif static int ngx_http_lua_ngx_location_capture_multi(lua_State *L) @@ -607,10 +657,349 @@ ngx_http_lua_ngx_location_capture_multi(lua_State *L) ctx->no_abort = 1; +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + /* Different resume handler for async requests. */ + if (ctx->async_capture) { + ctx->resume_handler = ngx_http_lua_ngx_capture_buffer_handler; + } +#endif + return lua_yield(L, 0); } +static void +_create_headers_table(lua_State *L, ngx_http_request_t *request) +{ + ngx_http_headers_out_t *sr_headers; + ngx_table_elt_t *header; + ngx_list_part_t *part; + ngx_uint_t i; + u_char buf[sizeof("Mon, 28 Sep 1970 06:00:00 GMT") - 1]; + + lua_newtable(L); + + sr_headers = &(request->headers_out); + + dd("saving subrequest response headers"); + + part = &sr_headers->headers.part; + header = part->elts; + + for (i = 0; /* void */; i++) { + + if (i >= part->nelts) { + if (part->next == NULL) { + break; + } + + part = part->next; + header = part->elts; + i = 0; + } + + dd("checking sr header %.*s", (int) header[i].key.len, + header[i].key.data); + +#if 1 + if (header[i].hash == 0) { + continue; + } +#endif + + header[i].hash = 0; + + dd("pushing sr header %.*s", (int) header[i].key.len, + header[i].key.data); + + lua_pushlstring(L, (char *) header[i].key.data, + header[i].key.len); /* header key */ + lua_pushvalue(L, -1); /* stack: table key key */ + + /* check if header already exists */ + lua_rawget(L, -3); /* stack: table key value */ + + if (lua_isnil(L, -1)) { + lua_pop(L, 1); /* stack: table key */ + + lua_pushlstring(L, (char *) header[i].value.data, + header[i].value.len); + /* stack: table key value */ + + lua_rawset(L, -3); /* stack: table */ + + } else { + + if (!lua_istable(L, -1)) { /* already inserted one value */ + lua_createtable(L, 4, 0); + /* stack: table key value table */ + + lua_insert(L, -2); /* stack: table key table value */ + lua_rawseti(L, -2, 1); /* stack: table key table */ + + lua_pushlstring(L, (char *) header[i].value.data, + header[i].value.len); + /* stack: table key table value */ + + lua_rawseti(L, -2, lua_objlen(L, -2) + 1); + /* stack: table key table */ + + lua_rawset(L, -3); /* stack: table */ + + } else { + lua_pushlstring(L, (char *) header[i].value.data, + header[i].value.len); + /* stack: table key table value */ + + lua_rawseti(L, -2, lua_objlen(L, -2) + 1); + /* stack: table key table */ + + lua_pop(L, 2); /* stack: table */ + } + } + } + + if (sr_headers->content_type.len) { + lua_pushliteral(L, "Content-Type"); /* header key */ + lua_pushlstring(L, (char *) sr_headers->content_type.data, + sr_headers->content_type.len); /* head key value */ + lua_rawset(L, -3); /* head */ + } + + if (sr_headers->content_length == NULL + && sr_headers->content_length_n >= 0) + { + lua_pushliteral(L, "Content-Length"); /* header key */ + + lua_pushnumber(L, sr_headers->content_length_n); + /* head key value */ + + lua_rawset(L, -3); /* head */ + } + + /* to work-around an issue in ngx_http_static_module + * (github issue #41) */ + if (sr_headers->location && sr_headers->location->value.len) { + lua_pushliteral(L, "Location"); /* header key */ + lua_pushlstring(L, (char *) sr_headers->location->value.data, + sr_headers->location->value.len); + /* head key value */ + lua_rawset(L, -3); /* head */ + } + + if (sr_headers->last_modified_time != -1) { + if (sr_headers->status != NGX_HTTP_OK + && sr_headers->status != NGX_HTTP_PARTIAL_CONTENT + && sr_headers->status != NGX_HTTP_NOT_MODIFIED + && sr_headers->status != NGX_HTTP_NO_CONTENT) + { + sr_headers->last_modified_time = -1; + sr_headers->last_modified = NULL; + } + } + + if (sr_headers->last_modified == NULL + && sr_headers->last_modified_time != -1) + { + (void) ngx_http_time(buf, sr_headers->last_modified_time); + + lua_pushliteral(L, "Last-Modified"); /* header key */ + lua_pushlstring(L, (char *) buf, sizeof(buf)); /* head key value */ + lua_rawset(L, -3); /* head */ + } +} + + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static int +ngx_http_lua_ngx_location_get_subrequest_buffer(lua_State *L) +{ + ngx_http_request_t *r = NULL; + ngx_http_lua_ctx_t *ctx = NULL; + unsigned long buffer_length = 0; + u_char *current_buffer = NULL; + + r = ngx_http_lua_get_req(L); + + if (r == NULL) { + return luaL_error(L, "no request object found"); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return luaL_error(L, "no ctx found"); + } + + if ((!ctx->async_capture) || (ctx->current_subrequest_ctx == NULL)) { + return luaL_error(L, "no capture streaming currently"); + } + + /* If there are chunks currently, we should return them immediately. Otherwise, wait.. */ + if (ctx->current_subrequest_ctx->body) { + if (_prepare_subrequest_body_chunk(r, ctx, ¤t_buffer, &buffer_length) != NGX_OK) { + return luaL_error(L, "memory allocation failure"); + } + ctx->subrequest_yield = 0; + lua_pushlstring(L, (char *) current_buffer, buffer_length); + /* Free the buffer that was copied to Lua */ + ngx_pfree(r->pool, current_buffer); + return 1; + } + + /* If the body is NULL and the post subrequest handler was called, this means there are no more buffers. */ + if (ctx->current_subrequest_ctx->run_post_subrequest) { + /* TODO: Tight coupling between the ctx and the request object. */ + ctx->current_subrequest_ctx = NULL; + ctx->current_subrequest = NULL; + ctx->resume_handler = ngx_http_lua_wev_handler; + return 0; + } + + ctx->resume_handler = ngx_http_lua_ngx_capture_buffer_handler; + + ctx->subrequest_yield = 1; + + return lua_yield(L, 0); +} + + +static unsigned long _get_chain_size(ngx_chain_t *in) +{ + ngx_chain_t *current_chain_link = NULL; + unsigned long chain_link_size = 0; + + for (current_chain_link = in, chain_link_size = 0; current_chain_link != NULL ;current_chain_link = current_chain_link->next) { + chain_link_size += current_chain_link->buf->last - current_chain_link->buf->pos; + } + + return chain_link_size; +} + + +static ngx_int_t _prepare_subrequest_body_chunk(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx, u_char ** dest_buffer, unsigned long * length) +{ + ngx_chain_t *cl; + unsigned long buffer_length = 0; + u_char *current_buffer; + + /* Prepare the buffer for the callback */ + buffer_length = _get_chain_size(ctx->current_subrequest_ctx->body); + current_buffer = ngx_palloc(r->pool, buffer_length); + if (current_buffer == NULL) { + return NGX_ERROR; + } + + *length = buffer_length; + *dest_buffer = current_buffer; + + /* TODO: Support the limitation of the buffer size */ + for (cl = ctx->current_subrequest_ctx->body; cl; cl = cl->next) { + current_buffer = ngx_copy(current_buffer, cl->buf->pos, cl->buf->last - cl->buf->pos); + /* TODO: Should we acutally call ngx_pfree? According to the post subrequest callback, we shouldn't. */ + cl->buf->last = cl->buf->pos; + ctx->current_subrequest_ctx->body = NULL; + ctx->current_subrequest_ctx->last_body = &(ctx->current_subrequest_ctx->body); + } + + return NGX_OK; +} + +ngx_int_t +ngx_http_lua_ngx_capture_buffer_handler(ngx_http_request_t *r) +{ + ngx_http_lua_ctx_t *ctx; + unsigned long buffer_length = 0; + u_char *current_buffer; + ngx_http_lua_co_ctx_t *current_co_ctx; + lua_State * co; + ngx_int_t rc; + int returned_values; + ngx_http_lua_main_conf_t *lmcf; + + lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module); + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + goto error_handling; + } + + if ((!ctx->current_subrequest_ctx) || (!ctx->current_subrequest)) { + return NGX_AGAIN; + } + + current_co_ctx = ctx->cur_co_ctx; + ctx->cur_co_ctx = ctx->calling_coctx; + co = ctx->cur_co_ctx->co; + + if (_prepare_subrequest_body_chunk(r, ctx, ¤t_buffer, &buffer_length) != NGX_OK) { + goto error_handling; + } + + if (buffer_length == 0) { + ctx->current_subrequest_buffer = NULL; + } + + /* Headers are to be returned only once. */ + if (!ctx->returned_headers) { + _create_headers_table(co, ctx->current_subrequest); + lua_pushinteger(co, ctx->current_subrequest->headers_out.status); + ctx->returned_headers = 1; + returned_values = 3; + } else { + returned_values = 1; + } + + lua_pushlstring(co, (char *) current_buffer, buffer_length); + /* Free the buffer that was copied to Lua */ + ngx_pfree(r->pool, current_buffer); + + ctx->subrequest_yield = 0; + rc = ngx_http_lua_run_thread(co, r, ctx, returned_values); + + ctx->cur_co_ctx = current_co_ctx; + + if (rc == NGX_AGAIN) { + /* NGX_AGAIN can be returned if: + - Any built-in Lua function in the current thread needs waiting. + - The function get_subrequest_buffer needs to wait for the subrequest. + + If a Lua function needs waiting, we'll remove our resume handler. + If we need waiting, we shall keep it. + Anyway, if the subrequest isn't over, we should try to wake it up. + */ + if (!ctx->current_subrequest_ctx || !ctx->current_subrequest_ctx->seen_last_for_subreq) { + ctx->wakeup_subrequest = 1; + } else { + ctx->wakeup_subrequest = 0; + } + + if (ctx->subrequest_yield) { + ctx->subrequest_yield = 0; + } else { + ctx->resume_handler = ngx_http_lua_wev_handler; + } + + return NGX_AGAIN; + } + + if (rc == NGX_DONE) { + ngx_http_lua_finalize_request(r, NGX_DONE); + return ngx_http_lua_run_posted_threads(r->connection, lmcf->lua, r, ctx); + } + + if (ctx->entered_content_phase) { + ngx_http_lua_finalize_request(r, rc); + return NGX_DONE; + } + + return rc; + + error_handling: + ngx_http_lua_finalize_request(r, NGX_ERROR); + return NGX_ERROR; +} +#endif + static ngx_int_t ngx_http_lua_adjust_subrequest(ngx_http_request_t *sr, ngx_uint_t method, int always_forward_body, ngx_http_request_body_t *body, @@ -975,7 +1364,13 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) dd("all subrequests are done"); pr_ctx->no_abort = 0; - pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; + if (!pr_ctx->async_capture) { + pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; + } else { + /* XXX: Make sure that the parent request has the correct context. */ + pr_ctx->current_subrequest = r; + pr_ctx->current_subrequest_ctx = ctx; + } pr_ctx->cur_co_ctx = pr_coctx; } @@ -1033,45 +1428,47 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) body_str->len = len; - if (len == 0) { - body_str->data = NULL; + /* If we're in capture streaming mode, we want to keep the buffer, and to free it from the resume handler's context. */ + if (!pr_ctx->async_capture) { + if (len == 0) { + body_str->data = NULL; - } else { - p = ngx_palloc(r->pool, len); - if (p == NULL) { - return NGX_ERROR; - } - - body_str->data = p; + } else { + p = ngx_palloc(r->pool, len); + if (p == NULL) { + return NGX_ERROR; + } - /* copy from and then free the data buffers */ + body_str->data = p; - for (cl = ctx->body; cl; cl = cl->next) { - p = ngx_copy(p, cl->buf->pos, cl->buf->last - cl->buf->pos); + /* copy from and then free the data buffers */ - cl->buf->last = cl->buf->pos; + for (cl = ctx->body; cl; cl = cl->next) { + p = ngx_copy(p, cl->buf->pos, cl->buf->last - cl->buf->pos); + cl->buf->last = cl->buf->pos; #if 0 - dd("free body chain link buf ASAP"); - ngx_pfree(r->pool, cl->buf->start); + dd("free body chain link buf ASAP"); + ngx_pfree(r->pool, cl->buf->start); #endif + } } - } - if (ctx->body) { + if (ctx->body) { #if defined(nginx_version) && nginx_version >= 1001004 - ngx_chain_update_chains(r->pool, + ngx_chain_update_chains(r->pool, #else - ngx_chain_update_chains( + ngx_chain_update_chains( #endif - &pr_ctx->free_bufs, &pr_ctx->busy_bufs, - &ctx->body, - (ngx_buf_tag_t) &ngx_http_lua_module); + &pr_ctx->free_bufs, &pr_ctx->busy_bufs, + &ctx->body, + (ngx_buf_tag_t) &ngx_http_lua_module); - dd("free bufs: %p", pr_ctx->free_bufs); + dd("free bufs: %p", pr_ctx->free_bufs); + } } - + ngx_http_post_request_to_head(pr); if (r != r->connection->data) { @@ -1417,6 +1814,15 @@ ngx_http_lua_inject_subrequest_api(lua_State *L) lua_pushcfunction(L, ngx_http_lua_ngx_location_capture); lua_setfield(L, -2, "capture"); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + lua_pushcfunction(L, ngx_http_lua_ngx_location_capture_stream); + lua_setfield(L, -2, "capture_stream"); + + /* TODO: Call this 'get_subrequest_body_chunk' */ + lua_pushcfunction(L, ngx_http_lua_ngx_location_get_subrequest_buffer); + lua_setfield(L, -2, "get_subrequest_buffer"); +#endif + lua_pushcfunction(L, ngx_http_lua_ngx_location_capture_multi); lua_setfield(L, -2, "capture_multi"); @@ -1544,7 +1950,7 @@ ngx_http_lua_subrequest(ngx_http_request_t *r, } -static ngx_int_t +ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r) { ngx_int_t rc; @@ -1552,6 +1958,7 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) ngx_http_lua_ctx_t *ctx; ngx_http_lua_co_ctx_t *coctx; ngx_http_lua_main_conf_t *lmcf; + int returned_values; ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { @@ -1569,7 +1976,20 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) dd("nsubreqs: %d", (int) coctx->nsubreqs); - ngx_http_lua_handle_subreq_responses(r, ctx); + if (ctx->async_capture) { + if (!ctx->returned_headers) { + _create_headers_table(coctx->co, ctx->current_subrequest); + lua_pushinteger(coctx->co, ctx->current_subrequest->headers_out.status); + ctx->returned_headers = 1; + returned_values = 3; + } else { + returned_values = 1; + } + lua_pushnil(coctx->co); + } else { + ngx_http_lua_handle_subreq_responses(r, ctx); + returned_values = coctx->nsubreqs; + } dd("free sr_statues/headers/bodies memory ASAP"); @@ -1584,7 +2004,7 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) c = r->connection; - rc = ngx_http_lua_run_thread(lmcf->lua, r, ctx, coctx->nsubreqs); + rc = ngx_http_lua_run_thread(lmcf->lua, r, ctx, returned_values); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua run thread returned %d", rc); diff --git a/src/ngx_http_lua_subrequest.h b/src/ngx_http_lua_subrequest.h index aad4236d84..ba7c3b3fc6 100644 --- a/src/ngx_http_lua_subrequest.h +++ b/src/ngx_http_lua_subrequest.h @@ -40,6 +40,8 @@ typedef struct ngx_http_lua_post_subrequest_data_s { } ngx_http_lua_post_subrequest_data_t; +ngx_int_t ngx_http_lua_ngx_capture_buffer_handler(ngx_http_request_t *r); +ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r); #endif /* _NGX_HTTP_LUA_SUBREQUEST_H_INCLUDED_ */ From b8d420926a71aadf6b1355da478937e1fe69f38f Mon Sep 17 00:00:00 2001 From: aviram Date: Thu, 3 Oct 2013 09:47:16 +0200 Subject: [PATCH 06/12] Added all necessary safe guards to protect the subrequest streaming feature. --- src/ngx_http_lua_subrequest.c | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/ngx_http_lua_subrequest.c b/src/ngx_http_lua_subrequest.c index 3a3eca01bf..805f2c281b 100644 --- a/src/ngx_http_lua_subrequest.c +++ b/src/ngx_http_lua_subrequest.c @@ -668,6 +668,7 @@ ngx_http_lua_ngx_location_capture_multi(lua_State *L) } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING static void _create_headers_table(lua_State *L, ngx_http_request_t *request) { @@ -810,7 +811,6 @@ _create_headers_table(lua_State *L, ngx_http_request_t *request) } -#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING static int ngx_http_lua_ngx_location_get_subrequest_buffer(lua_State *L) { @@ -1364,14 +1364,19 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) dd("all subrequests are done"); pr_ctx->no_abort = 0; - if (!pr_ctx->async_capture) { - pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; - } else { + pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; + pr_ctx->cur_co_ctx = pr_coctx; + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + + if (pr_ctx->async_capture) { /* XXX: Make sure that the parent request has the correct context. */ pr_ctx->current_subrequest = r; pr_ctx->current_subrequest_ctx = ctx; } - pr_ctx->cur_co_ctx = pr_coctx; + +#endif + } if (pr_ctx->entered_content_phase) { @@ -1427,9 +1432,11 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) } body_str->len = len; - + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING /* If we're in capture streaming mode, we want to keep the buffer, and to free it from the resume handler's context. */ if (!pr_ctx->async_capture) { +#endif if (len == 0) { body_str->data = NULL; @@ -1467,7 +1474,9 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) dd("free bufs: %p", pr_ctx->free_bufs); } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING } +#endif ngx_http_post_request_to_head(pr); @@ -1976,6 +1985,8 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) dd("nsubreqs: %d", (int) coctx->nsubreqs); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + if (ctx->async_capture) { if (!ctx->returned_headers) { _create_headers_table(coctx->co, ctx->current_subrequest); @@ -1987,9 +1998,15 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) } lua_pushnil(coctx->co); } else { + +#endif + ngx_http_lua_handle_subreq_responses(r, ctx); returned_values = coctx->nsubreqs; + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING } +#endif dd("free sr_statues/headers/bodies memory ASAP"); From e9e1dec69b88803a0080cbdbcb28151e31674fd9 Mon Sep 17 00:00:00 2001 From: aviram Date: Thu, 3 Oct 2013 17:40:18 +0200 Subject: [PATCH 07/12] Reverted unnecessary changes. --- src/ngx_http_lua_headers.c | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/ngx_http_lua_headers.c b/src/ngx_http_lua_headers.c index af56a2d8b0..3fd4672216 100644 --- a/src/ngx_http_lua_headers.c +++ b/src/ngx_http_lua_headers.c @@ -574,19 +574,13 @@ ngx_http_lua_ngx_req_header_clear(lua_State *L) { ngx_uint_t n; n = lua_gettop(L); - if ((n != 1) && (n != 2)) { - return luaL_error(L, "expecting one or two arguments, but seen %d", + if (lua_gettop(L) != 1) { + return luaL_error(L, "expecting one arguments, but seen %d", lua_gettop(L)); } - if (n == 2) { - lua_pushnil(L); - /* Top element is now 3, replace it with element 3 */ - lua_insert(L, 2); - } else { - lua_pushnil(L); - } - + lua_pushnil(L); + return ngx_http_lua_ngx_req_header_set_helper(L); } @@ -594,7 +588,7 @@ ngx_http_lua_ngx_req_header_clear(lua_State *L) static int ngx_http_lua_ngx_req_header_set(lua_State *L) { - if ((lua_gettop(L) != 2) && (lua_gettop(L) != 3)) { + if (lua_gettop(L) != 2) { return luaL_error(L, "expecting two arguments, but seen %d", lua_gettop(L)); } @@ -632,10 +626,9 @@ ngx_http_lua_ngx_req_header_set_helper(lua_State *L) #if 0 /* replace "_" with "-" */ - for (i = 0; i < len; i++) { - if (p[i] == '_') { - p[i] = '-'; - } + for (i = 0; i < len; i++) { + if (p[i] == '_') { + p[i] = '-'; } } #endif From 24cb6857e2a1452733be9185a59f600e962dc029 Mon Sep 17 00:00:00 2001 From: aviram Date: Thu, 3 Oct 2013 17:41:25 +0200 Subject: [PATCH 08/12] Reverted unnecessary changes. --- src/ngx_http_lua_headers.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/ngx_http_lua_headers.c b/src/ngx_http_lua_headers.c index 3fd4672216..c9d89aa941 100644 --- a/src/ngx_http_lua_headers.c +++ b/src/ngx_http_lua_headers.c @@ -572,15 +572,13 @@ ngx_http_lua_ngx_header_set(lua_State *L) static int ngx_http_lua_ngx_req_header_clear(lua_State *L) { - ngx_uint_t n; - n = lua_gettop(L); if (lua_gettop(L) != 1) { return luaL_error(L, "expecting one arguments, but seen %d", lua_gettop(L)); } lua_pushnil(L); - + return ngx_http_lua_ngx_req_header_set_helper(L); } From 2064ed99ad490965e479c466ba46b40470c8fd97 Mon Sep 17 00:00:00 2001 From: aviram Date: Thu, 3 Oct 2013 17:50:50 +0200 Subject: [PATCH 09/12] Reverted unnecessary changes. --- src/ngx_http_lua_headers.c | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/src/ngx_http_lua_headers.c b/src/ngx_http_lua_headers.c index 3a0226517d..3afbd7ba76 100644 --- a/src/ngx_http_lua_headers.c +++ b/src/ngx_http_lua_headers.c @@ -562,20 +562,12 @@ ngx_http_lua_ngx_header_set(lua_State *L) static int ngx_http_lua_ngx_req_header_clear(lua_State *L) { - ngx_uint_t n; - n = lua_gettop(L); - if ((n != 1) && (n != 2)) { - return luaL_error(L, "expecting one or two arguments, but seen %d", + if (lua_gettop(L) != 1) { + return luaL_error(L, "expecting one arguments, but seen %d", lua_gettop(L)); } - if (n == 2) { - lua_pushnil(L); - /* Top element is now 3, replace it with element 3 */ - lua_insert(L, 2); - } else { - lua_pushnil(L); - } + lua_pushnil(L); return ngx_http_lua_ngx_req_header_set_helper(L); } @@ -584,7 +576,7 @@ ngx_http_lua_ngx_req_header_clear(lua_State *L) static int ngx_http_lua_ngx_req_header_set(lua_State *L) { - if ((lua_gettop(L) != 2) && (lua_gettop(L) != 3)) { + if (lua_gettop(L) != 2) { return luaL_error(L, "expecting two arguments, but seen %d", lua_gettop(L)); } @@ -622,10 +614,9 @@ ngx_http_lua_ngx_req_header_set_helper(lua_State *L) #if 0 /* replace "_" with "-" */ - for (i = 0; i < len; i++) { - if (p[i] == '_') { - p[i] = '-'; - } + for (i = 0; i < len; i++) { + if (p[i] == '_') { + p[i] = '-'; } } #endif From 21129820a40a0036db04bf49501ccfb6224b6650 Mon Sep 17 00:00:00 2001 From: aviram Date: Sun, 13 Oct 2013 11:01:57 +0200 Subject: [PATCH 10/12] Made a correction in the safe guard of the capture streaming. Previously, the resume handler was set to ngx_http_lua_subrequest_resume even when the subrequest is streaming, meaning the Lua code would return from a wrong context. Fixed and tested. --- src/ngx_http_lua_subrequest.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/ngx_http_lua_subrequest.c b/src/ngx_http_lua_subrequest.c index 805f2c281b..11e6ee9174 100644 --- a/src/ngx_http_lua_subrequest.c +++ b/src/ngx_http_lua_subrequest.c @@ -1364,19 +1364,20 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) dd("all subrequests are done"); pr_ctx->no_abort = 0; - pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; - pr_ctx->cur_co_ctx = pr_coctx; - + #ifdef NGX_LUA_CAPTURE_DOWN_STREAMING - - if (pr_ctx->async_capture) { + if (!pr_ctx->async_capture) { + pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; + } else { /* XXX: Make sure that the parent request has the correct context. */ pr_ctx->current_subrequest = r; pr_ctx->current_subrequest_ctx = ctx; } - +#else + pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; #endif + pr_ctx->cur_co_ctx = pr_coctx; } if (pr_ctx->entered_content_phase) { From c23fb519566e6c56f143b56c08df517f553fe2aa Mon Sep 17 00:00:00 2001 From: aviram Date: Tue, 10 Dec 2013 16:59:45 +0200 Subject: [PATCH 11/12] Added another criteria for a subrequest ending (this may happen only in an empty gzip encoded response). --- src/ngx_http_lua_subrequest.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ngx_http_lua_subrequest.c b/src/ngx_http_lua_subrequest.c index 11e6ee9174..e1305f8104 100644 --- a/src/ngx_http_lua_subrequest.c +++ b/src/ngx_http_lua_subrequest.c @@ -967,7 +967,9 @@ ngx_http_lua_ngx_capture_buffer_handler(ngx_http_request_t *r) If we need waiting, we shall keep it. Anyway, if the subrequest isn't over, we should try to wake it up. */ - if (!ctx->current_subrequest_ctx || !ctx->current_subrequest_ctx->seen_last_for_subreq) { + if ((!ctx->current_subrequest_ctx || !ctx->current_subrequest_ctx->seen_last_for_subreq) + && (!ctx->current_subrequest || !ctx->current_subrequest->done)) + { ctx->wakeup_subrequest = 1; } else { ctx->wakeup_subrequest = 0; From 2165d5070e237faed96139d4d23ab6ce8ca409bf Mon Sep 17 00:00:00 2001 From: aviramc Date: Wed, 18 Dec 2013 12:25:23 +0200 Subject: [PATCH 12/12] Added lmcf initialization. --- src/ngx_http_lua_subrequest.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ngx_http_lua_subrequest.c b/src/ngx_http_lua_subrequest.c index ba37924c32..3b2b973fe2 100644 --- a/src/ngx_http_lua_subrequest.c +++ b/src/ngx_http_lua_subrequest.c @@ -1993,6 +1993,8 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) return NGX_ERROR; } + lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module); + ctx->resume_handler = ngx_http_lua_wev_handler; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,