Skip to content

Commit 4b9e53f

Browse files
spacewanderthibaultcha
authored andcommitted
feature: ngx.pipe: allowed for calling resty.core 'ngx_pipe.shutdown()' on a sub-process when a light thread is waiting on it.
Signed-off-by: Thibault Charbonnier <thibaultcha@me.com>
1 parent 8573c1c commit 4b9e53f

File tree

1 file changed

+56
-64
lines changed

1 file changed

+56
-64
lines changed

src/ngx_http_lua_pipe.c

Lines changed: 56 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,12 @@ static ssize_t ngx_http_lua_pipe_fd_read(ngx_connection_t *c, u_char *buf,
3030
size_t size);
3131
static ssize_t ngx_http_lua_pipe_fd_write(ngx_connection_t *c, u_char *buf,
3232
size_t size);
33-
static ngx_int_t ngx_http_lua_pipe_close_helper(
34-
ngx_http_lua_pipe_ctx_t *pipe_ctx, ngx_event_t *ev, int forced);
35-
static ngx_int_t ngx_http_lua_pipe_close_stdin(ngx_http_lua_pipe_t *pipe,
36-
int forced);
37-
static ngx_int_t ngx_http_lua_pipe_close_stdout(ngx_http_lua_pipe_t *pipe,
38-
int forced);
39-
static ngx_int_t ngx_http_lua_pipe_close_stderr(ngx_http_lua_pipe_t *pipe,
40-
int forced);
41-
static void ngx_http_lua_pipe_proc_finalize(ngx_http_lua_ffi_pipe_proc_t *proc,
42-
int forced);
33+
static void ngx_http_lua_pipe_close_helper(
34+
ngx_http_lua_pipe_ctx_t *pipe_ctx, ngx_event_t *ev);
35+
static void ngx_http_lua_pipe_close_stdin(ngx_http_lua_pipe_t *pipe);
36+
static void ngx_http_lua_pipe_close_stdout(ngx_http_lua_pipe_t *pipe);
37+
static void ngx_http_lua_pipe_close_stderr(ngx_http_lua_pipe_t *pipe);
38+
static void ngx_http_lua_pipe_proc_finalize(ngx_http_lua_ffi_pipe_proc_t *proc);
4339
static ngx_int_t ngx_http_lua_pipe_get_lua_ctx(ngx_http_request_t *r,
4440
ngx_http_lua_ctx_t **ctx, u_char *errbuf, size_t *errbuf_size);
4541
static void ngx_http_lua_pipe_put_error(ngx_http_lua_pipe_ctx_t *pipe_ctx,
@@ -121,7 +117,8 @@ enum {
121117
PIPE_ERR_NOMEM,
122118
PIPE_ERR_TIMEOUT,
123119
PIPE_ERR_ADD_READ_EV,
124-
PIPE_ERR_ADD_WRITE_EV
120+
PIPE_ERR_ADD_WRITE_EV,
121+
PIPE_ERR_ABORTED
125122
};
126123

127124

@@ -914,26 +911,27 @@ ngx_http_lua_ffi_pipe_spawn(ngx_http_lua_ffi_pipe_proc_t *proc,
914911
}
915912

916913

917-
static ngx_int_t
914+
static void
918915
ngx_http_lua_pipe_close_helper(ngx_http_lua_pipe_ctx_t *pipe_ctx,
919-
ngx_event_t *ev, int forced)
916+
ngx_event_t *ev)
920917
{
921-
if (ev->handler != ngx_http_lua_pipe_dummy_event_handler && !forced) {
922-
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
923-
"lua pipe cannot close fd:%d without "
924-
"forced pipe:%p ev:%p", pipe_ctx->c->fd, pipe_ctx, ev);
925-
return NGX_ERROR;
918+
if (ev->handler != ngx_http_lua_pipe_dummy_event_handler) {
919+
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
920+
"lua pipe abort blocking operation pipe_ctx:%p ev:%p",
921+
pipe_ctx, ev);
922+
923+
pipe_ctx->err_type = PIPE_ERR_ABORTED;
924+
ngx_post_event(ev, &ngx_posted_events);
925+
return;
926926
}
927927

928928
ngx_close_connection(pipe_ctx->c);
929929
pipe_ctx->c = NULL;
930-
931-
return NGX_OK;
932930
}
933931

934932

935-
static ngx_int_t
936-
ngx_http_lua_pipe_close_stdin(ngx_http_lua_pipe_t *pipe, int forced)
933+
static void
934+
ngx_http_lua_pipe_close_stdin(ngx_http_lua_pipe_t *pipe)
937935
{
938936
ngx_event_t *wev;
939937

@@ -949,15 +947,13 @@ ngx_http_lua_pipe_close_stdin(ngx_http_lua_pipe_t *pipe, int forced)
949947

950948
} else if (pipe->stdin_ctx->c != NULL) {
951949
wev = pipe->stdin_ctx->c->write;
952-
return ngx_http_lua_pipe_close_helper(pipe->stdin_ctx, wev, forced);
950+
ngx_http_lua_pipe_close_helper(pipe->stdin_ctx, wev);
953951
}
954-
955-
return NGX_OK;
956952
}
957953

