diff --git a/src/ngx_http_lua_capturefilter.c b/src/ngx_http_lua_capturefilter.c index 8f1e20f02f..db4c78585f 100644 --- a/src/ngx_http_lua_capturefilter.c +++ b/src/ngx_http_lua_capturefilter.c @@ -16,7 +16,7 @@ #include "ngx_http_lua_util.h" #include "ngx_http_lua_exception.h" #include "ngx_http_lua_subrequest.h" - +#include "ngx_http_lua_contentby.h" ngx_http_output_header_filter_pt ngx_http_lua_next_header_filter; ngx_http_output_body_filter_pt ngx_http_lua_next_body_filter; @@ -105,6 +105,15 @@ ngx_http_lua_capture_header_filter(ngx_http_request_t *r) } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static ngx_int_t +_should_store_chain_link(ngx_chain_t *in) +{ + return ((in != NULL) && ((in->buf->pos != in->buf->last))); +} +#endif + + static ngx_int_t ngx_http_lua_capture_body_filter(ngx_http_request_t *r, ngx_chain_t *in) { @@ -163,6 +172,40 @@ ngx_http_lua_capture_body_filter(ngx_http_request_t *r, ngx_chain_t *in) ctx->seen_last_for_subreq = 1; } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + if (pr_ctx->async_capture) { + /* In order to wake the parent up, we should call post and not discard + the buffer */ + pr_ctx->current_subrequest = r; /* Required for wake up (?) */ + pr_ctx->current_subrequest_ctx = ctx; /* Required for the buffer */ + + /* XXX: In some cases, pr_ctx->current_subrequest_buffer is being + cleaned by Nginx and buf gets the value 0x1... */ + if (((pr_ctx->current_subrequest_buffer == NULL) + || (pr_ctx->current_subrequest_buffer->buf == (void *) 1)) + && (_should_store_chain_link(in))) { + pr_ctx->current_subrequest_buffer = in; + } + + /* TODO: Is this line needed? */ + r->parent->write_event_handler = ngx_http_lua_content_wev_handler; + + if (!eof) { + pr_ctx->wakeup_subrequest = 1; + /* On EOF, the post subrequest callback is called, and it handles + the setting of the resume handler. The parent request would be + woken up anyway by Nginx. + */ + if (ngx_http_post_request(r->parent, NULL) != NGX_OK) { + return NGX_ERROR; + } + return NGX_OK; + } else { + pr_ctx->wakeup_subrequest = 0; + } + } +#endif + ngx_http_lua_discard_bufs(r->pool, in); return NGX_OK; diff --git a/src/ngx_http_lua_common.h b/src/ngx_http_lua_common.h index dbe9adfe46..627d885fd5 100644 --- a/src/ngx_http_lua_common.h +++ b/src/ngx_http_lua_common.h @@ -349,6 +349,19 @@ typedef struct ngx_http_lua_ctx_s { ngx_int_t exit_code; +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + ngx_int_t async_capture; + ngx_http_request_t *current_subrequest; + struct ngx_http_lua_ctx_s *current_subrequest_ctx; + ngx_chain_t *current_subrequest_buffer; + ngx_int_t returned_headers; +#endif + + ngx_http_lua_co_ctx_t *calling_coctx; /* co ctx for the caller to location.capture */ + + ngx_http_lua_co_ctx_t *req_body_reader_co_ctx; /* co ctx for the coroutine + reading the request + body */ ngx_http_lua_co_ctx_t *downstream_co_ctx; /* co ctx for the coroutine reading the request body */ @@ -390,6 +403,11 @@ typedef struct ngx_http_lua_ctx_s { unsigned headers_set:1; /* whether the user has set custom response headers */ +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + unsigned wakeup_subrequest:1; + unsigned subrequest_yield:1; +#endif + unsigned entered_rewrite_phase:1; unsigned entered_access_phase:1; unsigned entered_content_phase:1; diff --git a/src/ngx_http_lua_contentby.c b/src/ngx_http_lua_contentby.c index 4a3ab98aa6..80c4b77035 100644 --- a/src/ngx_http_lua_contentby.c +++ b/src/ngx_http_lua_contentby.c @@ -20,6 +20,12 @@ static void ngx_http_lua_content_phase_post_read(ngx_http_request_t *r); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static ngx_int_t _is_chain_valid(ngx_chain_t * cl); +static ngx_int_t _is_last_chain_link(ngx_chain_t * cl); +static ngx_int_t _post_request_if_not_posted(ngx_http_request_t *r, + ngx_http_posted_request_t *pr); +#endif ngx_int_t ngx_http_lua_content_by_chunk(lua_State *L, ngx_http_request_t *r) @@ -115,17 +121,67 @@ ngx_http_lua_content_by_chunk(lua_State *L, ngx_http_request_t *r) } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static ngx_int_t +_post_request_if_not_posted(ngx_http_request_t *r, ngx_http_posted_request_t *pr) +{ + ngx_http_posted_request_t *p; + + /* Search request in the posted requests list, so that it would not be posted twice. */ + for (p = r->main->posted_requests; p; p = p->next) { + if (p->request == r) { + return NGX_OK; + } + } + + return ngx_http_post_request(r, pr); +} + +static ngx_int_t +_is_chain_valid(ngx_chain_t * cl) +{ + /* For some reason, sometimes when cl->buf is cleaned, 1 is assigned to it. */ + return ((cl != NULL) && (cl->buf != NULL) && (cl->buf != (void *) 1)); +} +static ngx_int_t +_is_last_chain_link(ngx_chain_t * cl) +{ + /* last_in_chain is for subrequests. */ + return cl->buf->last_in_chain || cl->buf->last_buf; +} +#endif + void ngx_http_lua_content_wev_handler(ngx_http_request_t *r) { ngx_http_lua_ctx_t *ctx; + ngx_int_t rc; ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { return; } - (void) ctx->resume_handler(r); + rc = ctx->resume_handler(r); + + if (rc == NGX_DONE) { + return; + } + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + if (ctx->current_subrequest && ctx->wakeup_subrequest) { + /* Make sure that the subrequest continues */ + if (_post_request_if_not_posted(ctx->current_subrequest, NULL) != NGX_OK) { + ngx_http_lua_finalize_request(r, NGX_ERROR); + } + /* Don't try to discard the last buffer, as it will cause a NULL dereference... */ + if (_is_chain_valid(ctx->current_subrequest_buffer) && (!_is_last_chain_link(ctx->current_subrequest_buffer))) { + ngx_http_lua_discard_bufs(ctx->current_subrequest->pool, ctx->current_subrequest_buffer); + } + ctx->current_subrequest_buffer = NULL; + ctx->wakeup_subrequest = 0; + } +#endif } diff --git a/src/ngx_http_lua_subrequest.c b/src/ngx_http_lua_subrequest.c index 54f550565e..2a74a17143 100644 --- a/src/ngx_http_lua_subrequest.c +++ b/src/ngx_http_lua_subrequest.c @@ -71,7 +71,7 @@ static ngx_int_t ngx_http_lua_subrequest_add_extra_vars(ngx_http_request_t *r, static ngx_int_t ngx_http_lua_subrequest(ngx_http_request_t *r, ngx_str_t *uri, ngx_str_t *args, ngx_http_request_t **psr, ngx_http_post_subrequest_t *ps, ngx_uint_t flags); -static ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r); +ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r); static void ngx_http_lua_handle_subreq_responses(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx); static void ngx_http_lua_cancel_subreq(ngx_http_request_t *r); @@ -80,6 +80,11 @@ static ngx_int_t ngx_http_lua_copy_in_file_request_body(ngx_http_request_t *r); static ngx_int_t ngx_http_lua_copy_request_headers(ngx_http_request_t *sr, ngx_http_request_t *r); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static int ngx_http_lua_ngx_location_capture_stream(lua_State *L); +static int ngx_http_lua_ngx_location_get_subrequest_buffer(lua_State *L); +static ngx_int_t _prepare_subrequest_body_chunk(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx, u_char ** dest_buffer, unsigned long * length); +#endif /* ngx.location.capture is just a thin wrapper around * ngx.location.capture_multi */ @@ -111,6 +116,51 @@ ngx_http_lua_ngx_location_capture(lua_State *L) return ngx_http_lua_ngx_location_capture_multi(L); } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static int +ngx_http_lua_ngx_location_capture_stream(lua_State *L) +{ + int n; + ngx_http_request_t *r; + ngx_http_lua_ctx_t *ctx; + + r = ngx_http_lua_get_req(L); + + if (r == NULL) { + return luaL_error(L, "no request object found"); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return luaL_error(L, "no ctx found"); + } + + n = lua_gettop(L); + + if (n != 1 && n != 2) { + return luaL_error(L, "expecting one or two arguments"); + } + + ctx->async_capture = 1; + ctx->calling_coctx = ctx->cur_co_ctx; + + lua_createtable(L, n, 0); /* uri opts? table */ + lua_insert(L, 1); /* table uri opts? */ + if (n == 1) { /* table uri */ + lua_rawseti(L, 1, 1); /* table */ + + } else { /* table uri opts */ + lua_rawseti(L, 1, 2); /* table uri */ + lua_rawseti(L, 1, 1); /* table */ + } + + lua_createtable(L, 1, 0); /* table table' */ + lua_insert(L, 1); /* table' table */ + lua_rawseti(L, 1, 1); /* table' */ + + return ngx_http_lua_ngx_location_capture_multi(L); +} +#endif static int ngx_http_lua_ngx_location_capture_multi(lua_State *L) @@ -611,10 +661,351 @@ ngx_http_lua_ngx_location_capture_multi(lua_State *L) ctx->no_abort = 1; +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + /* Different resume handler for async requests. */ + if (ctx->async_capture) { + ctx->resume_handler = ngx_http_lua_ngx_capture_buffer_handler; + } +#endif + + return lua_yield(L, 0); +} + + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING +static void +_create_headers_table(lua_State *L, ngx_http_request_t *request) +{ + ngx_http_headers_out_t *sr_headers; + ngx_table_elt_t *header; + ngx_list_part_t *part; + ngx_uint_t i; + u_char buf[sizeof("Mon, 28 Sep 1970 06:00:00 GMT") - 1]; + + lua_newtable(L); + + sr_headers = &(request->headers_out); + + dd("saving subrequest response headers"); + + part = &sr_headers->headers.part; + header = part->elts; + + for (i = 0; /* void */; i++) { + + if (i >= part->nelts) { + if (part->next == NULL) { + break; + } + + part = part->next; + header = part->elts; + i = 0; + } + + dd("checking sr header %.*s", (int) header[i].key.len, + header[i].key.data); + +#if 1 + if (header[i].hash == 0) { + continue; + } +#endif + + header[i].hash = 0; + + dd("pushing sr header %.*s", (int) header[i].key.len, + header[i].key.data); + + lua_pushlstring(L, (char *) header[i].key.data, + header[i].key.len); /* header key */ + lua_pushvalue(L, -1); /* stack: table key key */ + + /* check if header already exists */ + lua_rawget(L, -3); /* stack: table key value */ + + if (lua_isnil(L, -1)) { + lua_pop(L, 1); /* stack: table key */ + + lua_pushlstring(L, (char *) header[i].value.data, + header[i].value.len); + /* stack: table key value */ + + lua_rawset(L, -3); /* stack: table */ + + } else { + + if (!lua_istable(L, -1)) { /* already inserted one value */ + lua_createtable(L, 4, 0); + /* stack: table key value table */ + + lua_insert(L, -2); /* stack: table key table value */ + lua_rawseti(L, -2, 1); /* stack: table key table */ + + lua_pushlstring(L, (char *) header[i].value.data, + header[i].value.len); + /* stack: table key table value */ + + lua_rawseti(L, -2, lua_objlen(L, -2) + 1); + /* stack: table key table */ + + lua_rawset(L, -3); /* stack: table */ + + } else { + lua_pushlstring(L, (char *) header[i].value.data, + header[i].value.len); + /* stack: table key table value */ + + lua_rawseti(L, -2, lua_objlen(L, -2) + 1); + /* stack: table key table */ + + lua_pop(L, 2); /* stack: table */ + } + } + } + + if (sr_headers->content_type.len) { + lua_pushliteral(L, "Content-Type"); /* header key */ + lua_pushlstring(L, (char *) sr_headers->content_type.data, + sr_headers->content_type.len); /* head key value */ + lua_rawset(L, -3); /* head */ + } + + if (sr_headers->content_length == NULL + && sr_headers->content_length_n >= 0) + { + lua_pushliteral(L, "Content-Length"); /* header key */ + + lua_pushnumber(L, sr_headers->content_length_n); + /* head key value */ + + lua_rawset(L, -3); /* head */ + } + + /* to work-around an issue in ngx_http_static_module + * (github issue #41) */ + if (sr_headers->location && sr_headers->location->value.len) { + lua_pushliteral(L, "Location"); /* header key */ + lua_pushlstring(L, (char *) sr_headers->location->value.data, + sr_headers->location->value.len); + /* head key value */ + lua_rawset(L, -3); /* head */ + } + + if (sr_headers->last_modified_time != -1) { + if (sr_headers->status != NGX_HTTP_OK + && sr_headers->status != NGX_HTTP_PARTIAL_CONTENT + && sr_headers->status != NGX_HTTP_NOT_MODIFIED + && sr_headers->status != NGX_HTTP_NO_CONTENT) + { + sr_headers->last_modified_time = -1; + sr_headers->last_modified = NULL; + } + } + + if (sr_headers->last_modified == NULL + && sr_headers->last_modified_time != -1) + { + (void) ngx_http_time(buf, sr_headers->last_modified_time); + + lua_pushliteral(L, "Last-Modified"); /* header key */ + lua_pushlstring(L, (char *) buf, sizeof(buf)); /* head key value */ + lua_rawset(L, -3); /* head */ + } +} + + +static int +ngx_http_lua_ngx_location_get_subrequest_buffer(lua_State *L) +{ + ngx_http_request_t *r = NULL; + ngx_http_lua_ctx_t *ctx = NULL; + unsigned long buffer_length = 0; + u_char *current_buffer = NULL; + + r = ngx_http_lua_get_req(L); + + if (r == NULL) { + return luaL_error(L, "no request object found"); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + return luaL_error(L, "no ctx found"); + } + + if ((!ctx->async_capture) || (ctx->current_subrequest_ctx == NULL)) { + return luaL_error(L, "no capture streaming currently"); + } + + /* If there are chunks currently, we should return them immediately. Otherwise, wait.. */ + if (ctx->current_subrequest_ctx->body) { + if (_prepare_subrequest_body_chunk(r, ctx, ¤t_buffer, &buffer_length) != NGX_OK) { + return luaL_error(L, "memory allocation failure"); + } + ctx->subrequest_yield = 0; + lua_pushlstring(L, (char *) current_buffer, buffer_length); + /* Free the buffer that was copied to Lua */ + ngx_pfree(r->pool, current_buffer); + return 1; + } + + /* If the body is NULL and the post subrequest handler was called, this means there are no more buffers. */ + if (ctx->current_subrequest_ctx->run_post_subrequest) { + /* TODO: Tight coupling between the ctx and the request object. */ + ctx->current_subrequest_ctx = NULL; + ctx->current_subrequest = NULL; + ctx->resume_handler = ngx_http_lua_wev_handler; + return 0; + } + + ctx->resume_handler = ngx_http_lua_ngx_capture_buffer_handler; + + ctx->subrequest_yield = 1; + return lua_yield(L, 0); } +static unsigned long _get_chain_size(ngx_chain_t *in) +{ + ngx_chain_t *current_chain_link = NULL; + unsigned long chain_link_size = 0; + + for (current_chain_link = in, chain_link_size = 0; current_chain_link != NULL ;current_chain_link = current_chain_link->next) { + chain_link_size += current_chain_link->buf->last - current_chain_link->buf->pos; + } + + return chain_link_size; +} + + +static ngx_int_t _prepare_subrequest_body_chunk(ngx_http_request_t *r, ngx_http_lua_ctx_t *ctx, u_char ** dest_buffer, unsigned long * length) +{ + ngx_chain_t *cl; + unsigned long buffer_length = 0; + u_char *current_buffer; + + /* Prepare the buffer for the callback */ + buffer_length = _get_chain_size(ctx->current_subrequest_ctx->body); + current_buffer = ngx_palloc(r->pool, buffer_length); + if (current_buffer == NULL) { + return NGX_ERROR; + } + + *length = buffer_length; + *dest_buffer = current_buffer; + + /* TODO: Support the limitation of the buffer size */ + for (cl = ctx->current_subrequest_ctx->body; cl; cl = cl->next) { + current_buffer = ngx_copy(current_buffer, cl->buf->pos, cl->buf->last - cl->buf->pos); + /* TODO: Should we acutally call ngx_pfree? According to the post subrequest callback, we shouldn't. */ + cl->buf->last = cl->buf->pos; + ctx->current_subrequest_ctx->body = NULL; + ctx->current_subrequest_ctx->last_body = &(ctx->current_subrequest_ctx->body); + } + + return NGX_OK; +} + +ngx_int_t +ngx_http_lua_ngx_capture_buffer_handler(ngx_http_request_t *r) +{ + ngx_http_lua_ctx_t *ctx; + unsigned long buffer_length = 0; + u_char *current_buffer; + ngx_http_lua_co_ctx_t *current_co_ctx; + lua_State * co; + ngx_int_t rc; + int returned_values; + ngx_http_lua_main_conf_t *lmcf; + + lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module); + + ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); + if (ctx == NULL) { + goto error_handling; + } + + if ((!ctx->current_subrequest_ctx) || (!ctx->current_subrequest)) { + return NGX_AGAIN; + } + + current_co_ctx = ctx->cur_co_ctx; + ctx->cur_co_ctx = ctx->calling_coctx; + co = ctx->cur_co_ctx->co; + + if (_prepare_subrequest_body_chunk(r, ctx, ¤t_buffer, &buffer_length) != NGX_OK) { + goto error_handling; + } + + if (buffer_length == 0) { + ctx->current_subrequest_buffer = NULL; + } + + /* Headers are to be returned only once. */ + if (!ctx->returned_headers) { + _create_headers_table(co, ctx->current_subrequest); + lua_pushinteger(co, ctx->current_subrequest->headers_out.status); + ctx->returned_headers = 1; + returned_values = 3; + } else { + returned_values = 1; + } + + lua_pushlstring(co, (char *) current_buffer, buffer_length); + /* Free the buffer that was copied to Lua */ + ngx_pfree(r->pool, current_buffer); + + ctx->subrequest_yield = 0; + rc = ngx_http_lua_run_thread(co, r, ctx, returned_values); + + ctx->cur_co_ctx = current_co_ctx; + + if (rc == NGX_AGAIN) { + /* NGX_AGAIN can be returned if: + - Any built-in Lua function in the current thread needs waiting. + - The function get_subrequest_buffer needs to wait for the subrequest. + + If a Lua function needs waiting, we'll remove our resume handler. + If we need waiting, we shall keep it. + Anyway, if the subrequest isn't over, we should try to wake it up. + */ + if ((!ctx->current_subrequest_ctx || !ctx->current_subrequest_ctx->seen_last_for_subreq) + && (!ctx->current_subrequest || !ctx->current_subrequest->done)) + { + ctx->wakeup_subrequest = 1; + } else { + ctx->wakeup_subrequest = 0; + } + + if (ctx->subrequest_yield) { + ctx->subrequest_yield = 0; + } else { + ctx->resume_handler = ngx_http_lua_wev_handler; + } + + return NGX_AGAIN; + } + + if (rc == NGX_DONE) { + ngx_http_lua_finalize_request(r, NGX_DONE); + return ngx_http_lua_run_posted_threads(r->connection, lmcf->lua, r, ctx); + } + + if (ctx->entered_content_phase) { + ngx_http_lua_finalize_request(r, rc); + return NGX_DONE; + } + + return rc; + + error_handling: + ngx_http_lua_finalize_request(r, NGX_ERROR); + return NGX_ERROR; +} +#endif + static ngx_int_t ngx_http_lua_adjust_subrequest(ngx_http_request_t *sr, ngx_uint_t method, int always_forward_body, ngx_http_request_body_t *body, @@ -980,7 +1371,19 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) dd("all subrequests are done"); pr_ctx->no_abort = 0; + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + if (!pr_ctx->async_capture) { + pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; + } else { + /* XXX: Make sure that the parent request has the correct context. */ + pr_ctx->current_subrequest = r; + pr_ctx->current_subrequest_ctx = ctx; + } +#else pr_ctx->resume_handler = ngx_http_lua_subrequest_resume; +#endif + pr_ctx->cur_co_ctx = pr_coctx; } @@ -1037,46 +1440,52 @@ ngx_http_lua_post_subrequest(ngx_http_request_t *r, void *data, ngx_int_t rc) } body_str->len = len; + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + /* If we're in capture streaming mode, we want to keep the buffer, and to free it from the resume handler's context. */ + if (!pr_ctx->async_capture) { +#endif + if (len == 0) { + body_str->data = NULL; - if (len == 0) { - body_str->data = NULL; - - } else { - p = ngx_palloc(r->pool, len); - if (p == NULL) { - return NGX_ERROR; - } - - body_str->data = p; + } else { + p = ngx_palloc(r->pool, len); + if (p == NULL) { + return NGX_ERROR; + } - /* copy from and then free the data buffers */ + body_str->data = p; - for (cl = ctx->body; cl; cl = cl->next) { - p = ngx_copy(p, cl->buf->pos, cl->buf->last - cl->buf->pos); + /* copy from and then free the data buffers */ - cl->buf->last = cl->buf->pos; + for (cl = ctx->body; cl; cl = cl->next) { + p = ngx_copy(p, cl->buf->pos, cl->buf->last - cl->buf->pos); + cl->buf->last = cl->buf->pos; #if 0 - dd("free body chain link buf ASAP"); - ngx_pfree(r->pool, cl->buf->start); + dd("free body chain link buf ASAP"); + ngx_pfree(r->pool, cl->buf->start); #endif + } } - } - if (ctx->body) { + if (ctx->body) { #if defined(nginx_version) && nginx_version >= 1001004 - ngx_chain_update_chains(r->pool, + ngx_chain_update_chains(r->pool, #else - ngx_chain_update_chains( + ngx_chain_update_chains( #endif - &pr_ctx->free_bufs, &pr_ctx->busy_bufs, - &ctx->body, - (ngx_buf_tag_t) &ngx_http_lua_module); + &pr_ctx->free_bufs, &pr_ctx->busy_bufs, + &ctx->body, + (ngx_buf_tag_t) &ngx_http_lua_module); - dd("free bufs: %p", pr_ctx->free_bufs); + dd("free bufs: %p", pr_ctx->free_bufs); + } +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING } - +#endif + ngx_http_post_request_to_head(pr); if (r != r->connection->data) { @@ -1424,6 +1833,15 @@ ngx_http_lua_inject_subrequest_api(lua_State *L) lua_pushcfunction(L, ngx_http_lua_ngx_location_capture); lua_setfield(L, -2, "capture"); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + lua_pushcfunction(L, ngx_http_lua_ngx_location_capture_stream); + lua_setfield(L, -2, "capture_stream"); + + /* TODO: Call this 'get_subrequest_body_chunk' */ + lua_pushcfunction(L, ngx_http_lua_ngx_location_get_subrequest_buffer); + lua_setfield(L, -2, "get_subrequest_buffer"); +#endif + lua_pushcfunction(L, ngx_http_lua_ngx_location_capture_multi); lua_setfield(L, -2, "capture_multi"); @@ -1552,7 +1970,7 @@ ngx_http_lua_subrequest(ngx_http_request_t *r, } -static ngx_int_t +ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r) { lua_State *vm; @@ -1560,12 +1978,16 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) ngx_connection_t *c; ngx_http_lua_ctx_t *ctx; ngx_http_lua_co_ctx_t *coctx; + ngx_http_lua_main_conf_t *lmcf; + int returned_values; ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { return NGX_ERROR; } + lmcf = ngx_http_get_module_main_conf(r, ngx_http_lua_module); + ctx->resume_handler = ngx_http_lua_wev_handler; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, @@ -1575,7 +1997,28 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) dd("nsubreqs: %d", (int) coctx->nsubreqs); - ngx_http_lua_handle_subreq_responses(r, ctx); +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + + if (ctx->async_capture) { + if (!ctx->returned_headers) { + _create_headers_table(coctx->co, ctx->current_subrequest); + lua_pushinteger(coctx->co, ctx->current_subrequest->headers_out.status); + ctx->returned_headers = 1; + returned_values = 3; + } else { + returned_values = 1; + } + lua_pushnil(coctx->co); + } else { + +#endif + + ngx_http_lua_handle_subreq_responses(r, ctx); + returned_values = coctx->nsubreqs; + +#ifdef NGX_LUA_CAPTURE_DOWN_STREAMING + } +#endif dd("free sr_statues/headers/bodies memory ASAP"); @@ -1591,7 +2034,7 @@ ngx_http_lua_subrequest_resume(ngx_http_request_t *r) c = r->connection; vm = ngx_http_lua_get_lua_vm(r, ctx); - rc = ngx_http_lua_run_thread(vm, r, ctx, coctx->nsubreqs); + rc = ngx_http_lua_run_thread(lmcf->lua, r, ctx, returned_values); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua run thread returned %d", rc); diff --git a/src/ngx_http_lua_subrequest.h b/src/ngx_http_lua_subrequest.h index aad4236d84..ba7c3b3fc6 100644 --- a/src/ngx_http_lua_subrequest.h +++ b/src/ngx_http_lua_subrequest.h @@ -40,6 +40,8 @@ typedef struct ngx_http_lua_post_subrequest_data_s { } ngx_http_lua_post_subrequest_data_t; +ngx_int_t ngx_http_lua_ngx_capture_buffer_handler(ngx_http_request_t *r); +ngx_int_t ngx_http_lua_subrequest_resume(ngx_http_request_t *r); #endif /* _NGX_HTTP_LUA_SUBREQUEST_H_INCLUDED_ */