summaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-11-05 12:45:10 +0300
committerMax Romanov <max.romanov@nginx.com>2020-11-05 12:45:10 +0300
commitb6475df79cd14d80c794abfc9d1edbcebbe86f2c (patch)
tree2bf4a06f070793b5210bdde4a431135bc05cba54 /src/ruby
parent9f8b746e776031e6eef6dea84c8dafbb8c24c725 (diff)
downloadunit-b6475df79cd14d80c794abfc9d1edbcebbe86f2c.tar.gz
unit-b6475df79cd14d80c794abfc9d1edbcebbe86f2c.tar.bz2
Ruby: request processing in multiple threads.
This closes #482 issue on GitHub.
Diffstat (limited to '')
-rw-r--r--src/ruby/nxt_ruby.c647
-rw-r--r--src/ruby/nxt_ruby.h8
-rw-r--r--src/ruby/nxt_ruby_stream_io.c51
3 files changed, 500 insertions, 206 deletions
diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c
index 7b383557..ffc1469a 100644
--- a/src/ruby/nxt_ruby.c
+++ b/src/ruby/nxt_ruby.c
@@ -8,6 +8,8 @@
#include <nxt_unit.h>
#include <nxt_unit_request.h>
+#include <ruby/thread.h>
+
#include NXT_RUBY_MOUNTS_H
@@ -16,16 +18,15 @@
typedef struct {
- nxt_task_t *task;
- nxt_str_t *script;
- VALUE builder;
+ nxt_task_t *task;
+ nxt_str_t *script;
+ nxt_ruby_ctx_t *rctx;
} nxt_ruby_rack_init_t;
static nxt_int_t nxt_ruby_start(nxt_task_t *task,
nxt_process_data_t *data);
static VALUE nxt_ruby_init_basic(VALUE arg);
-static nxt_int_t nxt_ruby_init_io(nxt_task_t *task);
static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init);
static VALUE nxt_ruby_require_rubygems(VALUE arg);
@@ -33,24 +34,40 @@ static VALUE nxt_ruby_bundler_setup(VALUE arg);
static VALUE nxt_ruby_require_rack(VALUE arg);
static VALUE nxt_ruby_rack_parse_script(VALUE ctx);
static VALUE nxt_ruby_rack_env_create(VALUE arg);
+static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx);
static void nxt_ruby_request_handler(nxt_unit_request_info_t *req);
+static void *nxt_ruby_request_handler_gvl(void *req);
+static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx);
+static VALUE nxt_ruby_thread_func(VALUE arg);
+static void *nxt_ruby_unit_run(void *ctx);
+static void nxt_ruby_ubf(void *ctx);
+static int nxt_ruby_init_threads(nxt_ruby_app_conf_t *c);
+static void nxt_ruby_join_threads(nxt_unit_ctx_t *ctx,
+ nxt_ruby_app_conf_t *c);
static VALUE nxt_ruby_rack_app_run(VALUE arg);
-static int nxt_ruby_read_request(VALUE hash_env);
+static int nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env);
nxt_inline void nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
nxt_unit_sptr_t *sptr, uint32_t len);
-static nxt_int_t nxt_ruby_rack_result_status(VALUE result);
-static int nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status);
+static nxt_int_t nxt_ruby_rack_result_status(nxt_unit_request_info_t *req,
+ VALUE result);
+static int nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req,
+ VALUE result, nxt_int_t status);
static int nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg);
static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg);
-static int nxt_ruby_rack_result_body(VALUE result);
-static int nxt_ruby_rack_result_body_file_write(VALUE filepath);
+static int nxt_ruby_rack_result_body(nxt_unit_request_info_t *req,
+ VALUE result);
+static int nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
+ VALUE filepath);
+static void *nxt_ruby_response_write_cb(void *read_info);
static VALUE nxt_ruby_rack_result_body_each(VALUE body, VALUE arg,
int argc, const VALUE *argv, VALUE blockarg);
+static void *nxt_ruby_response_write(void *body);
-static void nxt_ruby_exception_log(nxt_task_t *task, uint32_t level,
- const char *desc);
+static void nxt_ruby_exception_log(nxt_unit_request_info_t *req,
+ uint32_t level, const char *desc);
+static void nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx);
static void nxt_ruby_atexit(void);
@@ -58,12 +75,11 @@ static uint32_t compat[] = {
NXT_VERNUM, NXT_DEBUG,
};
-static VALUE nxt_ruby_rackup;
-static VALUE nxt_ruby_call;
-static VALUE nxt_ruby_env;
-static VALUE nxt_ruby_io_input;
-static VALUE nxt_ruby_io_error;
-static nxt_ruby_run_ctx_t nxt_ruby_run_ctx;
+static VALUE nxt_ruby_rackup;
+static VALUE nxt_ruby_call;
+
+static uint32_t nxt_ruby_threads;
+static nxt_ruby_ctx_t *nxt_ruby_ctxs;
NXT_EXPORT nxt_app_module_t nxt_app_module = {
sizeof(compat),
@@ -169,79 +185,114 @@ nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data)
{
int state, rc;
VALUE res;
+ nxt_ruby_ctx_t ruby_ctx;
nxt_unit_ctx_t *unit_ctx;
nxt_unit_init_t ruby_unit_init;
+ nxt_ruby_app_conf_t *c;
nxt_ruby_rack_init_t rack_init;
nxt_common_app_conf_t *conf;
static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" };
conf = data->app;
+ c = &conf->u.ruby;
+
+ nxt_ruby_threads = c->threads;
RUBY_INIT_STACK
ruby_init();
ruby_options(2, argv);
ruby_script("NGINX_Unit");
+ ruby_ctx.env = Qnil;
+ ruby_ctx.io_input = Qnil;
+ ruby_ctx.io_error = Qnil;
+ ruby_ctx.thread = Qnil;
+ ruby_ctx.ctx = NULL;
+ ruby_ctx.req = NULL;
+
rack_init.task = task;
- rack_init.script = &conf->u.ruby.script;
+ rack_init.script = &c->script;
+ rack_init.rctx = &ruby_ctx;
nxt_ruby_init_strings();
res = rb_protect(nxt_ruby_init_basic,
(VALUE) (uintptr_t) &rack_init, &state);
if (nxt_slow_path(res == Qnil || state != 0)) {
- nxt_ruby_exception_log(task, NXT_LOG_ALERT,
+ nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
"Failed to init basic variables");
return NXT_ERROR;
}
+ nxt_ruby_call = Qnil;
+
nxt_ruby_rackup = nxt_ruby_rack_init(&rack_init);
if (nxt_slow_path(nxt_ruby_rackup == Qnil)) {
return NXT_ERROR;
}
+ rb_gc_register_address(&nxt_ruby_rackup);
+
nxt_ruby_call = rb_intern("call");
if (nxt_slow_path(nxt_ruby_call == Qnil)) {
nxt_alert(task, "Ruby: Unable to find rack entry point");
- return NXT_ERROR;
+ goto fail;
}
- nxt_ruby_env = rb_protect(nxt_ruby_rack_env_create, Qnil, &state);
- if (nxt_slow_path(state != 0)) {
- nxt_ruby_exception_log(task, NXT_LOG_ALERT,
+ rb_gc_register_address(&nxt_ruby_call);
+
+ ruby_ctx.env = rb_protect(nxt_ruby_rack_env_create,
+ (VALUE) (uintptr_t) &ruby_ctx, &state);
+ if (nxt_slow_path(ruby_ctx.env == Qnil || state != 0)) {
+ nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
"Failed to create 'environ' variable");
- return NXT_ERROR;
+ goto fail;
}
- rb_gc_register_address(&nxt_ruby_rackup);
- rb_gc_register_address(&nxt_ruby_call);
- rb_gc_register_address(&nxt_ruby_env);
+ rc = nxt_ruby_init_threads(c);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ goto fail;
+ }
nxt_unit_default_init(task, &ruby_unit_init);
ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler;
+ ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler;
ruby_unit_init.shm_limit = conf->shm_limit;
+ ruby_unit_init.data = c;
+ ruby_unit_init.ctx_data = &ruby_ctx;
unit_ctx = nxt_unit_init(&ruby_unit_init);
if (nxt_slow_path(unit_ctx == NULL)) {
- return NXT_ERROR;
+ goto fail;
}
- nxt_ruby_run_ctx.unit_ctx = unit_ctx;
+ rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_unit_run, unit_ctx,
+ nxt_ruby_ubf, unit_ctx);
- rc = nxt_unit_run(unit_ctx);
+ nxt_ruby_join_threads(unit_ctx, c);
- nxt_ruby_atexit();
+ nxt_unit_done(unit_ctx);
- nxt_ruby_run_ctx.unit_ctx = NULL;
+ nxt_ruby_ctx_done(&ruby_ctx);
- nxt_unit_done(unit_ctx);
+ nxt_ruby_atexit();
exit(rc);
return NXT_OK;
+
+fail:
+
+ nxt_ruby_join_threads(NULL, c);
+
+ nxt_ruby_ctx_done(&ruby_ctx);
+
+ nxt_ruby_atexit();
+
+ return NXT_ERROR;
}
@@ -249,7 +300,6 @@ static VALUE
nxt_ruby_init_basic(VALUE arg)
{
int state;
- nxt_int_t rc;
nxt_ruby_rack_init_t *rack_init;
rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) arg;
@@ -265,56 +315,19 @@ nxt_ruby_init_basic(VALUE arg)
rb_funcall(rb_cObject, rb_intern("require"), 1,
rb_str_new2("enc/trans/transdb"));
- rc = nxt_ruby_init_io(rack_init->task);
- if (nxt_slow_path(rc != NXT_OK)) {
- return Qnil;
- }
-
return arg;
}
-static nxt_int_t
-nxt_ruby_init_io(nxt_task_t *task)
-{
- VALUE rb, io_input, io_error;
-
- io_input = nxt_ruby_stream_io_input_init();
- rb = Data_Wrap_Struct(io_input, 0, 0, &nxt_ruby_run_ctx);
-
- nxt_ruby_io_input = rb_funcall(io_input, rb_intern("new"), 1, rb);
- if (nxt_slow_path(nxt_ruby_io_input == Qnil)) {
- nxt_alert(task, "Ruby: Failed to create environment 'rack.input' var");
-
- return NXT_ERROR;
- }
-
- io_error = nxt_ruby_stream_io_error_init();
- rb = Data_Wrap_Struct(io_error, 0, 0, &nxt_ruby_run_ctx);
-
- nxt_ruby_io_error = rb_funcall(io_error, rb_intern("new"), 1, rb);
- if (nxt_slow_path(nxt_ruby_io_error == Qnil)) {
- nxt_alert(task, "Ruby: Failed to create environment 'rack.error' var");
-
- return NXT_ERROR;
- }
-
- rb_gc_register_address(&nxt_ruby_io_input);
- rb_gc_register_address(&nxt_ruby_io_error);
-
- return NXT_OK;
-}
-
-
static VALUE
nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init)
{
int state;
- VALUE rack, rackup, err;
+ VALUE rackup, err;
rb_protect(nxt_ruby_require_rubygems, Qnil, &state);
if (nxt_slow_path(state != 0)) {
- nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT,
+ nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
"Failed to require 'rubygems' package");
return Qnil;
}
@@ -324,7 +337,7 @@ nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init)
err = rb_errinfo();
if (rb_obj_is_kind_of(err, rb_eLoadError) == Qfalse) {
- nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT,
+ nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
"Failed to require 'bundler/setup' package");
return Qnil;
}
@@ -334,18 +347,15 @@ nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init)
rb_protect(nxt_ruby_require_rack, Qnil, &state);
if (nxt_slow_path(state != 0)) {
- nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT,
+ nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
"Failed to require 'rack' package");
return Qnil;
}
- rack = rb_const_get(rb_cObject, rb_intern("Rack"));
- rack_init->builder = rb_const_get(rack, rb_intern("Builder"));
-
rackup = rb_protect(nxt_ruby_rack_parse_script,
(VALUE) (uintptr_t) rack_init, &state);
if (nxt_slow_path(TYPE(rackup) != T_ARRAY || state != 0)) {
- nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT,
+ nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
"Failed to parse rack script");
return Qnil;
}
@@ -385,15 +395,18 @@ nxt_ruby_require_rack(VALUE arg)
static VALUE
nxt_ruby_rack_parse_script(VALUE ctx)
{
- VALUE script, res;
+ VALUE script, res, rack, builder;
nxt_ruby_rack_init_t *rack_init;
rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) ctx;
+ rack = rb_const_get(rb_cObject, rb_intern("Rack"));
+ builder = rb_const_get(rack, rb_intern("Builder"));
+
script = rb_str_new((const char *) rack_init->script->start,
(long) rack_init->script->length);
- res = rb_funcall(rack_init->builder, rb_intern("parse_file"), 1, script);
+ res = rb_funcall(builder, rb_intern("parse_file"), 1, script);
rb_str_free(script);
@@ -404,7 +417,16 @@ nxt_ruby_rack_parse_script(VALUE ctx)
static VALUE
nxt_ruby_rack_env_create(VALUE arg)
{
- VALUE hash_env, version;
+ int rc;
+ VALUE hash_env, version;
+ nxt_ruby_ctx_t *rctx;
+
+ rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
+
+ rc = nxt_ruby_init_io(rctx);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return Qnil;
+ }
hash_env = rb_hash_new();
@@ -418,47 +440,114 @@ nxt_ruby_rack_env_create(VALUE arg)
rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MINOR));
rb_hash_aset(hash_env, rb_str_new2("rack.version"), version);
- rb_hash_aset(hash_env, rb_str_new2("rack.input"), nxt_ruby_io_input);
- rb_hash_aset(hash_env, rb_str_new2("rack.errors"), nxt_ruby_io_error);
- rb_hash_aset(hash_env, rb_str_new2("rack.multithread"), Qfalse);
+ rb_hash_aset(hash_env, rb_str_new2("rack.input"), rctx->io_input);
+ rb_hash_aset(hash_env, rb_str_new2("rack.errors"), rctx->io_error);
+ rb_hash_aset(hash_env, rb_str_new2("rack.multithread"),
+ nxt_ruby_threads > 1 ? Qtrue : Qfalse);
rb_hash_aset(hash_env, rb_str_new2("rack.multiprocess"), Qtrue);
rb_hash_aset(hash_env, rb_str_new2("rack.run_once"), Qfalse);
rb_hash_aset(hash_env, rb_str_new2("rack.hijack?"), Qfalse);
rb_hash_aset(hash_env, rb_str_new2("rack.hijack"), Qnil);
rb_hash_aset(hash_env, rb_str_new2("rack.hijack_io"), Qnil);
+ rctx->env = hash_env;
+
+ rb_gc_register_address(&rctx->env);
+
return hash_env;
}
+static int
+nxt_ruby_init_io(nxt_ruby_ctx_t *rctx)
+{
+ VALUE io_input, io_error;
+
+ io_input = nxt_ruby_stream_io_input_init();
+
+ rctx->io_input = rb_funcall(io_input, rb_intern("new"), 1,
+ (VALUE) (uintptr_t) rctx);
+ if (nxt_slow_path(rctx->io_input == Qnil)) {
+ nxt_unit_alert(NULL,
+ "Ruby: Failed to create environment 'rack.input' var");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ rb_gc_register_address(&rctx->io_input);
+
+ io_error = nxt_ruby_stream_io_error_init();
+
+ rctx->io_error = rb_funcall(io_error, rb_intern("new"), 1,
+ (VALUE) (uintptr_t) rctx);
+ if (nxt_slow_path(rctx->io_error == Qnil)) {
+ nxt_unit_alert(NULL,
+ "Ruby: Failed to create environment 'rack.error' var");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ rb_gc_register_address(&rctx->io_error);
+
+ return NXT_UNIT_OK;
+}
+
+
static void
nxt_ruby_request_handler(nxt_unit_request_info_t *req)
{
- int state;
- VALUE res;
+ (void) rb_thread_call_with_gvl(nxt_ruby_request_handler_gvl, req);
+}
+
+
+static void *
+nxt_ruby_request_handler_gvl(void *data)
+{
+ int state;
+ VALUE res;
+ nxt_ruby_ctx_t *rctx;
+ nxt_unit_request_info_t *req;
+
+ req = data;
- nxt_ruby_run_ctx.req = req;
+ rctx = req->ctx->data;
+ rctx->req = req;
- res = rb_protect(nxt_ruby_rack_app_run, Qnil, &state);
+ res = rb_protect(nxt_ruby_rack_app_run, (VALUE) (uintptr_t) req, &state);
if (nxt_slow_path(res == Qnil || state != 0)) {
- nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
+ nxt_ruby_exception_log(req, NXT_LOG_ERR,
"Failed to run ruby script");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ } else {
+ nxt_unit_request_done(req, NXT_UNIT_OK);
}
+
+ rctx->req = NULL;
+
+ return NULL;
}
static VALUE
nxt_ruby_rack_app_run(VALUE arg)
{
- int rc;
- VALUE env, result;
- nxt_int_t status;
+ int rc;
+ VALUE env, result;
+ nxt_int_t status;
+ nxt_ruby_ctx_t *rctx;
+ nxt_unit_request_info_t *req;
- env = rb_hash_dup(nxt_ruby_env);
+ req = (nxt_unit_request_info_t *) arg;
- rc = nxt_ruby_read_request(env);
+ rctx = req->ctx->data;
+
+ env = rb_hash_dup(rctx->env);
+
+ rc = nxt_ruby_read_request(req, env);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_req_alert(nxt_ruby_run_ctx.req,
+ nxt_unit_req_alert(req,
"Ruby: Failed to process incoming request");
goto fail;
@@ -466,50 +555,44 @@ nxt_ruby_rack_app_run(VALUE arg)
result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env);
if (nxt_slow_path(TYPE(result) != T_ARRAY)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(req,
"Ruby: Invalid response format from application");
goto fail;
}
if (nxt_slow_path(RARRAY_LEN(result) != 3)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(req,
"Ruby: Invalid response format from application. "
"Need 3 entries [Status, Headers, Body]");
goto fail;
}
- status = nxt_ruby_rack_result_status(result);
+ status = nxt_ruby_rack_result_status(req, result);
if (nxt_slow_path(status < 0)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(req,
"Ruby: Invalid response status from application.");
goto fail;
}
- rc = nxt_ruby_rack_result_headers(result, status);
+ rc = nxt_ruby_rack_result_headers(req, result, status);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
- rc = nxt_ruby_rack_result_body(result);
+ rc = nxt_ruby_rack_result_body(req, result);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
- nxt_unit_request_done(nxt_ruby_run_ctx.req, rc);
- nxt_ruby_run_ctx.req = NULL;
-
rb_hash_delete(env, rb_obj_id(env));
return result;
fail:
- nxt_unit_request_done(nxt_ruby_run_ctx.req, NXT_UNIT_ERROR);
- nxt_ruby_run_ctx.req = NULL;
-
rb_hash_delete(env, rb_obj_id(env));
return Qnil;
@@ -517,14 +600,14 @@ fail:
static int
-nxt_ruby_read_request(VALUE hash_env)
+nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env)
{
VALUE name;
uint32_t i;
nxt_unit_field_t *f;
nxt_unit_request_t *r;
- r = nxt_ruby_run_ctx.req->request;
+ r = req->request;
nxt_ruby_add_sptr(hash_env, nxt_rb_request_method_str, &r->method,
r->method_length);
@@ -586,7 +669,7 @@ nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
static nxt_int_t
-nxt_ruby_rack_result_status(VALUE result)
+nxt_ruby_rack_result_status(nxt_unit_request_info_t *req, VALUE result)
{
VALUE status;
@@ -601,7 +684,7 @@ nxt_ruby_rack_result_status(VALUE result)
RSTRING_LEN(status));
}
- nxt_unit_req_error(nxt_ruby_run_ctx.req, "Ruby: Invalid response 'status' "
+ nxt_unit_req_error(req, "Ruby: Invalid response 'status' "
"format from application");
return -2;
@@ -609,14 +692,16 @@ nxt_ruby_rack_result_status(VALUE result)
typedef struct {
- int rc;
- uint32_t fields;
- uint32_t size;
+ int rc;
+ uint32_t fields;
+ uint32_t size;
+ nxt_unit_request_info_t *req;
} nxt_ruby_headers_info_t;
static int
-nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status)
+nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req, VALUE result,
+ nxt_int_t status)
{
int rc;
VALUE headers;
@@ -624,7 +709,7 @@ nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status)
headers = rb_ary_entry(result, 1);
if (nxt_slow_path(TYPE(headers) != T_HASH)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(req,
"Ruby: Invalid response 'headers' format from "
"application");
@@ -636,6 +721,7 @@ nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status)
headers_info.rc = NXT_UNIT_OK;
headers_info.fields = 0;
headers_info.size = 0;
+ headers_info.req = req;
rb_hash_foreach(headers, nxt_ruby_hash_info,
(VALUE) (uintptr_t) &headers_info);
@@ -643,13 +729,14 @@ nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status)
return headers_info.rc;
}
- rc = nxt_unit_response_init(nxt_ruby_run_ctx.req, status,
+ rc = nxt_unit_response_init(req, status,
headers_info.fields, headers_info.size);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
- rb_hash_foreach(headers, nxt_ruby_hash_add, (VALUE) (uintptr_t) &rc);
+ rb_hash_foreach(headers, nxt_ruby_hash_add,
+ (VALUE) (uintptr_t) &headers_info);
return rc;
}
@@ -664,14 +751,14 @@ nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg)
headers_info = (void *) (uintptr_t) arg;
if (nxt_slow_path(TYPE(r_key) != T_STRING)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(headers_info->req,
"Ruby: Wrong header entry 'key' from application");
goto fail;
}
if (nxt_slow_path(TYPE(r_value) != T_STRING)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(headers_info->req,
"Ruby: Wrong header entry 'value' from application");
goto fail;
@@ -714,11 +801,13 @@ fail:
static int
nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg)
{
- int *rc;
- uint32_t key_len;
- const char *value, *value_end, *pos;
+ int *rc;
+ uint32_t key_len;
+ const char *value, *value_end, *pos;
+ nxt_ruby_headers_info_t *headers_info;
- rc = (int *) (uintptr_t) arg;
+ headers_info = (void *) (uintptr_t) arg;
+ rc = &headers_info->rc;
value = RSTRING_PTR(r_value);
value_end = value + RSTRING_LEN(r_value);
@@ -734,7 +823,7 @@ nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg)
break;
}
- *rc = nxt_unit_response_add_field(nxt_ruby_run_ctx.req,
+ *rc = nxt_unit_response_add_field(headers_info->req,
RSTRING_PTR(r_key), key_len,
value, pos - value);
if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
@@ -746,7 +835,7 @@ nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg)
}
if (value <= value_end) {
- *rc = nxt_unit_response_add_field(nxt_ruby_run_ctx.req,
+ *rc = nxt_unit_response_add_field(headers_info->req,
RSTRING_PTR(r_key), key_len,
value, value_end - value);
if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
@@ -765,7 +854,7 @@ fail:
static int
-nxt_ruby_rack_result_body(VALUE result)
+nxt_ruby_rack_result_body(nxt_unit_request_info_t *req, VALUE result)
{
int rc;
VALUE fn, body;
@@ -776,24 +865,24 @@ nxt_ruby_rack_result_body(VALUE result)
fn = rb_funcall(body, rb_intern("to_path"), 0);
if (nxt_slow_path(TYPE(fn) != T_STRING)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(req,
"Ruby: Failed to get 'body' file path from "
"application");
return NXT_UNIT_ERROR;
}
- rc = nxt_ruby_rack_result_body_file_write(fn);
+ rc = nxt_ruby_rack_result_body_file_write(req, fn);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
} else if (rb_respond_to(body, rb_intern("each"))) {
rb_block_call(body, rb_intern("each"), 0, 0,
- nxt_ruby_rack_result_body_each, 0);
+ nxt_ruby_rack_result_body_each, (VALUE) (uintptr_t) req);
} else {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(req,
"Ruby: Invalid response 'body' format "
"from application");
@@ -842,17 +931,24 @@ nxt_ruby_rack_file_read(nxt_unit_read_info_t *read_info, void *dst, size_t size)
}
+typedef struct {
+ nxt_unit_read_info_t read_info;
+ nxt_unit_request_info_t *req;
+} nxt_ruby_read_info_t;
+
+
static int
-nxt_ruby_rack_result_body_file_write(VALUE filepath)
+nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
+ VALUE filepath)
{
int fd, rc;
struct stat finfo;
nxt_ruby_rack_file_t ruby_file;
- nxt_unit_read_info_t read_info;
+ nxt_ruby_read_info_t ri;
fd = open(RSTRING_PTR(filepath), O_RDONLY, 0);
if (nxt_slow_path(fd == -1)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(req,
"Ruby: Failed to open content file \"%s\": %s (%d)",
RSTRING_PTR(filepath), strerror(errno), errno);
@@ -861,7 +957,7 @@ nxt_ruby_rack_result_body_file_write(VALUE filepath)
rc = fstat(fd, &finfo);
if (nxt_slow_path(rc == -1)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(req,
"Ruby: Content file fstat(\"%s\") failed: %s (%d)",
RSTRING_PTR(filepath), strerror(errno), errno);
@@ -874,16 +970,16 @@ nxt_ruby_rack_result_body_file_write(VALUE filepath)
ruby_file.pos = 0;
ruby_file.rest = finfo.st_size;
- read_info.read = nxt_ruby_rack_file_read;
- read_info.eof = ruby_file.rest == 0;
- read_info.buf_size = ruby_file.rest;
- read_info.data = &ruby_file;
+ ri.read_info.read = nxt_ruby_rack_file_read;
+ ri.read_info.eof = ruby_file.rest == 0;
+ ri.read_info.buf_size = ruby_file.rest;
+ ri.read_info.data = &ruby_file;
+ ri.req = req;
- rc = nxt_unit_response_write_cb(nxt_ruby_run_ctx.req, &read_info);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
- "Ruby: Failed to write content file.");
- }
+ rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_response_write_cb,
+ &ri,
+ nxt_ruby_ubf,
+ req->ctx);
close(fd);
@@ -891,39 +987,77 @@ nxt_ruby_rack_result_body_file_write(VALUE filepath)
}
+static void *
+nxt_ruby_response_write_cb(void *data)
+{
+ int rc;
+ nxt_ruby_read_info_t *ri;
+
+ ri = data;
+
+ rc = nxt_unit_response_write_cb(ri->req, &ri->read_info);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_error(ri->req, "Ruby: Failed to write content file.");
+ }
+
+ return (void *) (intptr_t) rc;
+}
+
+
+typedef struct {
+ VALUE body;
+ nxt_unit_request_info_t *req;
+} nxt_ruby_write_info_t;
+
+
static VALUE
nxt_ruby_rack_result_body_each(VALUE body, VALUE arg, int argc,
const VALUE *argv, VALUE blockarg)
{
- int rc;
+ nxt_ruby_write_info_t wi;
if (TYPE(body) != T_STRING) {
return Qnil;
}
- rc = nxt_unit_response_write(nxt_ruby_run_ctx.req, RSTRING_PTR(body),
- RSTRING_LEN(body));
+ wi.body = body;
+ wi.req = (void *) (uintptr_t) arg;
+
+ (void) rb_thread_call_without_gvl(nxt_ruby_response_write,
+ (void *) (uintptr_t) &wi,
+ nxt_ruby_ubf, wi.req->ctx);
+
+ return Qnil;
+}
+
+
+static void *
+nxt_ruby_response_write(void *data)
+{
+ int rc;
+ nxt_ruby_write_info_t *wi;
+
+ wi = data;
+
+ rc = nxt_unit_response_write(wi->req, RSTRING_PTR(wi->body),
+ RSTRING_LEN(wi->body));
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ nxt_unit_req_error(wi->req,
"Ruby: Failed to write 'body' from application");
}
- return Qnil;
+ return (void *) (intptr_t) rc;
}
static void
-nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc)
+nxt_ruby_exception_log(nxt_unit_request_info_t *req, uint32_t level,
+ const char *desc)
{
int i;
VALUE err, ary, eclass, msg;
- if (task != NULL) {
- nxt_log(task, level, "Ruby: %s", desc);
-
- } else {
- nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "Ruby: %s", desc);
- }
+ nxt_unit_req_log(req, level, "Ruby: %s", desc);
err = rb_errinfo();
if (nxt_slow_path(err == Qnil)) {
@@ -938,25 +1072,30 @@ nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc)
eclass = rb_class_name(rb_class_of(err));
msg = rb_funcall(err, rb_intern("message"), 0);
- if (task != NULL) {
- nxt_log(task, level, "Ruby: %s: %s (%s)",
- RSTRING_PTR(RARRAY_PTR(ary)[0]),
- RSTRING_PTR(msg), RSTRING_PTR(eclass));
-
- } else {
- nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "Ruby: %s: %s (%s)",
+ nxt_unit_req_log(req, level, "Ruby: %s: %s (%s)",
RSTRING_PTR(RARRAY_PTR(ary)[0]),
RSTRING_PTR(msg), RSTRING_PTR(eclass));
- }
for (i = 1; i < RARRAY_LEN(ary); i++) {
- if (task != NULL) {
- nxt_log(task, level, "from %s", RSTRING_PTR(RARRAY_PTR(ary)[i]));
-
- } else {
- nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "from %s",
+ nxt_unit_req_log(req, level, "from %s",
RSTRING_PTR(RARRAY_PTR(ary)[i]));
- }
+ }
+}
+
+
+static void
+nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx)
+{
+ if (rctx->io_input != Qnil) {
+ rb_gc_unregister_address(&rctx->io_input);
+ }
+
+ if (rctx->io_error != Qnil) {
+ rb_gc_unregister_address(&rctx->io_error);
+ }
+
+ if (rctx->env != Qnil) {
+ rb_gc_unregister_address(&rctx->env);
}
}
@@ -964,14 +1103,170 @@ nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc)
static void
nxt_ruby_atexit(void)
{
- rb_gc_unregister_address(&nxt_ruby_io_input);
- rb_gc_unregister_address(&nxt_ruby_io_error);
+ if (nxt_ruby_rackup != Qnil) {
+ rb_gc_unregister_address(&nxt_ruby_rackup);
+ }
- rb_gc_unregister_address(&nxt_ruby_rackup);
- rb_gc_unregister_address(&nxt_ruby_call);
- rb_gc_unregister_address(&nxt_ruby_env);
+ if (nxt_ruby_call != Qnil) {
+ rb_gc_unregister_address(&nxt_ruby_call);
+ }
nxt_ruby_done_strings();
ruby_cleanup(0);
}
+
+
+static int
+nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx)
+{
+ VALUE res;
+ uint32_t i;
+ nxt_ruby_ctx_t *rctx;
+ nxt_ruby_app_conf_t *c;
+
+ /* Worker thread context. */
+ if (!nxt_unit_is_main_ctx(ctx)) {
+ return NXT_UNIT_OK;
+ }
+
+ c = ctx->unit->data;
+
+ if (c->threads <= 1) {
+ return NXT_UNIT_OK;
+ }
+
+ for (i = 0; i < c->threads - 1; i++) {
+ rctx = &nxt_ruby_ctxs[i];
+
+ rctx->ctx = ctx;
+
+ res = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx);
+
+ if (nxt_fast_path(res != Qnil)) {
+ nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
+
+ rctx->thread = res;
+
+ } else {
+ nxt_unit_alert(ctx, "thread #%d create failed", (int) (i + 1));
+ }
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static VALUE
+nxt_ruby_thread_func(VALUE arg)
+{
+ nxt_unit_ctx_t *ctx;
+ nxt_ruby_ctx_t *rctx;
+
+ rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;
+
+ nxt_unit_debug(rctx->ctx, "worker thread start");
+
+ ctx = nxt_unit_ctx_alloc(rctx->ctx, rctx);
+ if (nxt_slow_path(ctx == NULL)) {
+ goto fail;
+ }
+
+ (void) rb_thread_call_without_gvl(nxt_ruby_unit_run, ctx,
+ nxt_ruby_ubf, ctx);
+
+ nxt_unit_done(ctx);
+
+fail:
+
+ nxt_unit_debug(NULL, "worker thread end");
+
+ return Qnil;
+}
+
+
+static void *
+nxt_ruby_unit_run(void *ctx)
+{
+ return (void *) (intptr_t) nxt_unit_run(ctx);
+}
+
+
+static void
+nxt_ruby_ubf(void *ctx)
+{
+ nxt_unit_warn(ctx, "Ruby: UBF");
+}
+
+
+static int
+nxt_ruby_init_threads(nxt_ruby_app_conf_t *c)
+{
+ int state;
+ uint32_t i;
+ nxt_ruby_ctx_t *rctx;
+
+ if (c->threads <= 1) {
+ return NXT_UNIT_OK;
+ }
+
+ nxt_ruby_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_ruby_ctx_t)
+ * (c->threads - 1));
+ if (nxt_slow_path(nxt_ruby_ctxs == NULL)) {
+ nxt_unit_alert(NULL, "Failed to allocate run contexts array");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ for (i = 0; i < c->threads - 1; i++) {
+ rctx = &nxt_ruby_ctxs[i];
+
+ rctx->env = Qnil;
+ rctx->io_input = Qnil;
+ rctx->io_error = Qnil;
+ rctx->thread = Qnil;
+ }
+
+ for (i = 0; i < c->threads - 1; i++) {
+ rctx = &nxt_ruby_ctxs[i];
+
+ rctx->env = rb_protect(nxt_ruby_rack_env_create,
+ (VALUE) (uintptr_t) rctx, &state);
+ if (nxt_slow_path(rctx->env == Qnil || state != 0)) {
+ nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
+ "Failed to create 'environ' variable");
+ return NXT_UNIT_ERROR;
+ }
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static void
+nxt_ruby_join_threads(nxt_unit_ctx_t *ctx, nxt_ruby_app_conf_t *c)
+{
+ uint32_t i;
+ nxt_ruby_ctx_t *rctx;
+
+ if (nxt_ruby_ctxs == NULL) {
+ return;
+ }
+
+ for(i = 0; i < c->threads - 1; i++) {
+ rctx = &nxt_ruby_ctxs[i];
+
+ if (rctx->thread != Qnil) {
+ rb_funcall(rctx->thread, rb_intern("join"), 0);
+
+ nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1));
+
+ } else {
+ nxt_unit_debug(ctx, "thread #%d not started", (int) (i + 1));
+ }
+
+ nxt_ruby_ctx_done(rctx);
+ }
+
+ nxt_unit_free(ctx, nxt_ruby_ctxs);
+}
diff --git a/src/ruby/nxt_ruby.h b/src/ruby/nxt_ruby.h
index 77a65894..26430021 100644
--- a/src/ruby/nxt_ruby.h
+++ b/src/ruby/nxt_ruby.h
@@ -21,9 +21,13 @@
typedef struct {
- nxt_unit_ctx_t *unit_ctx;
+ VALUE env;
+ VALUE io_input;
+ VALUE io_error;
+ VALUE thread;
+ nxt_unit_ctx_t *ctx;
nxt_unit_request_info_t *req;
-} nxt_ruby_run_ctx_t;
+} nxt_ruby_ctx_t;
VALUE nxt_ruby_stream_io_input_init(void);
diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c
index cc110035..69bf289e 100644
--- a/src/ruby/nxt_ruby_stream_io.c
+++ b/src/ruby/nxt_ruby_stream_io.c
@@ -8,7 +8,7 @@
#include <nxt_unit.h>
-static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap);
+static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE arg);
static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self);
static VALUE nxt_ruby_stream_io_gets(VALUE obj);
static VALUE nxt_ruby_stream_io_each(VALUE obj);
@@ -16,8 +16,7 @@ static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args);
static VALUE nxt_ruby_stream_io_rewind(VALUE obj);
static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args);
static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args);
-nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx,
- VALUE val);
+nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val);
static VALUE nxt_ruby_stream_io_flush(VALUE obj);
@@ -63,13 +62,11 @@ nxt_ruby_stream_io_error_init(void)
static VALUE
-nxt_ruby_stream_io_new(VALUE class, VALUE wrap)
+nxt_ruby_stream_io_new(VALUE class, VALUE arg)
{
- VALUE self;
- nxt_ruby_run_ctx_t *run_ctx;
+ VALUE self;
- Data_Get_Struct(wrap, nxt_ruby_run_ctx_t, run_ctx);
- self = Data_Wrap_Struct(class, 0, 0, run_ctx);
+ self = Data_Wrap_Struct(class, 0, 0, (void *) (uintptr_t) arg);
rb_obj_call_init(self, 0, NULL);
@@ -89,12 +86,11 @@ nxt_ruby_stream_io_gets(VALUE obj)
{
VALUE buf;
ssize_t res;
- nxt_ruby_run_ctx_t *run_ctx;
+ nxt_ruby_ctx_t *rctx;
nxt_unit_request_info_t *req;
- Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
-
- req = run_ctx->req;
+ Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
+ req = rctx->req;
if (req->content_length == 0) {
return Qnil;
@@ -145,13 +141,13 @@ nxt_ruby_stream_io_each(VALUE obj)
static VALUE
nxt_ruby_stream_io_read(VALUE obj, VALUE args)
{
- VALUE buf;
- long copy_size, u_size;
- nxt_ruby_run_ctx_t *run_ctx;
+ VALUE buf;
+ long copy_size, u_size;
+ nxt_ruby_ctx_t *rctx;
- Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
+ Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
- copy_size = run_ctx->req->content_length;
+ copy_size = rctx->req->content_length;
if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) {
u_size = NUM2LONG(RARRAY_PTR(args)[0]);
@@ -175,8 +171,7 @@ nxt_ruby_stream_io_read(VALUE obj, VALUE args)
return Qnil;
}
- copy_size = nxt_unit_request_read(run_ctx->req, RSTRING_PTR(buf),
- copy_size);
+ copy_size = nxt_unit_request_read(rctx->req, RSTRING_PTR(buf), copy_size);
if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) {
@@ -200,15 +195,15 @@ nxt_ruby_stream_io_rewind(VALUE obj)
static VALUE
nxt_ruby_stream_io_puts(VALUE obj, VALUE args)
{
- nxt_ruby_run_ctx_t *run_ctx;
+ nxt_ruby_ctx_t *rctx;
if (RARRAY_LEN(args) != 1) {
return Qnil;
}
- Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
+ Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
- nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]);
+ nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]);
return Qnil;
}
@@ -217,23 +212,23 @@ nxt_ruby_stream_io_puts(VALUE obj, VALUE args)
static VALUE
nxt_ruby_stream_io_write(VALUE obj, VALUE args)
{
- long len;
- nxt_ruby_run_ctx_t *run_ctx;
+ long len;
+ nxt_ruby_ctx_t *rctx;
if (RARRAY_LEN(args) != 1) {
return Qnil;
}
- Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
+ Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
- len = nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]);
+ len = nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]);
return LONG2FIX(len);
}
nxt_inline long
-nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val)
+nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val)
{
if (nxt_slow_path(val == Qnil)) {
return 0;
@@ -247,7 +242,7 @@ nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val)
}
}
- nxt_unit_req_error(run_ctx->req, "Ruby: %s", RSTRING_PTR(val));
+ nxt_unit_req_error(rctx->req, "Ruby: %s", RSTRING_PTR(val));
return RSTRING_LEN(val);
}