958954

959-
static ngx_int_t
960-
ngx_http_lua_pipe_close_stdout(ngx_http_lua_pipe_t *pipe, int forced)
955+
static void
956+
ngx_http_lua_pipe_close_stdout(ngx_http_lua_pipe_t *pipe)
961957
{
962958
ngx_event_t *rev;
963959

@@ -973,15 +969,13 @@ ngx_http_lua_pipe_close_stdout(ngx_http_lua_pipe_t *pipe, int forced)
973969

974970
} else if (pipe->stdout_ctx->c != NULL) {
975971
rev = pipe->stdout_ctx->c->read;
976-
return ngx_http_lua_pipe_close_helper(pipe->stdout_ctx, rev, forced);
972+
ngx_http_lua_pipe_close_helper(pipe->stdout_ctx, rev);
977973
}
978-
979-
return NGX_OK;
980974
}
981975

982976

983-
static ngx_int_t
984-
ngx_http_lua_pipe_close_stderr(ngx_http_lua_pipe_t *pipe, int forced)
977+
static void
978+
ngx_http_lua_pipe_close_stderr(ngx_http_lua_pipe_t *pipe)
985979
{
986980
ngx_event_t *rev;
987981

@@ -997,18 +991,15 @@ ngx_http_lua_pipe_close_stderr(ngx_http_lua_pipe_t *pipe, int forced)
997991

998992
} else if (pipe->stderr_ctx->c != NULL) {
999993
rev = pipe->stderr_ctx->c->read;
1000-
return ngx_http_lua_pipe_close_helper(pipe->stderr_ctx, rev, forced);
994+
ngx_http_lua_pipe_close_helper(pipe->stderr_ctx, rev);
1001995
}
1002-
1003-
return NGX_OK;
1004996
}
1005997

1006998

1007999
int
10081000
ngx_http_lua_ffi_pipe_proc_shutdown_stdin(ngx_http_lua_ffi_pipe_proc_t *proc,
10091001
u_char *errbuf, size_t *errbuf_size)
10101002
{
1011-
ngx_int_t rc;
10121003
ngx_http_lua_pipe_t *pipe;
10131004

10141005
pipe = proc->pipe;
@@ -1017,12 +1008,7 @@ ngx_http_lua_ffi_pipe_proc_shutdown_stdin(ngx_http_lua_ffi_pipe_proc_t *proc,
10171008
return NGX_ERROR;
10181009
}
10191010

1020-
rc = ngx_http_lua_pipe_close_stdin(pipe, 0);
1021-
if (rc != NGX_OK) {
1022-
*errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "pipe busy writing")
1023-
- errbuf;
1024-
return NGX_ERROR;
1025-
}
1011+
ngx_http_lua_pipe_close_stdin(pipe);
10261012

10271013
return NGX_OK;
10281014
}
@@ -1032,7 +1018,6 @@ int
10321018
ngx_http_lua_ffi_pipe_proc_shutdown_stdout(ngx_http_lua_ffi_pipe_proc_t *proc,
10331019
u_char *errbuf, size_t *errbuf_size)
10341020
{
1035-
ngx_int_t rc;
10361021
ngx_http_lua_pipe_t *pipe;
10371022

10381023
pipe = proc->pipe;
@@ -1041,12 +1026,7 @@ ngx_http_lua_ffi_pipe_proc_shutdown_stdout(ngx_http_lua_ffi_pipe_proc_t *proc,
10411026
return NGX_ERROR;
10421027
}
10431028

1044-
rc = ngx_http_lua_pipe_close_stdout(pipe, 0);
1045-
if (rc != NGX_OK) {
1046-
*errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "pipe busy reading")
1047-
- errbuf;
1048-
return NGX_ERROR;
1049-
}
1029+
ngx_http_lua_pipe_close_stdout(pipe);
10501030

10511031
return NGX_OK;
10521032
}
@@ -1071,24 +1051,20 @@ ngx_http_lua_ffi_pipe_proc_shutdown_stderr(ngx_http_lua_ffi_pipe_proc_t *proc,
10711051
return NGX_ERROR;
10721052
}
10731053

1074-
if (ngx_http_lua_pipe_close_stderr(pipe, 0) != NGX_OK) {
1075-
*errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "pipe busy reading")
1076-
- errbuf;
1077-
return NGX_ERROR;
1078-
}
1054+
ngx_http_lua_pipe_close_stderr(pipe);
10791055

