/* * Copyright (C) Alexander Borisov * Copyright (C) NGINX, Inc. */ #include #include #include #include #include #include #include #include #include typedef struct { PerlInterpreter *my_perl; nxt_perl_psgi_io_arg_t arg_input; nxt_perl_psgi_io_arg_t arg_error; SV *app; CV *cb; nxt_unit_request_info_t *req; pthread_t thread; nxt_unit_ctx_t *ctx; } nxt_perl_psgi_ctx_t; static SSize_t nxt_perl_psgi_io_input_read(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length); static SSize_t nxt_perl_psgi_io_input_write(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, const void *vbuf, size_t length); static SSize_t nxt_perl_psgi_io_error_read(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length); static SSize_t nxt_perl_psgi_io_error_write(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, const void *vbuf, size_t length); /* static void nxt_perl_psgi_xs_core_global_changes(PerlInterpreter *my_perl, const char *core, const char *sub, XSUBADDR_t sub_addr); */ static void nxt_perl_psgi_xs_init(pTHX); static SV *nxt_perl_psgi_call_var_application(PerlInterpreter *my_perl, SV *env, SV *app, nxt_unit_request_info_t *req); static SV *nxt_perl_psgi_call_method(PerlInterpreter *my_perl, SV *obj, const char *method, nxt_unit_request_info_t *req); /* For currect load XS modules */ EXTERN_C void boot_DynaLoader(pTHX_ CV *cv); static int nxt_perl_psgi_io_init(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, const char *mode, void *req); static int nxt_perl_psgi_ctx_init(const char *script, nxt_perl_psgi_ctx_t *pctx); static SV *nxt_perl_psgi_env_create(PerlInterpreter *my_perl, nxt_unit_request_info_t *req); nxt_inline int nxt_perl_psgi_add_sptr(PerlInterpreter *my_perl, HV *hash_env, const char *name, uint32_t name_len, nxt_unit_sptr_t *sptr, uint32_t len); nxt_inline int nxt_perl_psgi_add_str(PerlInterpreter *my_perl, HV *hash_env, const char *name, uint32_t name_len, const char *str, uint32_t len); nxt_inline int nxt_perl_psgi_add_value(PerlInterpreter *my_perl, HV *hash_env, const char *name, uint32_t name_len, void *value); static char *nxt_perl_psgi_module_create(const char *script); static nxt_int_t nxt_perl_psgi_result_status(PerlInterpreter *my_perl, SV *result); static int nxt_perl_psgi_result_head(PerlInterpreter *my_perl, SV *sv_head, nxt_unit_request_info_t *req, uint16_t status); static int nxt_perl_psgi_result_body(PerlInterpreter *my_perl, SV *result, nxt_unit_request_info_t *req); static int nxt_perl_psgi_result_body_ref(PerlInterpreter *my_perl, SV *sv_body, nxt_unit_request_info_t *req); static int nxt_perl_psgi_result_body_fh(PerlInterpreter *my_perl, SV *sv_body, nxt_unit_request_info_t *req); static ssize_t nxt_perl_psgi_io_read(nxt_unit_read_info_t *read_info, void *dst, size_t size); static int nxt_perl_psgi_result_array(PerlInterpreter *my_perl, SV *result, nxt_unit_request_info_t *req); static void nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, nxt_unit_request_info_t *req); static nxt_int_t nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data); static void nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req); static int nxt_perl_psgi_ready_handler(nxt_unit_ctx_t *ctx); static void *nxt_perl_psgi_thread_func(void *main_ctx); static int nxt_perl_psgi_init_threads(nxt_perl_app_conf_t *c); static void nxt_perl_psgi_join_threads(nxt_unit_ctx_t *ctx, nxt_perl_app_conf_t *c); static void nxt_perl_psgi_ctx_free(nxt_perl_psgi_ctx_t *pctx); static CV *nxt_perl_psgi_write; static CV *nxt_perl_psgi_close; static CV *nxt_perl_psgi_cb; static pthread_attr_t *nxt_perl_psgi_thread_attr; static nxt_perl_psgi_ctx_t *nxt_perl_psgi_ctxs; static uint32_t nxt_perl_psgi_compat[] = { NXT_VERNUM, NXT_DEBUG, }; NXT_EXPORT nxt_app_module_t nxt_app_module = { sizeof(nxt_perl_psgi_compat), nxt_perl_psgi_compat, nxt_string("perl"), PERL_VERSION_STRING, NULL, 0, NULL, nxt_perl_psgi_start, }; const nxt_perl_psgi_io_tab_t nxt_perl_psgi_io_tab_input = { .read = nxt_perl_psgi_io_input_read, .write = nxt_perl_psgi_io_input_write, }; const nxt_perl_psgi_io_tab_t nxt_perl_psgi_io_tab_error = { .read = nxt_perl_psgi_io_error_read, .write = nxt_perl_psgi_io_error_write, }; static SSize_t nxt_perl_psgi_io_input_read(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length) { return nxt_unit_request_read(arg->req, vbuf, length); } static SSize_t nxt_perl_psgi_io_input_write(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, const void *vbuf, size_t length) { return 0; } static SSize_t nxt_perl_psgi_io_error_read(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length) { return 0; } static SSize_t nxt_perl_psgi_io_error_write(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, const void *vbuf, size_t length) { nxt_unit_req_error(arg->req, "Perl: %s", (const char*) vbuf); return (SSize_t) length; } /* In the future it will be necessary to change some Perl functions. */ /* static void nxt_perl_psgi_xs_core_global_changes(PerlInterpreter *my_perl, const char *core, const char *sub, XSUBADDR_t sub_addr) { GV *gv; gv = gv_fetchpv(core, TRUE, SVt_PVCV); #ifdef MUTABLE_CV GvCV_set(gv, MUTABLE_CV(SvREFCNT_inc(get_cv(sub, TRUE)))); #else GvCV_set(gv, (CV *) (SvREFCNT_inc(get_cv(sub, TRUE)))); #endif GvIMPORTED_CV_on(gv); newXS(sub, sub_addr, __FILE__); } */ XS(XS_NGINX__Unit__PSGI_exit); XS(XS_NGINX__Unit__PSGI_exit) { I32 ax = POPMARK; Perl_croak(aTHX_ (char *) NULL); XSRETURN_EMPTY; } XS(XS_NGINX__Unit__Sandbox_write); XS(XS_NGINX__Unit__Sandbox_write) { int rc; char *body; size_t len; nxt_perl_psgi_ctx_t *pctx; dXSARGS; if (nxt_slow_path(items != 2)) { Perl_croak(aTHX_ "Wrong number of arguments. Need one string"); XSRETURN_EMPTY; } body = SvPV(ST(1), len); pctx = CvXSUBANY(cv).any_ptr; rc = nxt_unit_response_write(pctx->req, body, len); if (nxt_slow_path(rc != NXT_UNIT_OK)) { Perl_croak(aTHX_ "Failed to write response body"); XSRETURN_EMPTY; } XSRETURN_IV(len); } nxt_inline void nxt_perl_psgi_cb_request_done(nxt_perl_psgi_ctx_t *pctx, int status) { if (pctx->req != NULL) { nxt_unit_request_done(pctx->req, status); pctx->req = NULL; } } XS(XS_NGINX__Unit__Sandbox_close); XS(XS_NGINX__Unit__Sandbox_close) { I32 ax; ax = POPMARK; nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_OK); XSRETURN_NO; } XS(XS_NGINX__Unit__Sandbox_cb); XS(XS_NGINX__Unit__Sandbox_cb) { SV *obj; int rc; long array_len; nxt_perl_psgi_ctx_t *pctx; dXSARGS; if (nxt_slow_path(items != 1)) { nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_ERROR); Perl_croak(aTHX_ "Wrong number of arguments"); XSRETURN_EMPTY; } pctx = CvXSUBANY(cv).any_ptr; if (nxt_slow_path(SvOK(ST(0)) == 0 || SvROK(ST(0)) == 0 || SvTYPE(SvRV(ST(0))) != SVt_PVAV || pctx->req == NULL)) { nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_ERROR); Perl_croak(aTHX_ "PSGI: An unexpected response was received " "from Perl Application"); XSRETURN_EMPTY; } rc = nxt_perl_psgi_result_array(PERL_GET_CONTEXT, ST(0), pctx->req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_ERROR); Perl_croak(aTHX_ (char *) NULL); XSRETURN_EMPTY; } array_len = av_len((AV *) SvRV(ST(0))); if (array_len < 2) { obj = sv_bless(newRV_noinc((SV *) newHV()), gv_stashpv("NGINX::Unit::Sandbox", GV_ADD)); ST(0) = obj; XSRETURN(1); } nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_OK); XSRETURN_EMPTY; } static void nxt_perl_psgi_xs_init(pTHX) { /* nxt_perl_psgi_xs_core_global_changes(my_perl, "CORE::GLOBAL::exit", "NGINX::Unit::PSGI::exit", XS_NGINX__Unit__PSGI_exit); */ nxt_perl_psgi_layer_stream_init(aTHX); /* DynaLoader for Perl modules who use XS */ newXS("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__); nxt_perl_psgi_write = newXS("NGINX::Unit::Sandbox::write", XS_NGINX__Unit__Sandbox_write, __FILE__); nxt_perl_psgi_close = newXS("NGINX::Unit::Sandbox::close", XS_NGINX__Unit__Sandbox_close, __FILE__); nxt_perl_psgi_cb = newXS("NGINX::Unit::Sandbox::cb", XS_NGINX__Unit__Sandbox_cb, __FILE__); } static SV * nxt_perl_psgi_call_var_application(PerlInterpreter *my_perl, SV *env, SV *app, nxt_unit_request_info_t *req) { SV *result; dSP; ENTER; SAVETMPS; PUSHMARK(sp); XPUSHs(env); PUTBACK; call_sv(app, G_EVAL|G_SCALAR); SPAGAIN; if (SvTRUE(ERRSV)) { nxt_unit_req_error(req, "PSGI: Failed to run Perl Application: \n%s", SvPV_nolen(ERRSV)); } result = POPs; SvREFCNT_inc(result); PUTBACK; FREETMPS; LEAVE; return result; } static SV * nxt_perl_psgi_call_method(PerlInterpreter *my_perl, SV *obj, const char *method, nxt_unit_request_info_t *req) { SV *result; dSP; ENTER; SAVETMPS; PUSHMARK(sp); XPUSHs(obj); PUTBACK; call_method(method, G_EVAL|G_SCALAR); SPAGAIN; if (SvTRUE(ERRSV)) { nxt_unit_req_error(req, "PSGI: Failed to call method '%s':\n%s", method, SvPV_nolen(ERRSV)); result = NULL; } else { result = SvREFCNT_inc(POPs); } PUTBACK; FREETMPS; LEAVE; return result; } static char * nxt_perl_psgi_module_create(const char *script) { char *buf, *p; size_t length; static const nxt_str_t prefix = nxt_string( "package NGINX::Unit::Sandbox;" "sub new {" " return bless {}, $_[0];" "}" "{my $app = do \"" ); static const nxt_str_t suffix = nxt_string_zero( "\";" "unless ($app) {" " if($@ || $1) {die $@ || $1}" " else {die \"File not found or compilation error.\"}" "} " "return $app}" ); length = strlen(script); buf = nxt_unit_malloc(NULL, prefix.length + length + suffix.length); if (nxt_slow_path(buf == NULL)) { nxt_unit_alert(NULL, "PSGI: Failed to allocate memory " "for Perl script file %s", script); return NULL; } p = nxt_cpymem(buf, prefix.start, prefix.length); p = nxt_cpymem(p, script, length); nxt_memcpy(p, suffix.start, suffix.length); return buf; } static int nxt_perl_psgi_io_init(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg, const char *mode, void *req) { SV *io; PerlIO *fp; if (arg->io == NULL) { fp = nxt_perl_psgi_layer_stream_fp_create(aTHX_ arg->rv, mode); if (nxt_slow_path(fp == NULL)) { return NXT_UNIT_ERROR; } io = nxt_perl_psgi_layer_stream_io_create(aTHX_ fp); if (nxt_slow_path(io == NULL)) { nxt_perl_psgi_layer_stream_fp_destroy(aTHX_ fp); return NXT_UNIT_ERROR; } arg->io = io; arg->fp = fp; } arg->req = req; return NXT_UNIT_OK; } static void nxt_perl_psgi_io_release(PerlInterpreter *my_perl, nxt_perl_psgi_io_arg_t *arg) { if (arg->io != NULL) { SvREFCNT_dec(arg->io); arg->io = NULL; } } static int nxt_perl_psgi_ctx_init(const char *script, nxt_perl_psgi_ctx_t *pctx) { int status, res; char *run_module; PerlInterpreter *my_perl; static char argv[] = "\0""-e\0""0"; static char *embedding[] = { &argv[0], &argv[1], &argv[4] }; my_perl = perl_alloc(); if (nxt_slow_path(my_perl == NULL)) { nxt_unit_alert(NULL, "PSGI: Failed to allocate memory for Perl interpreter"); return NXT_UNIT_ERROR; } pctx->my_perl = my_perl; run_module = NULL; perl_construct(my_perl); PERL_SET_CONTEXT(my_perl); status = perl_parse(my_perl, nxt_perl_psgi_xs_init, 3, embedding, NULL); if (nxt_slow_path(status != 0)) { nxt_unit_alert(NULL, "PSGI: Failed to parse Perl Script"); goto fail; } CvXSUBANY(nxt_perl_psgi_write).any_ptr = pctx; CvXSUBANY(nxt_perl_psgi_close).any_ptr = pctx; CvXSUBANY(nxt_perl_psgi_cb).any_ptr = pctx; pctx->cb = nxt_perl_psgi_cb; PL_exit_flags |= PERL_EXIT_DESTRUCT_END; PL_origalen = 1; status = perl_run(my_perl); if (nxt_slow_path(status != 0)) { nxt_unit_alert(NULL, "PSGI: Failed to run Perl"); goto fail; } sv_setsv(get_sv("0", 0), newSVpv(script, 0)); run_module = nxt_perl_psgi_module_create(script); if (nxt_slow_path(run_module == NULL)) { goto fail; } pctx->arg_input.rv = newSV_type(SVt_RV); sv_setptrref(pctx->arg_input.rv, &pctx->arg_input); SvSETMAGIC(pctx->arg_input.rv); pctx->arg_input.io_tab = &nxt_perl_psgi_io_tab_input; res = nxt_perl_psgi_io_init(my_perl, &pctx->arg_input, "r", NULL); if (nxt_slow_path(res != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "PSGI: Failed to init io.psgi.input"); goto fail; } pctx->arg_error.rv = newSV_type(SVt_RV); sv_setptrref(pctx->arg_error.rv, &pctx->arg_error); SvSETMAGIC(pctx->arg_error.rv); pctx->arg_error.io_tab = &nxt_perl_psgi_io_tab_error; res = nxt_perl_psgi_io_init(my_perl, &pctx->arg_error, "w", NULL); if (nxt_slow_path(res != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "PSGI: Failed to init io.psgi.error"); goto fail; } pctx->app = eval_pv(run_module, FALSE); if (SvTRUE(ERRSV)) { nxt_unit_alert(NULL, "PSGI: Failed to parse script: %s\n%s", script, SvPV_nolen(ERRSV)); goto fail; } nxt_unit_free(NULL, run_module); return NXT_UNIT_OK; fail: nxt_perl_psgi_io_release(my_perl, &pctx->arg_input); nxt_perl_psgi_io_release(my_perl, &pctx->arg_error); if (run_module != NULL) { nxt_unit_free(NULL, run_module); } perl_destruct(my_perl); perl_free(my_perl); pctx->my_perl = NULL; return NXT_UNIT_ERROR; } static SV * nxt_perl_psgi_env_create(PerlInterpreter *my_perl, nxt_unit_request_info_t *req) { HV *hash_env; AV *array_version; uint32_t i; nxt_unit_field_t *f; nxt_unit_request_t *r; nxt_perl_psgi_ctx_t *pctx; pctx = req->ctx->data; hash_env = newHV(); if (nxt_slow_path(hash_env == NULL)) { return NULL; } #define RC(FNS) \ do { \ if (nxt_slow_path((FNS) != NXT_UNIT_OK)) \ goto fail; \ } while (0) #define NL(S) (S), sizeof(S)-1 r = req->request; RC(nxt_perl_psgi_add_str(my_perl, hash_env, NL("SERVER_SOFTWARE"), (char *) nxt_server.start, nxt_server.length)); RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("REQUEST_METHOD"), &r->method, r->method_length)); RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("REQUEST_URI"), &r->target, r->target_length)); RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("PATH_INFO"), &r->path, r->path_length)); array_version = newAV(); if (nxt_slow_path(array_version == NULL)) { goto fail; } av_push(array_version, newSViv(1)); av_push(array_version, newSViv(1)); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.version"), newRV_noinc((SV *) array_version))); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.url_scheme"), r->tls ? newSVpv("https", 5) : newSVpv("http", 4))); RC(nxt_perl_psgi_io_init(my_perl, &pctx->arg_input, "r", req)); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.input"), SvREFCNT_inc(pctx->arg_input.io))); RC(nxt_perl_psgi_io_init(my_perl, &pctx->arg_error, "w", req)); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.errors"), SvREFCNT_inc(pctx->arg_error.io))); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.multithread"), nxt_perl_psgi_ctxs != NULL ? &PL_sv_yes : &PL_sv_no)); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.multiprocess"), &PL_sv_yes)); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.run_once"), &PL_sv_no)); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.nonblocking"), &PL_sv_no)); RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.streaming"), &PL_sv_yes)); RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("QUERY_STRING"), &r->query, r->query_length)); RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("SERVER_PROTOCOL"), &r->version, r->version_length)); RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("REMOTE_ADDR"), &r->remote, r->remote_length)); RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("SERVER_ADDR"), &r->local_addr, r->local_addr_length)); RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("SERVER_NAME"), &r->server_name, r->server_name_length)); RC(nxt_perl_psgi_add_str(my_perl, hash_env, NL("SERVER_PORT"), "80", 2)); for (i = 0; i < r->fields_count; i++) { f = r->fields + i; RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, nxt_unit_sptr_get(&f->name), f->name_length, &f->value, f->value_length)); } if (r->content_length_field != NXT_UNIT_NONE_FIELD) { f = r->fields + r->content_length_field; RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("CONTENT_LENGTH"), &f->value, f->value_length)); } if (r->content_type_field != NXT_UNIT_NONE_FIELD) { f = r->fields + r->content_type_field; RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("CONTENT_TYPE"), &f->value, f->value_length)); } #undef NL #undef RC return newRV_noinc((SV *) hash_env); fail: SvREFCNT_dec(hash_env); return NULL; } nxt_inline int nxt_perl_psgi_add_sptr(PerlInterpreter *my_perl, HV *hash_env, const char *name, uint32_t name_len, nxt_unit_sptr_t *sptr, uint32_t len) { return nxt_perl_psgi_add_str(my_perl, hash_env, name, name_len, nxt_unit_sptr_get(sptr), len); } nxt_inline int nxt_perl_psgi_add_str(PerlInterpreter *my_perl, HV *hash_env, const char *name, uint32_t name_len, const char *str, uint32_t len) { SV **ha; ha = hv_store(hash_env, name, (I32) name_len, newSVpv(str, (STRLEN) len), 0); if (nxt_slow_path(ha == NULL)) { return NXT_UNIT_ERROR; } return NXT_UNIT_OK; } nxt_inline int nxt_perl_psgi_add_value(PerlInterpreter *my_perl, HV *hash_env, const char *name, uint32_t name_len, void *value) { SV **ha; ha = hv_store(hash_env, name, (I32) name_len, value, 0); if (nxt_slow_path(ha == NULL)) { return NXT_UNIT_ERROR; } return NXT_UNIT_OK; } static nxt_int_t nxt_perl_psgi_result_status(PerlInterpreter *my_perl, SV *result) { SV **sv_status; AV *array; u_char *space; nxt_str_t status; array = (AV *) SvRV(result); sv_status = av_fetch(array, 0, 0); status.start = (u_char *) SvPV(*sv_status, status.length); space = memchr(status.start, ' ', status.length); if (space != NULL) { status.length = space - status.start; } return nxt_int_parse(status.start, status.length); } static int nxt_perl_psgi_result_head(PerlInterpreter *my_perl, SV *sv_head, nxt_unit_request_info_t *req, uint16_t status) { AV *array_head; SV **entry; int rc; long i, array_len; char *name, *value; STRLEN name_len, value_len; uint32_t fields, size; if (nxt_slow_path(SvROK(sv_head) == 0 || SvTYPE(SvRV(sv_head)) != SVt_PVAV)) { nxt_unit_req_error(req, "PSGI: An unsupported format was received from " "Perl Application for head part"); return NXT_UNIT_ERROR; } array_head = (AV *) SvRV(sv_head); array_len = av_len(array_head); if (array_len < 1) { return nxt_unit_response_init(req, status, 0, 0); } if (nxt_slow_path((array_len % 2) == 0)) { nxt_unit_req_error(req, "PSGI: Bad format for head from " "Perl Application"); return NXT_UNIT_ERROR; } fields = 0; size = 0; for (i = 0; i <= array_len; i++) { entry = av_fetch(array_head, i, 0); if (nxt_fast_path(entry == NULL)) { nxt_unit_req_error(req, "PSGI: Failed to get head entry from " "Perl Application"); return NXT_UNIT_ERROR; } value = SvPV(*entry, value_len); size += value_len; if ((i % 2) == 0) { fields++; } } rc = nxt_unit_response_init(req, status, fields, size); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } for (i = 0; i <= array_len; i += 2) { entry = av_fetch(array_head, i, 0); name = SvPV(*entry, name_len); entry = av_fetch(array_head, i + 1, 0); value = SvPV(*entry, value_len); rc = nxt_unit_response_add_field(req, name, name_len, value, value_len); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } } return NXT_UNIT_OK; } static int nxt_perl_psgi_result_body(PerlInterpreter *my_perl, SV *sv_body, nxt_unit_request_info_t *req) { SV **entry; AV *body_array; int rc; long i; nxt_str_t body; if (nxt_slow_path(SvROK(sv_body) == 0 || SvTYPE(SvRV(sv_body)) != SVt_PVAV)) { nxt_unit_req_error(req, "PSGI: An unsupported format was received from " "Perl Application for a body part"); return NXT_UNIT_ERROR; } body_array = (AV *) SvRV(sv_body); for (i = 0; i <= av_len(body_array); i++) { entry = av_fetch(body_array, i, 0); if (nxt_fast_path(entry == NULL)) { nxt_unit_req_error(req, "PSGI: Failed to get body entry from " "Perl Application"); return NXT_UNIT_ERROR; } body.start = (u_char *) SvPV(*entry, body.length); if (body.length == 0) { continue; } rc = nxt_unit_response_write(req, body.start, body.length); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_error(req, "PSGI: Failed to write content from " "Perl Application"); return rc; } } return NXT_UNIT_OK; } static int nxt_perl_psgi_result_body_ref(PerlInterpreter *my_perl, SV *sv_body, nxt_unit_request_info_t *req) { SV *data, *old_rs, *old_perl_rs; int rc; size_t len; const char *body; /* * Servers should set the $/ special variable to the buffer size * when reading content from $body using the getline method. * This is done by setting $/ with a reference to an integer ($/ = \8192). */ old_rs = PL_rs; old_perl_rs = get_sv("/", GV_ADD); PL_rs = sv_2mortal(newRV_noinc(newSViv(nxt_unit_buf_min()))); sv_setsv(old_perl_rs, PL_rs); rc = NXT_UNIT_OK; for ( ;; ) { data = nxt_perl_psgi_call_method(my_perl, sv_body, "getline", req); if (nxt_slow_path(data == NULL)) { rc = NXT_UNIT_ERROR; break; } body = SvPV(data, len); if (len == 0) { SvREFCNT_dec(data); data = nxt_perl_psgi_call_method(my_perl, sv_body, "close", req); if (nxt_fast_path(data != NULL)) { SvREFCNT_dec(data); } break; } rc = nxt_unit_response_write(req, body, len); SvREFCNT_dec(data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_error(req, "PSGI: Failed to write content from " "Perl Application"); break; } }; PL_rs = old_rs; sv_setsv(get_sv("/", GV_ADD), old_perl_rs); return rc; } typedef struct { PerlInterpreter *my_perl; PerlIO *fp; } nxt_perl_psgi_io_ctx_t; static int nxt_perl_psgi_result_body_fh(PerlInterpreter *my_perl, SV *sv_body, nxt_unit_request_info_t *req) { IO *io; nxt_unit_read_info_t read_info; nxt_perl_psgi_io_ctx_t io_ctx; io = GvIO(SvRV(sv_body)); if (io == NULL) { return NXT_UNIT_OK; } io_ctx.my_perl = my_perl; io_ctx.fp = IoIFP(io); read_info.read = nxt_perl_psgi_io_read; read_info.eof = PerlIO_eof(io_ctx.fp); read_info.buf_size = 8192; read_info.data = &io_ctx; return nxt_unit_response_write_cb(req, &read_info); } static ssize_t nxt_perl_psgi_io_read(nxt_unit_read_info_t *read_info, void *dst, size_t size) { ssize_t res; nxt_perl_psgi_io_ctx_t *ctx; ctx = read_info->data; dTHXa(ctx->my_perl); res = PerlIO_read(ctx->fp, dst, size); read_info->eof = PerlIO_eof(ctx->fp); return res; } static int nxt_perl_psgi_result_array(PerlInterpreter *my_perl, SV *result, nxt_unit_request_info_t *req) { AV *array; SV **sv_temp; int rc; long array_len; nxt_int_t status; array = (AV *) SvRV(result); array_len = av_len(array); if (nxt_slow_path(array_len < 0)) { nxt_unit_req_error(req, "PSGI: Invalid result format from Perl Application"); return NXT_UNIT_ERROR; } status = nxt_perl_psgi_result_status(my_perl, result); if (nxt_slow_path(status < 0)) { nxt_unit_req_error(req, "PSGI: An unexpected status was received " "from Perl Application"); return NXT_UNIT_ERROR; } if (array_len >= 1) { sv_temp = av_fetch(array, 1, 0); if (nxt_slow_path(sv_temp == NULL)) { nxt_unit_req_error(req, "PSGI: Failed to get head from " "Perl ARRAY variable"); return NXT_UNIT_ERROR; } rc = nxt_perl_psgi_result_head(my_perl, *sv_temp, req, status); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } } else { return nxt_unit_response_init(req, status, 0, 0); } if (nxt_fast_path(array_len < 2)) { return NXT_UNIT_OK; } sv_temp = av_fetch(array, 2, 0); if (nxt_slow_path(sv_temp == NULL || SvROK(*sv_temp) == FALSE)) { nxt_unit_req_error(req, "PSGI: Failed to get body from " "Perl ARRAY variable"); return NXT_UNIT_ERROR; } if (SvTYPE(SvRV(*sv_temp)) == SVt_PVAV) { return nxt_perl_psgi_result_body(my_perl, *sv_temp, req); } if (SvTYPE(SvRV(*sv_temp)) == SVt_PVGV) { return nxt_perl_psgi_result_body_fh(my_perl, *sv_temp, req); } return nxt_perl_psgi_result_body_ref(my_perl, *sv_temp, req); } static void nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, nxt_unit_request_info_t *req) { nxt_perl_psgi_ctx_t *pctx; dSP; pctx = req->ctx->data; ENTER; SAVETMPS; PUSHMARK(sp); XPUSHs(newRV_noinc((SV*) pctx->cb)); PUTBACK; call_sv(result, G_EVAL|G_SCALAR); SPAGAIN; if (SvTRUE(ERRSV)) { nxt_unit_error(NULL, "PSGI: Failed to execute result callback: \n%s", SvPV_nolen(ERRSV)); nxt_perl_psgi_cb_request_done(pctx, NXT_UNIT_ERROR); } PUTBACK; FREETMPS; LEAVE; } static nxt_int_t nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data) { int rc, pargc; char **pargv, **penv; nxt_unit_ctx_t *unit_ctx; nxt_unit_init_t perl_init; nxt_perl_psgi_ctx_t pctx; nxt_perl_app_conf_t *c; nxt_common_app_conf_t *common_conf; common_conf = data->app; c = &common_conf->u.perl; pargc = 0; pargv = NULL; penv = NULL; PERL_SYS_INIT3(&pargc, &pargv, &penv); memset(&pctx, 0, sizeof(nxt_perl_psgi_ctx_t)); rc = nxt_perl_psgi_ctx_init(c->script, &pctx); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } rc = nxt_perl_psgi_init_threads(c); PERL_SET_CONTEXT(pctx.my_perl); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } nxt_unit_default_init(task, &perl_init, common_conf); perl_init.callbacks.request_handler = nxt_perl_psgi_request_handler; perl_init.callbacks.ready_handler = nxt_perl_psgi_ready_handler; perl_init.data = c; perl_init.ctx_data = &pctx; unit_ctx = nxt_unit_init(&perl_init); if (nxt_slow_path(unit_ctx == NULL)) { goto fail; } rc = nxt_unit_run(unit_ctx); nxt_perl_psgi_join_threads(unit_ctx, c); nxt_unit_done(unit_ctx); nxt_perl_psgi_ctx_free(&pctx); PERL_SYS_TERM(); exit(rc); return NXT_OK; fail: nxt_perl_psgi_join_threads(NULL, c); nxt_perl_psgi_ctx_free(&pctx); PERL_SYS_TERM(); return NXT_ERROR; } static void nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req) { SV *env, *result; nxt_int_t rc; PerlInterpreter *my_perl; nxt_perl_psgi_ctx_t *pctx; pctx = req->ctx->data; my_perl = pctx->my_perl; pctx->req = req; /* * Create environ variable for perl sub "application". * > sub application { * > my ($environ) = @_; */ env = nxt_perl_psgi_env_create(my_perl, req); if (nxt_slow_path(env == NULL)) { nxt_unit_req_error(req, "PSGI: Failed to create 'env' for Perl Application"); nxt_unit_request_done(req, NXT_UNIT_ERROR); pctx->req = NULL; return; } /* Call perl sub and get result as SV*. */ result = nxt_perl_psgi_call_var_application(my_perl, env, pctx->app, req); if (nxt_fast_path(SvOK(result) != 0 && SvROK(result) != 0)) { if (SvTYPE(SvRV(result)) == SVt_PVAV) { rc = nxt_perl_psgi_result_array(my_perl, result, req); nxt_unit_request_done(req, rc); pctx->req = NULL; goto release; } if (SvTYPE(SvRV(result)) == SVt_PVCV) { nxt_perl_psgi_result_cb(my_perl, result, req); goto release; } } nxt_unit_req_error(req, "PSGI: An unexpected response was received " "from Perl Application"); nxt_unit_request_done(req, NXT_UNIT_ERROR); pctx->req = NULL; release: SvREFCNT_dec(result); SvREFCNT_dec(env); } static int nxt_perl_psgi_ready_handler(nxt_unit_ctx_t *ctx) { int res; uint32_t i; nxt_perl_app_conf_t *c; nxt_perl_psgi_ctx_t *pctx; c = ctx->unit->data; if (c->threads <= 1) { return NXT_UNIT_OK; } for (i = 0; i < c->threads - 1; i++) { pctx = &nxt_perl_psgi_ctxs[i]; pctx->ctx = ctx; res = pthread_create(&pctx->thread, nxt_perl_psgi_thread_attr, nxt_perl_psgi_thread_func, pctx); if (nxt_fast_path(res == 0)) { nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1)); } else { nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)", (int) (i + 1), strerror(res), res); return NXT_UNIT_ERROR; } } return NXT_UNIT_OK; } static void * nxt_perl_psgi_thread_func(void *data) { nxt_unit_ctx_t *ctx; nxt_perl_psgi_ctx_t *pctx; pctx = data; nxt_unit_debug(pctx->ctx, "worker thread start"); ctx = nxt_unit_ctx_alloc(pctx->ctx, pctx); if (nxt_slow_path(ctx == NULL)) { return NULL; } pctx->ctx = ctx; PERL_SET_CONTEXT(pctx->my_perl); (void) nxt_unit_run(ctx); nxt_unit_done(ctx); nxt_unit_debug(NULL, "worker thread end"); return NULL; } static int nxt_perl_psgi_init_threads(nxt_perl_app_conf_t *c) { int rc; uint32_t i; static pthread_attr_t attr; if (c->threads <= 1) { return NXT_UNIT_OK; } if (c->thread_stack_size > 0) { rc = pthread_attr_init(&attr); if (nxt_slow_path(rc != 0)) { nxt_unit_alert(NULL, "thread attr init failed: %s (%d)", strerror(rc), rc); return NXT_UNIT_ERROR; } rc = pthread_attr_setstacksize(&attr, c->thread_stack_size); if (nxt_slow_path(rc != 0)) { nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)", strerror(rc), rc); return NXT_UNIT_ERROR; } nxt_perl_psgi_thread_attr = &attr; } nxt_perl_psgi_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_perl_psgi_ctx_t) * (c->threads - 1)); if (nxt_slow_path(nxt_perl_psgi_ctxs == NULL)) { return NXT_UNIT_ERROR; } memset(nxt_perl_psgi_ctxs, 0, sizeof(nxt_perl_psgi_ctx_t) * (c->threads - 1)); for (i = 0; i < c->threads - 1; i++) { rc = nxt_perl_psgi_ctx_init(c->script, &nxt_perl_psgi_ctxs[i]); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return NXT_UNIT_ERROR; } } return NXT_UNIT_OK; } static void nxt_perl_psgi_join_threads(nxt_unit_ctx_t *ctx, nxt_perl_app_conf_t *c) { int res; uint32_t i; nxt_perl_psgi_ctx_t *pctx; if (nxt_perl_psgi_ctxs == NULL) { return; } for (i = 0; i < c->threads - 1; i++) { pctx = &nxt_perl_psgi_ctxs[i]; res = pthread_join(pctx->thread, NULL); if (nxt_fast_path(res == 0)) { nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1)); } else { nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)", (int) (i + 1), strerror(res), res); } } for (i = 0; i < c->threads - 1; i++) { nxt_perl_psgi_ctx_free(&nxt_perl_psgi_ctxs[i]); } nxt_unit_free(NULL, nxt_perl_psgi_ctxs); } static void nxt_perl_psgi_ctx_free(nxt_perl_psgi_ctx_t *pctx) { PerlInterpreter *my_perl; my_perl = pctx->my_perl; if (nxt_slow_path(my_perl == NULL)) { return; } PERL_SET_CONTEXT(my_perl); SvREFCNT_dec(pctx->arg_input.rv); SvREFCNT_dec(pctx->arg_error.rv); nxt_perl_psgi_io_release(my_perl, &pctx->arg_input); nxt_perl_psgi_io_release(my_perl, &pctx->arg_error); perl_destruct(my_perl); perl_free(my_perl); }