diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-11-05 12:45:10 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-11-05 12:45:10 +0300 |
commit | b6475df79cd14d80c794abfc9d1edbcebbe86f2c (patch) | |
tree | 2bf4a06f070793b5210bdde4a431135bc05cba54 | |
parent | 9f8b746e776031e6eef6dea84c8dafbb8c24c725 (diff) | |
download | unit-b6475df79cd14d80c794abfc9d1edbcebbe86f2c.tar.gz unit-b6475df79cd14d80c794abfc9d1edbcebbe86f2c.tar.bz2 |
Ruby: request processing in multiple threads.
This closes #482 issue on GitHub.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_application.h | 1 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 4 | ||||
-rw-r--r-- | src/nxt_main_process.c | 5 | ||||
-rw-r--r-- | src/ruby/nxt_ruby.c | 647 | ||||
-rw-r--r-- | src/ruby/nxt_ruby.h | 8 | ||||
-rw-r--r-- | src/ruby/nxt_ruby_stream_io.c | 51 |
6 files changed, 510 insertions, 206 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h index 08de0fce..b8e18a23 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -69,6 +69,7 @@ typedef struct { typedef struct { nxt_str_t script; + uint32_t threads; } nxt_ruby_app_conf_t; diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 1ce2d9ea..8dd9082d 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -615,6 +615,10 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_ruby_members[] = { .name = nxt_string("script"), .type = NXT_CONF_VLDT_STRING, .flags = NXT_CONF_VLDT_REQUIRED, + }, { + .name = nxt_string("threads"), + .type = NXT_CONF_VLDT_INTEGER, + .validator = nxt_conf_vldt_threads, }, NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members) diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index ce8d6916..631b3146 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -242,6 +242,11 @@ static nxt_conf_map_t nxt_ruby_app_conf[] = { NXT_CONF_MAP_STR, offsetof(nxt_common_app_conf_t, u.ruby.script), }, + { + nxt_string("threads"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, u.ruby.threads), + }, }; diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index 7b383557..ffc1469a 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -8,6 +8,8 @@ #include <nxt_unit.h> #include <nxt_unit_request.h> +#include <ruby/thread.h> + #include NXT_RUBY_MOUNTS_H @@ -16,16 +18,15 @@ typedef struct { - nxt_task_t *task; - nxt_str_t *script; - VALUE builder; + nxt_task_t *task; + nxt_str_t *script; + nxt_ruby_ctx_t *rctx; } nxt_ruby_rack_init_t; static nxt_int_t nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data); static VALUE nxt_ruby_init_basic(VALUE arg); -static nxt_int_t nxt_ruby_init_io(nxt_task_t *task); static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init); static VALUE nxt_ruby_require_rubygems(VALUE arg); @@ -33,24 +34,40 @@ static VALUE nxt_ruby_bundler_setup(VALUE arg); static VALUE nxt_ruby_require_rack(VALUE arg); static VALUE nxt_ruby_rack_parse_script(VALUE ctx); static VALUE nxt_ruby_rack_env_create(VALUE arg); +static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx); static void nxt_ruby_request_handler(nxt_unit_request_info_t *req); +static void *nxt_ruby_request_handler_gvl(void *req); +static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx); +static VALUE nxt_ruby_thread_func(VALUE arg); +static void *nxt_ruby_unit_run(void *ctx); +static void nxt_ruby_ubf(void *ctx); +static int nxt_ruby_init_threads(nxt_ruby_app_conf_t *c); +static void nxt_ruby_join_threads(nxt_unit_ctx_t *ctx, + nxt_ruby_app_conf_t *c); static VALUE nxt_ruby_rack_app_run(VALUE arg); -static int nxt_ruby_read_request(VALUE hash_env); +static int nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env); nxt_inline void nxt_ruby_add_sptr(VALUE hash_env, VALUE name, nxt_unit_sptr_t *sptr, uint32_t len); -static nxt_int_t nxt_ruby_rack_result_status(VALUE result); -static int nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status); +static nxt_int_t nxt_ruby_rack_result_status(nxt_unit_request_info_t *req, + VALUE result); +static int nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req, + VALUE result, nxt_int_t status); static int nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg); static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg); -static int nxt_ruby_rack_result_body(VALUE result); -static int nxt_ruby_rack_result_body_file_write(VALUE filepath); +static int nxt_ruby_rack_result_body(nxt_unit_request_info_t *req, + VALUE result); +static int nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req, + VALUE filepath); +static void *nxt_ruby_response_write_cb(void *read_info); static VALUE nxt_ruby_rack_result_body_each(VALUE body, VALUE arg, int argc, const VALUE *argv, VALUE blockarg); +static void *nxt_ruby_response_write(void *body); -static void nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, - const char *desc); +static void nxt_ruby_exception_log(nxt_unit_request_info_t *req, + uint32_t level, const char *desc); +static void nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx); static void nxt_ruby_atexit(void); @@ -58,12 +75,11 @@ static uint32_t compat[] = { NXT_VERNUM, NXT_DEBUG, }; -static VALUE nxt_ruby_rackup; -static VALUE nxt_ruby_call; -static VALUE nxt_ruby_env; -static VALUE nxt_ruby_io_input; -static VALUE nxt_ruby_io_error; -static nxt_ruby_run_ctx_t nxt_ruby_run_ctx; +static VALUE nxt_ruby_rackup; +static VALUE nxt_ruby_call; + +static uint32_t nxt_ruby_threads; +static nxt_ruby_ctx_t *nxt_ruby_ctxs; NXT_EXPORT nxt_app_module_t nxt_app_module = { sizeof(compat), @@ -169,79 +185,114 @@ nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data) { int state, rc; VALUE res; + nxt_ruby_ctx_t ruby_ctx; nxt_unit_ctx_t *unit_ctx; nxt_unit_init_t ruby_unit_init; + nxt_ruby_app_conf_t *c; nxt_ruby_rack_init_t rack_init; nxt_common_app_conf_t *conf; static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" }; conf = data->app; + c = &conf->u.ruby; + + nxt_ruby_threads = c->threads; RUBY_INIT_STACK ruby_init(); ruby_options(2, argv); ruby_script("NGINX_Unit"); + ruby_ctx.env = Qnil; + ruby_ctx.io_input = Qnil; + ruby_ctx.io_error = Qnil; + ruby_ctx.thread = Qnil; + ruby_ctx.ctx = NULL; + ruby_ctx.req = NULL; + rack_init.task = task; - rack_init.script = &conf->u.ruby.script; + rack_init.script = &c->script; + rack_init.rctx = &ruby_ctx; nxt_ruby_init_strings(); res = rb_protect(nxt_ruby_init_basic, (VALUE) (uintptr_t) &rack_init, &state); if (nxt_slow_path(res == Qnil || state != 0)) { - nxt_ruby_exception_log(task, NXT_LOG_ALERT, + nxt_ruby_exception_log(NULL, NXT_LOG_ALERT, "Failed to init basic variables"); return NXT_ERROR; } + nxt_ruby_call = Qnil; + nxt_ruby_rackup = nxt_ruby_rack_init(&rack_init); if (nxt_slow_path(nxt_ruby_rackup == Qnil)) { return NXT_ERROR; } + rb_gc_register_address(&nxt_ruby_rackup); + nxt_ruby_call = rb_intern("call"); if (nxt_slow_path(nxt_ruby_call == Qnil)) { nxt_alert(task, "Ruby: Unable to find rack entry point"); - return NXT_ERROR; + goto fail; } - nxt_ruby_env = rb_protect(nxt_ruby_rack_env_create, Qnil, &state); - if (nxt_slow_path(state != 0)) { - nxt_ruby_exception_log(task, NXT_LOG_ALERT, + rb_gc_register_address(&nxt_ruby_call); + + ruby_ctx.env = rb_protect(nxt_ruby_rack_env_create, + (VALUE) (uintptr_t) &ruby_ctx, &state); + if (nxt_slow_path(ruby_ctx.env == Qnil || state != 0)) { + nxt_ruby_exception_log(NULL, NXT_LOG_ALERT, "Failed to create 'environ' variable"); - return NXT_ERROR; + goto fail; } - rb_gc_register_address(&nxt_ruby_rackup); - rb_gc_register_address(&nxt_ruby_call); - rb_gc_register_address(&nxt_ruby_env); + rc = nxt_ruby_init_threads(c); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + goto fail; + } nxt_unit_default_init(task, &ruby_unit_init); ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler; + ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler; ruby_unit_init.shm_limit = conf->shm_limit; + ruby_unit_init.data = c; + ruby_unit_init.ctx_data = &ruby_ctx; unit_ctx = nxt_unit_init(&ruby_unit_init); if (nxt_slow_path(unit_ctx == NULL)) { - return NXT_ERROR; + goto fail; } - nxt_ruby_run_ctx.unit_ctx = unit_ctx; + rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_unit_run, unit_ctx, + nxt_ruby_ubf, unit_ctx); - rc = nxt_unit_run(unit_ctx); + nxt_ruby_join_threads(unit_ctx, c); - nxt_ruby_atexit(); + nxt_unit_done(unit_ctx); - nxt_ruby_run_ctx.unit_ctx = NULL; + nxt_ruby_ctx_done(&ruby_ctx); - nxt_unit_done(unit_ctx); + nxt_ruby_atexit(); exit(rc); return NXT_OK; + +fail: + + nxt_ruby_join_threads(NULL, c); + + nxt_ruby_ctx_done(&ruby_ctx); + + nxt_ruby_atexit(); + + return NXT_ERROR; } @@ -249,7 +300,6 @@ static VALUE nxt_ruby_init_basic(VALUE arg) { int state; - nxt_int_t rc; nxt_ruby_rack_init_t *rack_init; rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) arg; @@ -265,56 +315,19 @@ nxt_ruby_init_basic(VALUE arg) rb_funcall(rb_cObject, rb_intern("require"), 1, rb_str_new2("enc/trans/transdb")); - rc = nxt_ruby_init_io(rack_init->task); - if (nxt_slow_path(rc != NXT_OK)) { - return Qnil; - } - return arg; } -static nxt_int_t -nxt_ruby_init_io(nxt_task_t *task) -{ - VALUE rb, io_input, io_error; - - io_input = nxt_ruby_stream_io_input_init(); - rb = Data_Wrap_Struct(io_input, 0, 0, &nxt_ruby_run_ctx); - - nxt_ruby_io_input = rb_funcall(io_input, rb_intern("new"), 1, rb); - if (nxt_slow_path(nxt_ruby_io_input == Qnil)) { - nxt_alert(task, "Ruby: Failed to create environment 'rack.input' var"); - - return NXT_ERROR; - } - - io_error = nxt_ruby_stream_io_error_init(); - rb = Data_Wrap_Struct(io_error, 0, 0, &nxt_ruby_run_ctx); - - nxt_ruby_io_error = rb_funcall(io_error, rb_intern("new"), 1, rb); - if (nxt_slow_path(nxt_ruby_io_error == Qnil)) { - nxt_alert(task, "Ruby: Failed to create environment 'rack.error' var"); - - return NXT_ERROR; - } - - rb_gc_register_address(&nxt_ruby_io_input); - rb_gc_register_address(&nxt_ruby_io_error); - - return NXT_OK; -} - - static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init) { int state; - VALUE rack, rackup, err; + VALUE rackup, err; rb_protect(nxt_ruby_require_rubygems, Qnil, &state); if (nxt_slow_path(state != 0)) { - nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT, + nxt_ruby_exception_log(NULL, NXT_LOG_ALERT, "Failed to require 'rubygems' package"); return Qnil; } @@ -324,7 +337,7 @@ nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init) err = rb_errinfo(); if (rb_obj_is_kind_of(err, rb_eLoadError) == Qfalse) { - nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT, + nxt_ruby_exception_log(NULL, NXT_LOG_ALERT, "Failed to require 'bundler/setup' package"); return Qnil; } @@ -334,18 +347,15 @@ nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init) rb_protect(nxt_ruby_require_rack, Qnil, &state); if (nxt_slow_path(state != 0)) { - nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT, + nxt_ruby_exception_log(NULL, NXT_LOG_ALERT, "Failed to require 'rack' package"); return Qnil; } - rack = rb_const_get(rb_cObject, rb_intern("Rack")); - rack_init->builder = rb_const_get(rack, rb_intern("Builder")); - rackup = rb_protect(nxt_ruby_rack_parse_script, (VALUE) (uintptr_t) rack_init, &state); if (nxt_slow_path(TYPE(rackup) != T_ARRAY || state != 0)) { - nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT, + nxt_ruby_exception_log(NULL, NXT_LOG_ALERT, "Failed to parse rack script"); return Qnil; } @@ -385,15 +395,18 @@ nxt_ruby_require_rack(VALUE arg) static VALUE nxt_ruby_rack_parse_script(VALUE ctx) { - VALUE script, res; + VALUE script, res, rack, builder; nxt_ruby_rack_init_t *rack_init; rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) ctx; + rack = rb_const_get(rb_cObject, rb_intern("Rack")); + builder = rb_const_get(rack, rb_intern("Builder")); + script = rb_str_new((const char *) rack_init->script->start, (long) rack_init->script->length); - res = rb_funcall(rack_init->builder, rb_intern("parse_file"), 1, script); + res = rb_funcall(builder, rb_intern("parse_file"), 1, script); rb_str_free(script); @@ -404,7 +417,16 @@ nxt_ruby_rack_parse_script(VALUE ctx) static VALUE nxt_ruby_rack_env_create(VALUE arg) { - VALUE hash_env, version; + int rc; + VALUE hash_env, version; + nxt_ruby_ctx_t *rctx; + + rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg; + + rc = nxt_ruby_init_io(rctx); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return Qnil; + } hash_env = rb_hash_new(); @@ -418,47 +440,114 @@ nxt_ruby_rack_env_create(VALUE arg) rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MINOR)); rb_hash_aset(hash_env, rb_str_new2("rack.version"), version); - rb_hash_aset(hash_env, rb_str_new2("rack.input"), nxt_ruby_io_input); - rb_hash_aset(hash_env, rb_str_new2("rack.errors"), nxt_ruby_io_error); - rb_hash_aset(hash_env, rb_str_new2("rack.multithread"), Qfalse); + rb_hash_aset(hash_env, rb_str_new2("rack.input"), rctx->io_input); + rb_hash_aset(hash_env, rb_str_new2("rack.errors"), rctx->io_error); + rb_hash_aset(hash_env, rb_str_new2("rack.multithread"), + nxt_ruby_threads > 1 ? Qtrue : Qfalse); rb_hash_aset(hash_env, rb_str_new2("rack.multiprocess"), Qtrue); rb_hash_aset(hash_env, rb_str_new2("rack.run_once"), Qfalse); rb_hash_aset(hash_env, rb_str_new2("rack.hijack?"), Qfalse); rb_hash_aset(hash_env, rb_str_new2("rack.hijack"), Qnil); rb_hash_aset(hash_env, rb_str_new2("rack.hijack_io"), Qnil); + rctx->env = hash_env; + + rb_gc_register_address(&rctx->env); + return hash_env; } +static int +nxt_ruby_init_io(nxt_ruby_ctx_t *rctx) +{ + VALUE io_input, io_error; + + io_input = nxt_ruby_stream_io_input_init(); + + rctx->io_input = rb_funcall(io_input, rb_intern("new"), 1, + (VALUE) (uintptr_t) rctx); + if (nxt_slow_path(rctx->io_input == Qnil)) { + nxt_unit_alert(NULL, + "Ruby: Failed to create environment 'rack.input' var"); + + return NXT_UNIT_ERROR; + } + + rb_gc_register_address(&rctx->io_input); + + io_error = nxt_ruby_stream_io_error_init(); + + rctx->io_error = rb_funcall(io_error, rb_intern("new"), 1, + (VALUE) (uintptr_t) rctx); + if (nxt_slow_path(rctx->io_error == Qnil)) { + nxt_unit_alert(NULL, + "Ruby: Failed to create environment 'rack.error' var"); + + return NXT_UNIT_ERROR; + } + + rb_gc_register_address(&rctx->io_error); + + return NXT_UNIT_OK; +} + + static void nxt_ruby_request_handler(nxt_unit_request_info_t *req) { - int state; - VALUE res; + (void) rb_thread_call_with_gvl(nxt_ruby_request_handler_gvl, req); +} + + +static void * +nxt_ruby_request_handler_gvl(void *data) +{ + int state; + VALUE res; + nxt_ruby_ctx_t *rctx; + nxt_unit_request_info_t *req; + + req = data; - nxt_ruby_run_ctx.req = req; + rctx = req->ctx->data; + rctx->req = req; - res = rb_protect(nxt_ruby_rack_app_run, Qnil, &state); + res = rb_protect(nxt_ruby_rack_app_run, (VALUE) (uintptr_t) req, &state); if (nxt_slow_path(res == Qnil || state != 0)) { - nxt_ruby_exception_log(NULL, NXT_LOG_ERR, + nxt_ruby_exception_log(req, NXT_LOG_ERR, "Failed to run ruby script"); + + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + } else { + nxt_unit_request_done(req, NXT_UNIT_OK); } + + rctx->req = NULL; + + return NULL; } static VALUE nxt_ruby_rack_app_run(VALUE arg) { - int rc; - VALUE env, result; - nxt_int_t status; + int rc; + VALUE env, result; + nxt_int_t status; + nxt_ruby_ctx_t *rctx; + nxt_unit_request_info_t *req; - env = rb_hash_dup(nxt_ruby_env); + req = (nxt_unit_request_info_t *) arg; - rc = nxt_ruby_read_request(env); + rctx = req->ctx->data; + + env = rb_hash_dup(rctx->env); + + rc = nxt_ruby_read_request(req, env); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_req_alert(nxt_ruby_run_ctx.req, + nxt_unit_req_alert(req, "Ruby: Failed to process incoming request"); goto fail; @@ -466,50 +555,44 @@ nxt_ruby_rack_app_run(VALUE arg) result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env); if (nxt_slow_path(TYPE(result) != T_ARRAY)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(req, "Ruby: Invalid response format from application"); goto fail; } if (nxt_slow_path(RARRAY_LEN(result) != 3)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(req, "Ruby: Invalid response format from application. " "Need 3 entries [Status, Headers, Body]"); goto fail; } - status = nxt_ruby_rack_result_status(result); + status = nxt_ruby_rack_result_status(req, result); if (nxt_slow_path(status < 0)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(req, "Ruby: Invalid response status from application."); goto fail; } - rc = nxt_ruby_rack_result_headers(result, status); + rc = nxt_ruby_rack_result_headers(req, result, status); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } - rc = nxt_ruby_rack_result_body(result); + rc = nxt_ruby_rack_result_body(req, result); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } - nxt_unit_request_done(nxt_ruby_run_ctx.req, rc); - nxt_ruby_run_ctx.req = NULL; - rb_hash_delete(env, rb_obj_id(env)); return result; fail: - nxt_unit_request_done(nxt_ruby_run_ctx.req, NXT_UNIT_ERROR); - nxt_ruby_run_ctx.req = NULL; - rb_hash_delete(env, rb_obj_id(env)); return Qnil; @@ -517,14 +600,14 @@ fail: static int -nxt_ruby_read_request(VALUE hash_env) +nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env) { VALUE name; uint32_t i; nxt_unit_field_t *f; nxt_unit_request_t *r; - r = nxt_ruby_run_ctx.req->request; + r = req->request; nxt_ruby_add_sptr(hash_env, nxt_rb_request_method_str, &r->method, r->method_length); @@ -586,7 +669,7 @@ nxt_ruby_add_sptr(VALUE hash_env, VALUE name, static nxt_int_t -nxt_ruby_rack_result_status(VALUE result) +nxt_ruby_rack_result_status(nxt_unit_request_info_t *req, VALUE result) { VALUE status; @@ -601,7 +684,7 @@ nxt_ruby_rack_result_status(VALUE result) RSTRING_LEN(status)); } - nxt_unit_req_error(nxt_ruby_run_ctx.req, "Ruby: Invalid response 'status' " + nxt_unit_req_error(req, "Ruby: Invalid response 'status' " "format from application"); return -2; @@ -609,14 +692,16 @@ nxt_ruby_rack_result_status(VALUE result) typedef struct { - int rc; - uint32_t fields; - uint32_t size; + int rc; + uint32_t fields; + uint32_t size; + nxt_unit_request_info_t *req; } nxt_ruby_headers_info_t; static int -nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status) +nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req, VALUE result, + nxt_int_t status) { int rc; VALUE headers; @@ -624,7 +709,7 @@ nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status) headers = rb_ary_entry(result, 1); if (nxt_slow_path(TYPE(headers) != T_HASH)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(req, "Ruby: Invalid response 'headers' format from " "application"); @@ -636,6 +721,7 @@ nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status) headers_info.rc = NXT_UNIT_OK; headers_info.fields = 0; headers_info.size = 0; + headers_info.req = req; rb_hash_foreach(headers, nxt_ruby_hash_info, (VALUE) (uintptr_t) &headers_info); @@ -643,13 +729,14 @@ nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status) return headers_info.rc; } - rc = nxt_unit_response_init(nxt_ruby_run_ctx.req, status, + rc = nxt_unit_response_init(req, status, headers_info.fields, headers_info.size); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } - rb_hash_foreach(headers, nxt_ruby_hash_add, (VALUE) (uintptr_t) &rc); + rb_hash_foreach(headers, nxt_ruby_hash_add, + (VALUE) (uintptr_t) &headers_info); return rc; } @@ -664,14 +751,14 @@ nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg) headers_info = (void *) (uintptr_t) arg; if (nxt_slow_path(TYPE(r_key) != T_STRING)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(headers_info->req, "Ruby: Wrong header entry 'key' from application"); goto fail; } if (nxt_slow_path(TYPE(r_value) != T_STRING)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(headers_info->req, "Ruby: Wrong header entry 'value' from application"); goto fail; @@ -714,11 +801,13 @@ fail: static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg) { - int *rc; - uint32_t key_len; - const char *value, *value_end, *pos; + int *rc; + uint32_t key_len; + const char *value, *value_end, *pos; + nxt_ruby_headers_info_t *headers_info; - rc = (int *) (uintptr_t) arg; + headers_info = (void *) (uintptr_t) arg; + rc = &headers_info->rc; value = RSTRING_PTR(r_value); value_end = value + RSTRING_LEN(r_value); @@ -734,7 +823,7 @@ nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg) break; } - *rc = nxt_unit_response_add_field(nxt_ruby_run_ctx.req, + *rc = nxt_unit_response_add_field(headers_info->req, RSTRING_PTR(r_key), key_len, value, pos - value); if (nxt_slow_path(*rc != NXT_UNIT_OK)) { @@ -746,7 +835,7 @@ nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg) } if (value <= value_end) { - *rc = nxt_unit_response_add_field(nxt_ruby_run_ctx.req, + *rc = nxt_unit_response_add_field(headers_info->req, RSTRING_PTR(r_key), key_len, value, value_end - value); if (nxt_slow_path(*rc != NXT_UNIT_OK)) { @@ -765,7 +854,7 @@ fail: static int -nxt_ruby_rack_result_body(VALUE result) +nxt_ruby_rack_result_body(nxt_unit_request_info_t *req, VALUE result) { int rc; VALUE fn, body; @@ -776,24 +865,24 @@ nxt_ruby_rack_result_body(VALUE result) fn = rb_funcall(body, rb_intern("to_path"), 0); if (nxt_slow_path(TYPE(fn) != T_STRING)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(req, "Ruby: Failed to get 'body' file path from " "application"); return NXT_UNIT_ERROR; } - rc = nxt_ruby_rack_result_body_file_write(fn); + rc = nxt_ruby_rack_result_body_file_write(req, fn); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } } else if (rb_respond_to(body, rb_intern("each"))) { rb_block_call(body, rb_intern("each"), 0, 0, - nxt_ruby_rack_result_body_each, 0); + nxt_ruby_rack_result_body_each, (VALUE) (uintptr_t) req); } else { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(req, "Ruby: Invalid response 'body' format " "from application"); @@ -842,17 +931,24 @@ nxt_ruby_rack_file_read(nxt_unit_read_info_t *read_info, void *dst, size_t size) } +typedef struct { + nxt_unit_read_info_t read_info; + nxt_unit_request_info_t *req; +} nxt_ruby_read_info_t; + + static int -nxt_ruby_rack_result_body_file_write(VALUE filepath) +nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req, + VALUE filepath) { int fd, rc; struct stat finfo; nxt_ruby_rack_file_t ruby_file; - nxt_unit_read_info_t read_info; + nxt_ruby_read_info_t ri; fd = open(RSTRING_PTR(filepath), O_RDONLY, 0); if (nxt_slow_path(fd == -1)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(req, "Ruby: Failed to open content file \"%s\": %s (%d)", RSTRING_PTR(filepath), strerror(errno), errno); @@ -861,7 +957,7 @@ nxt_ruby_rack_result_body_file_write(VALUE filepath) rc = fstat(fd, &finfo); if (nxt_slow_path(rc == -1)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(req, "Ruby: Content file fstat(\"%s\") failed: %s (%d)", RSTRING_PTR(filepath), strerror(errno), errno); @@ -874,16 +970,16 @@ nxt_ruby_rack_result_body_file_write(VALUE filepath) ruby_file.pos = 0; ruby_file.rest = finfo.st_size; - read_info.read = nxt_ruby_rack_file_read; - read_info.eof = ruby_file.rest == 0; - read_info.buf_size = ruby_file.rest; - read_info.data = &ruby_file; + ri.read_info.read = nxt_ruby_rack_file_read; + ri.read_info.eof = ruby_file.rest == 0; + ri.read_info.buf_size = ruby_file.rest; + ri.read_info.data = &ruby_file; + ri.req = req; - rc = nxt_unit_response_write_cb(nxt_ruby_run_ctx.req, &read_info); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, - "Ruby: Failed to write content file."); - } + rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_response_write_cb, + &ri, + nxt_ruby_ubf, + req->ctx); close(fd); @@ -891,39 +987,77 @@ nxt_ruby_rack_result_body_file_write(VALUE filepath) } +static void * +nxt_ruby_response_write_cb(void *data) +{ + int rc; + nxt_ruby_read_info_t *ri; + + ri = data; + + rc = nxt_unit_response_write_cb(ri->req, &ri->read_info); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_error(ri->req, "Ruby: Failed to write content file."); + } + + return (void *) (intptr_t) rc; +} + + +typedef struct { + VALUE body; + nxt_unit_request_info_t *req; +} nxt_ruby_write_info_t; + + static VALUE nxt_ruby_rack_result_body_each(VALUE body, VALUE arg, int argc, const VALUE *argv, VALUE blockarg) { - int rc; + nxt_ruby_write_info_t wi; if (TYPE(body) != T_STRING) { return Qnil; } - rc = nxt_unit_response_write(nxt_ruby_run_ctx.req, RSTRING_PTR(body), - RSTRING_LEN(body)); + wi.body = body; + wi.req = (void *) (uintptr_t) arg; + + (void) rb_thread_call_without_gvl(nxt_ruby_response_write, + (void *) (uintptr_t) &wi, + nxt_ruby_ubf, wi.req->ctx); + + return Qnil; +} + + +static void * +nxt_ruby_response_write(void *data) +{ + int rc; + nxt_ruby_write_info_t *wi; + + wi = data; + + rc = nxt_unit_response_write(wi->req, RSTRING_PTR(wi->body), + RSTRING_LEN(wi->body)); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_req_error(nxt_ruby_run_ctx.req, + nxt_unit_req_error(wi->req, "Ruby: Failed to write 'body' from application"); } - return Qnil; + return (void *) (intptr_t) rc; } static void -nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc) +nxt_ruby_exception_log(nxt_unit_request_info_t *req, uint32_t level, + const char *desc) { int i; VALUE err, ary, eclass, msg; - if (task != NULL) { - nxt_log(task, level, "Ruby: %s", desc); - - } else { - nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "Ruby: %s", desc); - } + nxt_unit_req_log(req, level, "Ruby: %s", desc); err = rb_errinfo(); if (nxt_slow_path(err == Qnil)) { @@ -938,25 +1072,30 @@ nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc) eclass = rb_class_name(rb_class_of(err)); msg = rb_funcall(err, rb_intern("message"), 0); - if (task != NULL) { - nxt_log(task, level, "Ruby: %s: %s (%s)", - RSTRING_PTR(RARRAY_PTR(ary)[0]), - RSTRING_PTR(msg), RSTRING_PTR(eclass)); - - } else { - nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "Ruby: %s: %s (%s)", + nxt_unit_req_log(req, level, "Ruby: %s: %s (%s)", RSTRING_PTR(RARRAY_PTR(ary)[0]), RSTRING_PTR(msg), RSTRING_PTR(eclass)); - } for (i = 1; i < RARRAY_LEN(ary); i++) { - if (task != NULL) { - nxt_log(task, level, "from %s", RSTRING_PTR(RARRAY_PTR(ary)[i])); - - } else { - nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "from %s", + nxt_unit_req_log(req, level, "from %s", RSTRING_PTR(RARRAY_PTR(ary)[i])); - } + } +} + + +static void +nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx) +{ + if (rctx->io_input != Qnil) { + rb_gc_unregister_address(&rctx->io_input); + } + + if (rctx->io_error != Qnil) { + rb_gc_unregister_address(&rctx->io_error); + } + + if (rctx->env != Qnil) { + rb_gc_unregister_address(&rctx->env); } } @@ -964,14 +1103,170 @@ nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc) static void nxt_ruby_atexit(void) { - rb_gc_unregister_address(&nxt_ruby_io_input); - rb_gc_unregister_address(&nxt_ruby_io_error); + if (nxt_ruby_rackup != Qnil) { + rb_gc_unregister_address(&nxt_ruby_rackup); + } - rb_gc_unregister_address(&nxt_ruby_rackup); - rb_gc_unregister_address(&nxt_ruby_call); - rb_gc_unregister_address(&nxt_ruby_env); + if (nxt_ruby_call != Qnil) { + rb_gc_unregister_address(&nxt_ruby_call); + } nxt_ruby_done_strings(); ruby_cleanup(0); } + + +static int +nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx) +{ + VALUE res; + uint32_t i; + nxt_ruby_ctx_t *rctx; + nxt_ruby_app_conf_t *c; + + /* Worker thread context. */ + if (!nxt_unit_is_main_ctx(ctx)) { + return NXT_UNIT_OK; + } + + c = ctx->unit->data; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + for (i = 0; i < c->threads - 1; i++) { + rctx = &nxt_ruby_ctxs[i]; + + rctx->ctx = ctx; + + res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx); + + if (nxt_fast_path(res != Qnil)) { + nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1)); + + rctx->thread = res; + + } else { + nxt_unit_alert(ctx, "thread #%d create failed", (int) (i + 1)); + } + } + + return NXT_UNIT_OK; +} + + +static VALUE +nxt_ruby_thread_func(VALUE arg) +{ + nxt_unit_ctx_t *ctx; + nxt_ruby_ctx_t *rctx; + + rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg; + + nxt_unit_debug(rctx->ctx, "worker thread start"); + + ctx = nxt_unit_ctx_alloc(rctx->ctx, rctx); + if (nxt_slow_path(ctx == NULL)) { + goto fail; + } + + (void) rb_thread_call_without_gvl(nxt_ruby_unit_run, ctx, + nxt_ruby_ubf, ctx); + + nxt_unit_done(ctx); + +fail: + + nxt_unit_debug(NULL, "worker thread end"); + + return Qnil; +} + + +static void * +nxt_ruby_unit_run(void *ctx) +{ + return (void *) (intptr_t) nxt_unit_run(ctx); +} + + +static void +nxt_ruby_ubf(void *ctx) +{ + nxt_unit_warn(ctx, "Ruby: UBF"); +} + + +static int +nxt_ruby_init_threads(nxt_ruby_app_conf_t *c) +{ + int state; + uint32_t i; + nxt_ruby_ctx_t *rctx; + + if (c->threads <= 1) { + return NXT_UNIT_OK; + } + + nxt_ruby_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_ruby_ctx_t) + * (c->threads - 1)); + if (nxt_slow_path(nxt_ruby_ctxs == NULL)) { + nxt_unit_alert(NULL, "Failed to allocate run contexts array"); + + return NXT_UNIT_ERROR; + } + + for (i = 0; i < c->threads - 1; i++) { + rctx = &nxt_ruby_ctxs[i]; + + rctx->env = Qnil; + rctx->io_input = Qnil; + rctx->io_error = Qnil; + rctx->thread = Qnil; + } + + for (i = 0; i < c->threads - 1; i++) { + rctx = &nxt_ruby_ctxs[i]; + + rctx->env = rb_protect(nxt_ruby_rack_env_create, + (VALUE) (uintptr_t) rctx, &state); + if (nxt_slow_path(rctx->env == Qnil || state != 0)) { + nxt_ruby_exception_log(NULL, NXT_LOG_ALERT, + "Failed to create 'environ' variable"); + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + +static void +nxt_ruby_join_threads(nxt_unit_ctx_t *ctx, nxt_ruby_app_conf_t *c) +{ + uint32_t i; + nxt_ruby_ctx_t *rctx; + + if (nxt_ruby_ctxs == NULL) { + return; + } + + for(i = 0; i < c->threads - 1; i++) { + rctx = &nxt_ruby_ctxs[i]; + + if (rctx->thread != Qnil) { + rb_funcall(rctx->thread, rb_intern("join"), 0); + + nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1)); + + } else { + nxt_unit_debug(ctx, "thread #%d not started", (int) (i + 1)); + } + + nxt_ruby_ctx_done(rctx); + } + + nxt_unit_free(ctx, nxt_ruby_ctxs); +} diff --git a/src/ruby/nxt_ruby.h b/src/ruby/nxt_ruby.h index 77a65894..26430021 100644 --- a/src/ruby/nxt_ruby.h +++ b/src/ruby/nxt_ruby.h @@ -21,9 +21,13 @@ typedef struct { - nxt_unit_ctx_t *unit_ctx; + VALUE env; + VALUE io_input; + VALUE io_error; + VALUE thread; + nxt_unit_ctx_t *ctx; nxt_unit_request_info_t *req; -} nxt_ruby_run_ctx_t; +} nxt_ruby_ctx_t; VALUE nxt_ruby_stream_io_input_init(void); diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c index cc110035..69bf289e 100644 --- a/src/ruby/nxt_ruby_stream_io.c +++ b/src/ruby/nxt_ruby_stream_io.c @@ -8,7 +8,7 @@ #include <nxt_unit.h> -static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap); +static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE arg); static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self); static VALUE nxt_ruby_stream_io_gets(VALUE obj); static VALUE nxt_ruby_stream_io_each(VALUE obj); @@ -16,8 +16,7 @@ static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args); static VALUE nxt_ruby_stream_io_rewind(VALUE obj); static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args); static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args); -nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, - VALUE val); +nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val); static VALUE nxt_ruby_stream_io_flush(VALUE obj); @@ -63,13 +62,11 @@ nxt_ruby_stream_io_error_init(void) static VALUE -nxt_ruby_stream_io_new(VALUE class, VALUE wrap) +nxt_ruby_stream_io_new(VALUE class, VALUE arg) { - VALUE self; - nxt_ruby_run_ctx_t *run_ctx; + VALUE self; - Data_Get_Struct(wrap, nxt_ruby_run_ctx_t, run_ctx); - self = Data_Wrap_Struct(class, 0, 0, run_ctx); + self = Data_Wrap_Struct(class, 0, 0, (void *) (uintptr_t) arg); rb_obj_call_init(self, 0, NULL); @@ -89,12 +86,11 @@ nxt_ruby_stream_io_gets(VALUE obj) { VALUE buf; ssize_t res; - nxt_ruby_run_ctx_t *run_ctx; + nxt_ruby_ctx_t *rctx; nxt_unit_request_info_t *req; - Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); - - req = run_ctx->req; + Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx); + req = rctx->req; if (req->content_length == 0) { return Qnil; @@ -145,13 +141,13 @@ nxt_ruby_stream_io_each(VALUE obj) static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args) { - VALUE buf; - long copy_size, u_size; - nxt_ruby_run_ctx_t *run_ctx; + VALUE buf; + long copy_size, u_size; + nxt_ruby_ctx_t *rctx; - Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); + Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx); - copy_size = run_ctx->req->content_length; + copy_size = rctx->req->content_length; if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) { u_size = NUM2LONG(RARRAY_PTR(args)[0]); @@ -175,8 +171,7 @@ nxt_ruby_stream_io_read(VALUE obj, VALUE args) return Qnil; } - copy_size = nxt_unit_request_read(run_ctx->req, RSTRING_PTR(buf), - copy_size); + copy_size = nxt_unit_request_read(rctx->req, RSTRING_PTR(buf), copy_size); if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) { @@ -200,15 +195,15 @@ nxt_ruby_stream_io_rewind(VALUE obj) static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args) { - nxt_ruby_run_ctx_t *run_ctx; + nxt_ruby_ctx_t *rctx; if (RARRAY_LEN(args) != 1) { return Qnil; } - Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); + Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx); - nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); + nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]); return Qnil; } @@ -217,23 +212,23 @@ nxt_ruby_stream_io_puts(VALUE obj, VALUE args) static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args) { - long len; - nxt_ruby_run_ctx_t *run_ctx; + long len; + nxt_ruby_ctx_t *rctx; if (RARRAY_LEN(args) != 1) { return Qnil; } - Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); + Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx); - len = nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); + len = nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]); return LONG2FIX(len); } nxt_inline long -nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val) +nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val) { if (nxt_slow_path(val == Qnil)) { return 0; @@ -247,7 +242,7 @@ nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val) } } - nxt_unit_req_error(run_ctx->req, "Ruby: %s", RSTRING_PTR(val)); + nxt_unit_req_error(rctx->req, "Ruby: %s", RSTRING_PTR(val)); return RSTRING_LEN(val); } |