10801056
return NGX_OK;
10811057
}
10821058

10831059

10841060
static void
1085-
ngx_http_lua_pipe_proc_finalize(ngx_http_lua_ffi_pipe_proc_t *proc, int forced)
1061+
ngx_http_lua_pipe_proc_finalize(ngx_http_lua_ffi_pipe_proc_t *proc)
10861062
{
10871063
ngx_http_lua_pipe_t *pipe;
10881064

1089-
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
1090-
"lua pipe finalize process:%p pid:%P forced:%d", proc,
1091-
proc->_pid, forced);
1065+
ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
1066+
"lua pipe finalize process:%p pid:%P",
1067+
proc, proc->_pid);
10921068
pipe = proc->pipe;
10931069

10941070
if (pipe->node) {
@@ -1098,11 +1074,11 @@ ngx_http_lua_pipe_proc_finalize(ngx_http_lua_ffi_pipe_proc_t *proc, int forced)
10981074

10991075
pipe->dead = 1;
11001076

1101-
ngx_http_lua_pipe_close_stdin(pipe, forced);
1102-
ngx_http_lua_pipe_close_stdout(pipe, forced);
1077+
ngx_http_lua_pipe_close_stdin(pipe);
1078+
ngx_http_lua_pipe_close_stdout(pipe);
11031079

11041080
if (!pipe->merge_stderr) {
1105-
ngx_http_lua_pipe_close_stderr(pipe, forced);
1081+
ngx_http_lua_pipe_close_stderr(pipe);
11061082
}
11071083

11081084
pipe->closed = 1;
@@ -1135,7 +1111,7 @@ ngx_http_lua_ffi_pipe_proc_destroy(ngx_http_lua_ffi_pipe_proc_t *proc)
11351111
}
11361112
}
11371113

1138-
ngx_http_lua_pipe_proc_finalize(proc, 1);
1114+
ngx_http_lua_pipe_proc_finalize(proc);
11391115
ngx_destroy_pool(pipe->pool);
11401116
proc->pipe = NULL;
11411117
}
@@ -1205,6 +1181,10 @@ ngx_http_lua_pipe_put_error(ngx_http_lua_pipe_ctx_t *pipe_ctx, u_char *errbuf,
12051181
- errbuf;
12061182
break;
12071183

1184+
case PIPE_ERR_ABORTED:
1185+
*errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "aborted") - errbuf;
1186+
break;
1187+
12081188
default:
12091189
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
12101190
"unexpected err type: %d", pipe_ctx->err_type);
@@ -2005,7 +1985,7 @@ ngx_http_lua_ffi_pipe_proc_wait(ngx_http_request_t *r,
20051985
*reason = REASON_UNKNOWN;
20061986
}
20071987

2008-
ngx_http_lua_pipe_proc_finalize(proc, 0);
1988+
ngx_http_lua_pipe_proc_finalize(proc);
20091989

20101990
if (*status == 0) {
20111991
return NGX_OK;
@@ -2096,6 +2076,12 @@ ngx_http_lua_pipe_read_retval_helper(ngx_http_lua_ffi_pipe_proc_t *proc,
20962076
return 0;
20972077
}
20982078

2079+
if (pipe_ctx->err_type == PIPE_ERR_ABORTED) {
2080+
ngx_close_connection(pipe_ctx->c);
2081+
pipe_ctx->c = NULL;
2082+
return 0;
2083+
}
2084+
20992085
rc = ngx_http_lua_pipe_read(pipe, pipe_ctx);
21002086
if (rc != NGX_AGAIN) {
21012087
return 0;
@@ -2147,6 +2133,12 @@ ngx_http_lua_pipe_write_retval(ngx_http_lua_ffi_pipe_proc_t *proc,
21472133
return 0;
21482134
}
21492135

2136+
if (pipe_ctx->err_type == PIPE_ERR_ABORTED) {
2137+
ngx_close_connection(pipe_ctx->c);
2138+
pipe_ctx->c = NULL;
2139+
return 0;
2140+
}
2141+
21502142
rc = ngx_http_lua_pipe_write(pipe, pipe_ctx);
21512143
if (rc != NGX_AGAIN) {
21522144
return 0;
@@ -2192,7 +2184,7 @@ ngx_http_lua_pipe_wait_retval(ngx_http_lua_ffi_pipe_proc_t *proc, lua_State *L)
21922184
return 2;
21932185
}
21942186

2195-
ngx_http_lua_pipe_proc_finalize(pipe_node->proc, 0);
2187+
ngx_http_lua_pipe_proc_finalize(pipe_node->proc);
21962188

21972189
if (pipe_node->status == 0) {
21982190
lua_pushboolean(L, 1);

0 commit comments

Comments
 (0)