diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-11-05 16:10:59 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-11-05 16:10:59 +0300 |
commit | d321d454f9304b083d62280d0621f282a74047ee (patch) | |
tree | 6b943c825614a541a2ebde01c08c66097b0eac79 | |
parent | e17e73eddab015b8942d2228780bbd9afcc5e392 (diff) | |
download | unit-d321d454f9304b083d62280d0621f282a74047ee.tar.gz unit-d321d454f9304b083d62280d0621f282a74047ee.tar.bz2 |
Perl: request processing in multiple threads.
This closes #486 issue on GitHub.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_application.h | 2 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 8 | ||||
-rw-r--r-- | src/nxt_main_process.c | 12 | ||||
-rw-r--r-- | src/perl/nxt_perl_psgi.c | 494 | ||||
-rw-r--r-- | src/perl/nxt_perl_psgi_layer.h | 6 |
5 files changed, 374 insertions, 148 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h index b8e18a23..66c0e1f7 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -64,6 +64,8 @@ typedef struct { typedef struct { char *script; + uint32_t threads; + uint32_t thread_stack_size; } nxt_perl_app_conf_t; diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 8dd9082d..8e6279fa 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -604,6 +604,14 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_perl_members[] = { .name = nxt_string("script"), .type = NXT_CONF_VLDT_STRING, .flags = NXT_CONF_VLDT_REQUIRED, + }, { + .name = nxt_string("threads"), + .type = NXT_CONF_VLDT_INTEGER, + .validator = nxt_conf_vldt_threads, + }, { + .name = nxt_string("thread_stack_size"), + .type = NXT_CONF_VLDT_INTEGER, + .validator = nxt_conf_vldt_thread_stack_size, }, NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members) diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 631b3146..99fc7995 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -233,6 +233,18 @@ static nxt_conf_map_t nxt_perl_app_conf[] = { NXT_CONF_MAP_CSTRZ, offsetof(nxt_common_app_conf_t, u.perl.script), }, + + { + nxt_string("threads"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, u.perl.threads), + }, + + { + nxt_string("thread_stack_size"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size), + }, }; diff --git a/src/perl/nxt_perl_psgi.c b/src/perl/nxt_perl_psgi.c index 16079a38..5df1465d 100644 --- a/src/perl/nxt_perl_psgi.c +++ b/src/perl/nxt_perl_psgi.c @@ -18,14 +18,14 @@ typedef struct { PerlInterpreter *my_perl; - nxt_unit_request_info_t *req; -} nxt_perl_psgi_input_t; - - -typedef struct { - PerlInterpreter *my_perl; + nxt_perl_psgi_io_arg_t arg_input; + nxt_perl_psgi_io_arg_t arg_error; SV *app; -} nxt_perl_psgi_module_t; + CV *cb; + nxt_unit_request_info_t *req; + pthread_t thread; + nxt_unit_ctx_t *ctx; +} nxt_perl_psgi_ctx_t; static long nxt_perl_psgi_io_input_read(PerlInterpreter *my_perl, @@ -62,11 +62,11 @@ static nxt_int_t nxt_perl_psgi_io_input_init(PerlInterpreter *my_perl, static nxt_int_t nxt_perl_psgi_io_error_init(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg); -static PerlInterpreter *nxt_perl_psgi_interpreter_init(nxt_task_t *task, - char *script, SV **app); +static int nxt_perl_psgi_ctx_init(const char *script, + nxt_perl_psgi_ctx_t *pctx); static SV *nxt_perl_psgi_env_create(PerlInterpreter *my_perl, - nxt_unit_request_info_t *req, nxt_perl_psgi_input_t *input); + nxt_unit_request_info_t *req); nxt_inline int nxt_perl_psgi_add_sptr(PerlInterpreter *my_perl, HV *hash_env, const char *name, uint32_t name_len, nxt_unit_sptr_t *sptr, uint32_t len); nxt_inline int nxt_perl_psgi_add_str(PerlInterpreter *my_perl, HV *hash_env, @@ -75,8 +75,7 @@ nxt_inline int nxt_perl_psgi_add_value(PerlInterpreter *my_perl, HV *hash_env, const char *name, uint32_t name_len, void *value); -static u_char *nxt_perl_psgi_module_create(nxt_task_t *task, - const char *script); +static char *nxt_perl_psgi_module_create(const char *script); static nxt_int_t nxt_perl_psgi_result_status(PerlInterpreter *my_perl, SV *result); @@ -96,18 +95,20 @@ static void nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, nxt_unit_request_info_t *req); static nxt_int_t nxt_perl_psgi_start(nxt_task_t *task, - nxt_process_data_t *conf); + nxt_process_data_t *data); static void nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req); -static void nxt_perl_psgi_atexit(void); - -typedef SV *(*nxt_perl_psgi_callback_f)(PerlInterpreter *my_perl, - SV *env, nxt_task_t *task); - -static CV *nxt_perl_psgi_cb; -static PerlInterpreter *nxt_perl_psgi; -static nxt_perl_psgi_io_arg_t nxt_perl_psgi_arg_input; -static nxt_perl_psgi_io_arg_t nxt_perl_psgi_arg_error; -static nxt_unit_request_info_t *nxt_perl_psgi_request; +static int nxt_perl_psgi_ready_handler(nxt_unit_ctx_t *ctx); +static void *nxt_perl_psgi_thread_func(void *main_ctx); +static int nxt_perl_psgi_init_threads(nxt_perl_app_conf_t *c); +static void nxt_perl_psgi_join_threads(nxt_unit_ctx_t *ctx, + nxt_perl_app_conf_t *c); +static void nxt_perl_psgi_ctx_free(nxt_perl_psgi_ctx_t *pctx); + +static CV *nxt_perl_psgi_write; +static CV *nxt_perl_psgi_close; +static CV *nxt_perl_psgi_cb; +static pthread_attr_t *nxt_perl_psgi_thread_attr; +static nxt_perl_psgi_ctx_t *nxt_perl_psgi_ctxs; static uint32_t nxt_perl_psgi_compat[] = { NXT_VERNUM, NXT_DEBUG, @@ -129,11 +130,11 @@ static long nxt_perl_psgi_io_input_read(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length) { - nxt_perl_psgi_input_t *input; + nxt_perl_psgi_ctx_t *pctx; - input = (nxt_perl_psgi_input_t *) arg->ctx; + pctx = arg->pctx; - return nxt_unit_request_read(input->req, vbuf, length); + return nxt_unit_request_read(pctx->req, vbuf, length); } @@ -165,10 +166,11 @@ static long nxt_perl_psgi_io_error_write(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, const void *vbuf, size_t length) { - nxt_perl_psgi_input_t *input; + nxt_perl_psgi_ctx_t *pctx; - input = (nxt_perl_psgi_input_t *) arg->ctx; - nxt_unit_req_error(input->req, "Perl: %s", (const char*) vbuf); + pctx = arg->pctx; + + nxt_unit_req_error(pctx->req, "Perl: %s", (const char*) vbuf); return (long) length; } @@ -216,9 +218,10 @@ XS(XS_NGINX__Unit__PSGI_exit) XS(XS_NGINX__Unit__Sandbox_write); XS(XS_NGINX__Unit__Sandbox_write) { - int rc; - char *body; - size_t len; + int rc; + char *body; + size_t len; + nxt_perl_psgi_ctx_t *pctx; dXSARGS; @@ -230,7 +233,9 @@ XS(XS_NGINX__Unit__Sandbox_write) body = SvPV(ST(1), len); - rc = nxt_unit_response_write(nxt_perl_psgi_request, body, len); + pctx = CvXSUBANY(cv).any_ptr; + + rc = nxt_unit_response_write(pctx->req, body, len); if (nxt_slow_path(rc != NXT_UNIT_OK)) { Perl_croak(aTHX_ "Failed to write response body"); @@ -242,15 +247,11 @@ XS(XS_NGINX__Unit__Sandbox_write) nxt_inline void -nxt_perl_psgi_cb_request_done(nxt_int_t status) +nxt_perl_psgi_cb_request_done(nxt_perl_psgi_ctx_t *pctx, int status) { - nxt_unit_request_info_t *req; - - req = nxt_perl_psgi_request; - - if (req != NULL) { - nxt_unit_request_done(req, status); - nxt_perl_psgi_request = NULL; + if (pctx->req != NULL) { + nxt_unit_request_done(pctx->req, status); + pctx->req = NULL; } } @@ -262,7 +263,7 @@ XS(XS_NGINX__Unit__Sandbox_close) ax = POPMARK; - nxt_perl_psgi_cb_request_done(NXT_UNIT_OK); + nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_OK); XSRETURN_NO; } @@ -271,14 +272,15 @@ XS(XS_NGINX__Unit__Sandbox_close) XS(XS_NGINX__Unit__Sandbox_cb); XS(XS_NGINX__Unit__Sandbox_cb) { - SV *obj; - int rc; - long array_len; + SV *obj; + int rc; + long array_len; + nxt_perl_psgi_ctx_t *pctx; dXSARGS; if (nxt_slow_path(items != 1)) { - nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR); + nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_ERROR); Perl_croak(aTHX_ "Wrong number of arguments"); @@ -288,7 +290,7 @@ XS(XS_NGINX__Unit__Sandbox_cb) if (nxt_slow_path(SvOK(ST(0)) == 0 || SvROK(ST(0)) == 0 || SvTYPE(SvRV(ST(0))) != SVt_PVAV)) { - nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR); + nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_ERROR); Perl_croak(aTHX_ "PSGI: An unexpected response was received " "from Perl Application"); @@ -296,10 +298,11 @@ XS(XS_NGINX__Unit__Sandbox_cb) XSRETURN_EMPTY; } - rc = nxt_perl_psgi_result_array(PERL_GET_CONTEXT, ST(0), - nxt_perl_psgi_request); + pctx = CvXSUBANY(cv).any_ptr; + + rc = nxt_perl_psgi_result_array(PERL_GET_CONTEXT, ST(0), pctx->req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR); + nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_ERROR); Perl_croak(aTHX_ (char *) NULL); @@ -316,7 +319,7 @@ XS(XS_NGINX__Unit__Sandbox_cb) XSRETURN(1); } - nxt_perl_psgi_cb_request_done(NXT_UNIT_OK); + nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_OK); XSRETURN_EMPTY; } @@ -335,10 +338,11 @@ nxt_perl_psgi_xs_init(pTHX) /* DynaLoader for Perl modules who use XS */ newXS("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__); - newXS("NGINX::Unit::Sandbox::write", XS_NGINX__Unit__Sandbox_write, - __FILE__); - newXS("NGINX::Unit::Sandbox::close", XS_NGINX__Unit__Sandbox_close, - __FILE__); + nxt_perl_psgi_write = newXS("NGINX::Unit::Sandbox::write", + XS_NGINX__Unit__Sandbox_write, __FILE__); + + nxt_perl_psgi_close = newXS("NGINX::Unit::Sandbox::close", + XS_NGINX__Unit__Sandbox_close, __FILE__); nxt_perl_psgi_cb = newXS("NGINX::Unit::Sandbox::cb", XS_NGINX__Unit__Sandbox_cb, __FILE__); @@ -416,10 +420,10 @@ nxt_perl_psgi_call_method(PerlInterpreter *my_perl, SV *obj, const char *method, } -static u_char * -nxt_perl_psgi_module_create(nxt_task_t *task, const char *script) +static char * +nxt_perl_psgi_module_create(const char *script) { - u_char *buf, *p; + char *buf, *p; size_t length; static nxt_str_t prefix = nxt_string( @@ -441,12 +445,11 @@ nxt_perl_psgi_module_create(nxt_task_t *task, const char *script) length = strlen(script); - buf = nxt_malloc(prefix.length + length + suffix.length); - + buf = nxt_unit_malloc(NULL, prefix.length + length + suffix.length); if (nxt_slow_path(buf == NULL)) { - nxt_log_error(NXT_LOG_ERR, task->log, - "PSGI: Failed to allocate memory " - "for Perl script file %s", script); + nxt_unit_alert(NULL, "PSGI: Failed to allocate memory " + "for Perl script file %s", script); + return NULL; } @@ -518,30 +521,27 @@ nxt_perl_psgi_io_error_init(PerlInterpreter *my_perl, } -static PerlInterpreter * -nxt_perl_psgi_interpreter_init(nxt_task_t *task, char *script, SV **app) +static int +nxt_perl_psgi_ctx_init(const char *script, nxt_perl_psgi_ctx_t *pctx) { - int status, pargc; - char **pargv, **penv; - u_char *run_module; + int status; + char *run_module; PerlInterpreter *my_perl; static char argv[] = "\0""-e\0""0"; static char *embedding[] = { &argv[0], &argv[1], &argv[4] }; - pargc = 0; - pargv = NULL; - penv = NULL; - - PERL_SYS_INIT3(&pargc, &pargv, &penv); - my_perl = perl_alloc(); if (nxt_slow_path(my_perl == NULL)) { - nxt_alert(task, "PSGI: Failed to allocate memory for Perl interpreter"); - return NULL; + nxt_unit_alert(NULL, + "PSGI: Failed to allocate memory for Perl interpreter"); + + return NXT_UNIT_ERROR; } + pctx->my_perl = my_perl; + run_module = NULL; perl_construct(my_perl); @@ -550,77 +550,86 @@ nxt_perl_psgi_interpreter_init(nxt_task_t *task, char *script, SV **app) status = perl_parse(my_perl, nxt_perl_psgi_xs_init, 3, embedding, NULL); if (nxt_slow_path(status != 0)) { - nxt_alert(task, "PSGI: Failed to parse Perl Script"); + nxt_unit_alert(NULL, "PSGI: Failed to parse Perl Script"); goto fail; } + CvXSUBANY(nxt_perl_psgi_write).any_ptr = pctx; + CvXSUBANY(nxt_perl_psgi_close).any_ptr = pctx; + CvXSUBANY(nxt_perl_psgi_cb).any_ptr = pctx; + + pctx->cb = nxt_perl_psgi_cb; + PL_exit_flags |= PERL_EXIT_DESTRUCT_END; PL_origalen = 1; status = perl_run(my_perl); if (nxt_slow_path(status != 0)) { - nxt_alert(task, "PSGI: Failed to run Perl"); + nxt_unit_alert(NULL, "PSGI: Failed to run Perl"); goto fail; } sv_setsv(get_sv("0", 0), newSVpv(script, 0)); - run_module = nxt_perl_psgi_module_create(task, script); - + run_module = nxt_perl_psgi_module_create(script); if (nxt_slow_path(run_module == NULL)) { goto fail; } - status = nxt_perl_psgi_io_input_init(my_perl, &nxt_perl_psgi_arg_input); + pctx->arg_input.pctx = pctx; + status = nxt_perl_psgi_io_input_init(my_perl, &pctx->arg_input); if (nxt_slow_path(status != NXT_OK)) { - nxt_alert(task, "PSGI: Failed to init io.psgi.input"); + nxt_unit_alert(NULL, "PSGI: Failed to init io.psgi.input"); goto fail; } - status = nxt_perl_psgi_io_error_init(my_perl, &nxt_perl_psgi_arg_error); + pctx->arg_error.pctx = pctx; + status = nxt_perl_psgi_io_error_init(my_perl, &pctx->arg_error); if (nxt_slow_path(status != NXT_OK)) { - nxt_alert(task, "PSGI: Failed to init io.psgi.errors"); + nxt_unit_alert(NULL, "PSGI: Failed to init io.psgi.errors"); goto fail; } - *app = eval_pv((const char *) run_module, FALSE); + pctx->app = eval_pv(run_module, FALSE); if (SvTRUE(ERRSV)) { - nxt_alert(task, "PSGI: Failed to parse script: %s\n%s", - script, SvPV_nolen(ERRSV)); + nxt_unit_alert(NULL, "PSGI: Failed to parse script: %s\n%s", + script, SvPV_nolen(ERRSV)); goto fail; } - nxt_free(run_module); + nxt_unit_free(NULL, run_module); - return my_perl; + return NXT_UNIT_OK; fail: if (run_module != NULL) { - nxt_free(run_module); + nxt_unit_free(NULL, run_module); } perl_destruct(my_perl); perl_free(my_perl); - PERL_SYS_TERM(); - return NULL; + return NXT_UNIT_ERROR; } static SV * nxt_perl_psgi_env_create(PerlInterpreter *my_perl, - nxt_unit_request_info_t *req, nxt_perl_psgi_input_t *input) + nxt_unit_request_info_t *req) { - HV *hash_env; - AV *array_version; - uint32_t i; - nxt_unit_field_t *f; - nxt_unit_request_t *r; + HV *hash_env; + AV *array_version; + uint32_t i; + nxt_unit_field_t *f; + nxt_unit_request_t *r; + nxt_perl_psgi_ctx_t *pctx; + + pctx = req->ctx->data; hash_env = newHV(); if (nxt_slow_path(hash_env == NULL)) { @@ -664,11 +673,12 @@ nxt_perl_psgi_env_create(PerlInterpreter *my_perl, : newSVpv("http", 4))); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.input"), - SvREFCNT_inc(nxt_perl_psgi_arg_input.io))); + SvREFCNT_inc(pctx->arg_input.io))); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.errors"), - SvREFCNT_inc(nxt_perl_psgi_arg_error.io))); + SvREFCNT_inc(pctx->arg_error.io))); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.multithread"), - &PL_sv_no)); + nxt_perl_psgi_ctxs != NULL + ? &PL_sv_yes : &PL_sv_no)); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.multiprocess"), &PL_sv_yes)); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.run_once"), @@ -1109,13 +1119,17 @@ static void nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, nxt_unit_request_info_t *req) { + nxt_perl_psgi_ctx_t *pctx; + dSP; + pctx = req->ctx->data; + ENTER; SAVETMPS; PUSHMARK(sp); - XPUSHs(newRV_noinc((SV*) nxt_perl_psgi_cb)); + XPUSHs(newRV_noinc((SV*) pctx->cb)); PUTBACK; call_sv(result, G_EVAL|G_SCALAR); @@ -1126,7 +1140,7 @@ nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, nxt_unit_error(NULL, "PSGI: Failed to execute result callback: \n%s", SvPV_nolen(ERRSV)); - nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR); + nxt_perl_psgi_cb_request_done(pctx, NXT_UNIT_ERROR); } PUTBACK; @@ -1138,90 +1152,116 @@ nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, static nxt_int_t nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data) { - int rc; - nxt_unit_ctx_t *unit_ctx; - nxt_unit_init_t perl_init; - PerlInterpreter *my_perl; - nxt_common_app_conf_t *conf; - nxt_perl_psgi_module_t module; + int rc, pargc; + char **pargv, **penv; + nxt_unit_ctx_t *unit_ctx; + nxt_unit_init_t perl_init; + nxt_perl_psgi_ctx_t pctx; + nxt_perl_app_conf_t *c; + nxt_common_app_conf_t *common_conf; + + common_conf = data->app; + c = &common_conf->u.perl; + + pargc = 0; + pargv = NULL; + penv = NULL; - conf = data->app; + PERL_SYS_INIT3(&pargc, &pargv, &penv); - my_perl = nxt_perl_psgi_interpreter_init(task, conf->u.perl.script, - &module.app); + memset(&pctx, 0, sizeof(nxt_perl_psgi_ctx_t)); - if (nxt_slow_path(my_perl == NULL)) { - return NXT_ERROR; + rc = nxt_perl_psgi_ctx_init(c->script, &pctx); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; } - module.my_perl = my_perl; - nxt_perl_psgi = my_perl; + rc = nxt_perl_psgi_init_threads(c); + + PERL_SET_CONTEXT(pctx.my_perl); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } nxt_unit_default_init(task, &perl_init); perl_init.callbacks.request_handler = nxt_perl_psgi_request_handler; - perl_init.data = &module; - perl_init.shm_limit = conf->shm_limit; + perl_init.callbacks.ready_handler = nxt_perl_psgi_ready_handler; + perl_init.data = c; + perl_init.ctx_data = &pctx; + perl_init.shm_limit = common_conf->shm_limit; unit_ctx = nxt_unit_init(&perl_init); if (nxt_slow_path(unit_ctx == NULL)) { - return NXT_ERROR; + goto fail; } rc = nxt_unit_run(unit_ctx); + nxt_perl_psgi_join_threads(unit_ctx, c); + nxt_unit_done(unit_ctx); - nxt_perl_psgi_atexit(); + nxt_perl_psgi_ctx_free(&pctx); + + PERL_SYS_TERM(); exit(rc); return NXT_OK; + +fail: + + nxt_perl_psgi_join_threads(NULL, c); + + nxt_perl_psgi_ctx_free(&pctx); + + PERL_SYS_TERM(); + + return NXT_ERROR; } static void nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req) { - SV *env, *result; - nxt_int_t rc; - PerlInterpreter *my_perl; - nxt_perl_psgi_input_t input; - nxt_perl_psgi_module_t *module; - - module = req->unit->data; - my_perl = module->my_perl; + SV *env, *result; + nxt_int_t rc; + PerlInterpreter *my_perl; + nxt_perl_psgi_ctx_t *pctx; - input.my_perl = my_perl; - input.req = req; + pctx = req->ctx->data; + my_perl = pctx->my_perl; - nxt_perl_psgi_request = req; + pctx->req = req; /* * Create environ variable for perl sub "application". * > sub application { * > my ($environ) = @_; */ - env = nxt_perl_psgi_env_create(my_perl, req, &input); + env = nxt_perl_psgi_env_create(my_perl, req); if (nxt_slow_path(env == NULL)) { nxt_unit_req_error(req, "PSGI: Failed to create 'env' for Perl Application"); nxt_unit_request_done(req, NXT_UNIT_ERROR); + pctx->req = NULL; return; } - nxt_perl_psgi_arg_input.ctx = &input; - nxt_perl_psgi_arg_error.ctx = &input; - /* Call perl sub and get result as SV*. */ - result = nxt_perl_psgi_call_var_application(my_perl, env, module->app, req); + result = nxt_perl_psgi_call_var_application(my_perl, env, pctx->app, + req); if (nxt_fast_path(SvOK(result) != 0 && SvROK(result) != 0)) { if (SvTYPE(SvRV(result)) == SVt_PVAV) { rc = nxt_perl_psgi_result_array(my_perl, result, req); nxt_unit_request_done(req, rc); + pctx->req = NULL; + goto release; } @@ -1235,6 +1275,7 @@ nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req) "from Perl Application"); nxt_unit_request_done(req, NXT_UNIT_ERROR); + pctx->req = NULL; release: @@ -1243,18 +1284,181 @@ release: } +static int +nxt_perl_psgi_ready_handler(nxt_unit_ctx_t *ctx) +{ + int res; + uint32_t i; + nxt_perl_app_conf_t *c; + nxt_perl_psgi_ctx_t *pctx; + + /* Worker thread context. */ + if (!nxt_unit_is_main_ctx(ctx)) { + return NXT_UNIT_OK; + } + + c = ctx->unit->data; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + for (i = 0; i < c->threads - 1; i++) { + pctx = &nxt_perl_psgi_ctxs[i]; + + pctx->ctx = ctx; + + res = pthread_create(&pctx->thread, nxt_perl_psgi_thread_attr, + nxt_perl_psgi_thread_func, pctx); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1)); + + } else { + nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)", + (int) (i + 1), strerror(res), res); + + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + +static void * +nxt_perl_psgi_thread_func(void *data) +{ + nxt_unit_ctx_t *ctx; + nxt_perl_psgi_ctx_t *pctx; + + pctx = data; + + nxt_unit_debug(pctx->ctx, "worker thread start"); + + ctx = nxt_unit_ctx_alloc(pctx->ctx, pctx); + if (nxt_slow_path(ctx == NULL)) { + return NULL; + } + + pctx->ctx = ctx; + + PERL_SET_CONTEXT(pctx->my_perl); + + (void) nxt_unit_run(ctx); + + nxt_unit_done(ctx); + + nxt_unit_debug(NULL, "worker thread end"); + + return NULL; +} + + +static int +nxt_perl_psgi_init_threads(nxt_perl_app_conf_t *c) +{ + int rc; + uint32_t i; + static pthread_attr_t attr; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + if (c->thread_stack_size > 0) { + rc = pthread_attr_init(&attr); + if (nxt_slow_path(rc != 0)) { + nxt_unit_alert(NULL, "thread attr init failed: %s (%d)", + strerror(rc), rc); + + return NXT_UNIT_ERROR; + } + + rc = pthread_attr_setstacksize(&attr, c->thread_stack_size); + if (nxt_slow_path(rc != 0)) { + nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)", + strerror(rc), rc); + + return NXT_UNIT_ERROR; + } + + nxt_perl_psgi_thread_attr = &attr; + } + + nxt_perl_psgi_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_perl_psgi_ctx_t) + * (c->threads - 1)); + if (nxt_slow_path(nxt_perl_psgi_ctxs == NULL)) { + return NXT_UNIT_ERROR; + } + + memset(nxt_perl_psgi_ctxs, 0, sizeof(nxt_perl_psgi_ctx_t) + * (c->threads - 1)); + + for (i = 0; i < c->threads - 1; i++) { + rc = nxt_perl_psgi_ctx_init(c->script, &nxt_perl_psgi_ctxs[i]); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + static void -nxt_perl_psgi_atexit(void) +nxt_perl_psgi_join_threads(nxt_unit_ctx_t *ctx, nxt_perl_app_conf_t *c) { - dTHXa(nxt_perl_psgi); + int res; + uint32_t i; + nxt_perl_psgi_ctx_t *pctx; - nxt_perl_psgi_layer_stream_io_destroy(aTHX_ nxt_perl_psgi_arg_input.io); - nxt_perl_psgi_layer_stream_fp_destroy(aTHX_ nxt_perl_psgi_arg_input.fp); + if (nxt_perl_psgi_ctxs == NULL) { + return; + } - nxt_perl_psgi_layer_stream_io_destroy(aTHX_ nxt_perl_psgi_arg_error.io); - nxt_perl_psgi_layer_stream_fp_destroy(aTHX_ nxt_perl_psgi_arg_error.fp); + for (i = 0; i < c->threads - 1; i++) { + pctx = &nxt_perl_psgi_ctxs[i]; - perl_destruct(nxt_perl_psgi); - perl_free(nxt_perl_psgi); - PERL_SYS_TERM(); + res = pthread_join(pctx->thread, NULL); + + if (nxt_fast_path(res == 0)) { + nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1)); + + } else { + nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)", + (int) (i + 1), strerror(res), res); + } + } + + for (i = 0; i < c->threads - 1; i++) { + nxt_perl_psgi_ctx_free(&nxt_perl_psgi_ctxs[i]); + } + + nxt_unit_free(NULL, nxt_perl_psgi_ctxs); +} + + +static void +nxt_perl_psgi_ctx_free(nxt_perl_psgi_ctx_t *pctx) +{ + PerlInterpreter *my_perl; + + my_perl = pctx->my_perl; + + if (nxt_slow_path(my_perl == NULL)) { + return; + } + + PERL_SET_CONTEXT(my_perl); + + nxt_perl_psgi_layer_stream_io_destroy(aTHX_ pctx->arg_input.io); + nxt_perl_psgi_layer_stream_fp_destroy(aTHX_ pctx->arg_input.fp); + + nxt_perl_psgi_layer_stream_io_destroy(aTHX_ pctx->arg_error.io); + nxt_perl_psgi_layer_stream_fp_destroy(aTHX_ pctx->arg_error.fp); + + perl_destruct(my_perl); + perl_free(my_perl); } diff --git a/src/perl/nxt_perl_psgi_layer.h b/src/perl/nxt_perl_psgi_layer.h index 3fa349c0..af18ad0d 100644 --- a/src/perl/nxt_perl_psgi_layer.h +++ b/src/perl/nxt_perl_psgi_layer.h @@ -14,7 +14,7 @@ #include <perliol.h> -typedef struct nxt_perl_psgi_io_arg nxt_perl_psgi_io_arg_t; +typedef struct nxt_perl_psgi_io_arg_s nxt_perl_psgi_io_arg_t; typedef long (*nxt_perl_psgi_io_read_f)(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length); @@ -24,7 +24,7 @@ typedef long (*nxt_perl_psgi_io_arg_f)(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg); -struct nxt_perl_psgi_io_arg { +struct nxt_perl_psgi_io_arg_s { SV *io; PerlIO *fp; @@ -32,7 +32,7 @@ struct nxt_perl_psgi_io_arg { nxt_perl_psgi_io_read_f read; nxt_perl_psgi_io_write_f write; - void *ctx; + void *pctx; }; |