summaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby')
-rw-r--r--src/ruby/nxt_ruby.c693
-rw-r--r--src/ruby/nxt_ruby.h8
-rw-r--r--src/ruby/nxt_ruby_stream_io.c78
3 files changed, 360 insertions, 419 deletions
diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c
index ea05b133..a08b8189 100644
--- a/src/ruby/nxt_ruby.c
+++ b/src/ruby/nxt_ruby.c
@@ -5,6 +5,9 @@
#include <ruby/nxt_ruby.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+
#define NXT_RUBY_RACK_API_VERSION_MAJOR 1
#define NXT_RUBY_RACK_API_VERSION_MINOR 3
@@ -35,29 +38,26 @@ static VALUE nxt_ruby_bundler_setup(VALUE arg);
static VALUE nxt_ruby_require_rack(VALUE arg);
static VALUE nxt_ruby_rack_parse_script(VALUE ctx);
static VALUE nxt_ruby_rack_env_create(VALUE arg);
-static nxt_int_t nxt_ruby_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
- nxt_app_wmsg_t *wmsg);
+static void nxt_ruby_request_handler(nxt_unit_request_info_t *req);
static VALUE nxt_ruby_rack_app_run(VALUE arg);
-static nxt_int_t nxt_ruby_read_request(nxt_ruby_run_ctx_t *run_ctx,
- VALUE hash_env);
-nxt_inline nxt_int_t nxt_ruby_read_add_env(nxt_task_t *task,
- nxt_app_rmsg_t *rmsg, VALUE hash_env, const char *name, nxt_str_t *str);
+static int nxt_ruby_read_request(VALUE hash_env);
+nxt_inline void nxt_ruby_add_sptr(VALUE hash_env,
+ const char *name, uint32_t name_len, nxt_unit_sptr_t *sptr, uint32_t len);
+nxt_inline void nxt_ruby_add_str(VALUE hash_env,
+ const char *name, uint32_t name_len, char *str, uint32_t len);
static nxt_int_t nxt_ruby_rack_result_status(VALUE result);
-nxt_inline nxt_int_t nxt_ruby_write(nxt_task_t *task, nxt_app_wmsg_t *wmsg,
- const u_char *data, size_t len, nxt_bool_t flush, nxt_bool_t last);
-static nxt_int_t nxt_ruby_rack_result_headers(VALUE result);
-static int nxt_ruby_hash_foreach(VALUE r_key, VALUE r_value, VALUE arg);
-static nxt_int_t nxt_ruby_head_send_part(const char *key, size_t key_size,
- const char *value, size_t value_size);
-static nxt_int_t nxt_ruby_rack_result_body(VALUE result);
-static nxt_int_t nxt_ruby_rack_result_body_file_write(VALUE filepath);
+static int nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status);
+static int nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg);
+static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg);
+static int nxt_ruby_rack_result_body(VALUE result);
+static int nxt_ruby_rack_result_body_file_write(VALUE filepath);
static VALUE nxt_ruby_rack_result_body_each(VALUE body);
static void nxt_ruby_exception_log(nxt_task_t *task, uint32_t level,
const char *desc);
-static void nxt_ruby_atexit(nxt_task_t *task);
+static void nxt_ruby_atexit(void);
static uint32_t compat[] = {
@@ -71,22 +71,22 @@ static VALUE nxt_ruby_io_input;
static VALUE nxt_ruby_io_error;
static nxt_ruby_run_ctx_t nxt_ruby_run_ctx;
-NXT_EXPORT nxt_application_module_t nxt_app_module = {
+NXT_EXPORT nxt_app_module_t nxt_app_module = {
sizeof(compat),
compat,
nxt_string("ruby"),
ruby_version,
nxt_ruby_init,
- nxt_ruby_run,
- nxt_ruby_atexit,
};
static nxt_int_t
nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
{
- int state;
+ int state, rc;
VALUE dummy, res;
+ nxt_unit_ctx_t *unit_ctx;
+ nxt_unit_init_t ruby_unit_init;
nxt_ruby_rack_init_t rack_init;
ruby_init();
@@ -128,6 +128,27 @@ nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
rb_gc_register_address(&nxt_ruby_call);
rb_gc_register_address(&nxt_ruby_env);
+ nxt_unit_default_init(task, &ruby_unit_init);
+
+ ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler;
+
+ unit_ctx = nxt_unit_init(&ruby_unit_init);
+ if (nxt_slow_path(unit_ctx == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_ruby_run_ctx.unit_ctx = unit_ctx;
+
+ rc = nxt_unit_run(unit_ctx);
+
+ nxt_ruby_atexit();
+
+ nxt_ruby_run_ctx.unit_ctx = NULL;
+
+ nxt_unit_done(unit_ctx);
+
+ exit(rc);
+
return NXT_OK;
}
@@ -319,82 +340,75 @@ nxt_ruby_rack_env_create(VALUE arg)
}
-static nxt_int_t
-nxt_ruby_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg)
+static void
+nxt_ruby_request_handler(nxt_unit_request_info_t *req)
{
int state;
VALUE res;
- nxt_ruby_run_ctx.task = task;
- nxt_ruby_run_ctx.rmsg = rmsg;
- nxt_ruby_run_ctx.wmsg = wmsg;
+ nxt_ruby_run_ctx.req = req;
res = rb_protect(nxt_ruby_rack_app_run, Qnil, &state);
- if (nxt_slow_path(state != 0)) {
- nxt_ruby_exception_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
+ if (nxt_slow_path(res == Qnil || state != 0)) {
+ nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
"Failed to run ruby script");
- return NXT_ERROR;
- }
-
- if (nxt_slow_path(res == Qnil)) {
- return NXT_ERROR;
}
-
- return NXT_OK;
}
static VALUE
nxt_ruby_rack_app_run(VALUE arg)
{
+ int rc;
VALUE env, result;
- nxt_int_t rc;
+ nxt_int_t status;
env = rb_hash_dup(nxt_ruby_env);
- rc = nxt_ruby_read_request(&nxt_ruby_run_ctx, env);
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_alert(nxt_ruby_run_ctx.task,
- "Ruby: Failed to process incoming request");
+ rc = nxt_ruby_read_request(env);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_alert(nxt_ruby_run_ctx.req,
+ "Ruby: Failed to process incoming request");
goto fail;
}
result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env);
if (nxt_slow_path(TYPE(result) != T_ARRAY)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Invalid response format from application");
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Invalid response format from application");
goto fail;
}
if (nxt_slow_path(RARRAY_LEN(result) != 3)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Invalid response format from application. "
- "Need 3 entries [Status, Headers, Body]");
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Invalid response format from application. "
+ "Need 3 entries [Status, Headers, Body]");
goto fail;
}
- rc = nxt_ruby_rack_result_status(result);
- if (nxt_slow_path(rc != NXT_OK)) {
+ status = nxt_ruby_rack_result_status(result);
+ if (nxt_slow_path(status < 0)) {
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Invalid response status from application.");
+
goto fail;
}
- rc = nxt_ruby_rack_result_headers(result);
- if (nxt_slow_path(rc != NXT_OK)) {
+ rc = nxt_ruby_rack_result_headers(result, status);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
rc = nxt_ruby_rack_result_body(result);
- if (nxt_slow_path(rc != NXT_OK)) {
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
- rc = nxt_app_msg_flush(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, 1);
- if (nxt_slow_path(rc != NXT_OK)) {
- goto fail;
- }
+ nxt_unit_request_done(nxt_ruby_run_ctx.req, rc);
+ nxt_ruby_run_ctx.req = NULL;
rb_hash_delete(env, rb_obj_id(env));
@@ -402,296 +416,194 @@ nxt_ruby_rack_app_run(VALUE arg)
fail:
+ nxt_unit_request_done(nxt_ruby_run_ctx.req, NXT_UNIT_ERROR);
+ nxt_ruby_run_ctx.req = NULL;
+
rb_hash_delete(env, rb_obj_id(env));
return Qnil;
}
-static nxt_int_t
-nxt_ruby_read_request(nxt_ruby_run_ctx_t *run_ctx, VALUE hash_env)
+static int
+nxt_ruby_read_request(VALUE hash_env)
{
- u_char *colon;
- size_t query_size;
- nxt_int_t rc;
- nxt_str_t str, value, path, target;
- nxt_str_t host, server_name, server_port;
- nxt_task_t *task;
- nxt_app_rmsg_t *rmsg;
+ char *host_start, *port_start;
+ uint32_t i, host_length, port_length;
+ nxt_unit_field_t *f;
+ nxt_unit_request_t *r;
- static nxt_str_t def_host = nxt_string("localhost");
- static nxt_str_t def_port = nxt_string("80");
+ r = nxt_ruby_run_ctx.req->request;
- task = run_ctx->task;
- rmsg = run_ctx->rmsg;
+#define NL(S) (S), sizeof(S)-1
- rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "REQUEST_METHOD", &str);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
+ nxt_ruby_add_sptr(hash_env, NL("REQUEST_METHOD"), &r->method,
+ r->method_length);
+ nxt_ruby_add_sptr(hash_env, NL("REQUEST_URI"), &r->target,
+ r->target_length);
+ nxt_ruby_add_sptr(hash_env, NL("PATH_INFO"), &r->path, r->path_length);
+ if (r->query.offset) {
+ nxt_ruby_add_sptr(hash_env, NL("QUERY_STRING"), &r->query,
+ r->query_length);
}
+ nxt_ruby_add_sptr(hash_env, NL("SERVER_PROTOCOL"), &r->version,
+ r->version_length);
+ nxt_ruby_add_sptr(hash_env, NL("REMOTE_ADDR"), &r->remote,
+ r->remote_length);
+ nxt_ruby_add_sptr(hash_env, NL("SERVER_ADDR"), &r->local, r->local_length);
- rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "REQUEST_URI", &target);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
+ for (i = 0; i < r->fields_count; i++) {
+ f = r->fields + i;
- rc = nxt_app_msg_read_str(task, rmsg, &path);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
+ nxt_ruby_add_sptr(hash_env, nxt_unit_sptr_get(&f->name), f->name_length,
+ &f->value, f->value_length);
}
- rc = nxt_app_msg_read_size(task, rmsg, &query_size);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
+ if (r->content_length_field != NXT_UNIT_NONE_FIELD) {
+ f = r->fields + r->content_length_field;
- if (path.start == NULL || path.length == 0) {
- path = target;
+ nxt_ruby_add_sptr(hash_env, NL("CONTENT_LENGTH"),
+ &f->value, f->value_length);
}
- rb_hash_aset(hash_env, rb_str_new2("PATH_INFO"),
- rb_str_new((const char *) path.start, (long) path.length));
+ if (r->content_type_field != NXT_UNIT_NONE_FIELD) {
+ f = r->fields + r->content_type_field;
- if (query_size > 0) {
- query_size--;
-
- if (nxt_slow_path(target.length < query_size)) {
- return NXT_ERROR;
- }
-
- str.start = &target.start[query_size];
- str.length = target.length - query_size;
-
- rb_hash_aset(hash_env, rb_str_new2("QUERY_STRING"),
- rb_str_new((const char *) str.start, (long) str.length));
+ nxt_ruby_add_sptr(hash_env, NL("CONTENT_TYPE"),
+ &f->value, f->value_length);
}
- rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "SERVER_PROTOCOL", &str);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
+ if (r->host_field != NXT_UNIT_NONE_FIELD) {
+ f = r->fields + r->host_field;
- rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "REMOTE_ADDR", &str);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
-
- rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "SERVER_ADDR", &str);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
-
- rc = nxt_app_msg_read_str(task, rmsg, &host);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
-
- if (host.length == 0) {
- host = def_host;
- }
-
- colon = nxt_memchr(host.start, ':', host.length);
- server_name = host;
-
- if (colon != NULL) {
- server_name.length = colon - host.start;
-
- server_port.start = colon + 1;
- server_port.length = host.length - server_name.length - 1;
+ host_start = nxt_unit_sptr_get(&f->value);
+ host_length = f->value_length;
} else {
- server_port = def_port;
- }
-
- rb_hash_aset(hash_env, rb_str_new2("SERVER_NAME"),
- rb_str_new((const char *) server_name.start,
- (long) server_name.length));
-
- rb_hash_aset(hash_env, rb_str_new2("SERVER_PORT"),
- rb_str_new((const char *) server_port.start,
- (long) server_port.length));
-
- rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "CONTENT_TYPE", &str);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
-
- rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "CONTENT_LENGTH", &str);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
+ host_start = NULL;
+ host_length = 0;
}
- for ( ;; ) {
- rc = nxt_app_msg_read_str(task, rmsg, &str);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
+ nxt_unit_split_host(host_start, host_length, &host_start, &host_length,
+ &port_start, &port_length);
- if (nxt_slow_path(str.length == 0)) {
- break;
- }
+ nxt_ruby_add_str(hash_env, NL("SERVER_NAME"), host_start, host_length);
+ nxt_ruby_add_str(hash_env, NL("SERVER_PORT"), port_start, port_length);
- rc = nxt_app_msg_read_str(task, rmsg, &value);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
-
- rb_hash_aset(hash_env,
- rb_str_new((char *) str.start, (long) str.length),
- rb_str_new((const char *) value.start,
- (long) value.length));
- }
+#undef NL
- rc = nxt_app_msg_read_size(task, rmsg, &run_ctx->body_preread_size);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
-
- return NXT_OK;
+ return NXT_UNIT_OK;
}
-nxt_inline nxt_int_t
-nxt_ruby_read_add_env(nxt_task_t *task, nxt_app_rmsg_t *rmsg, VALUE hash_env,
- const char *name, nxt_str_t *str)
+nxt_inline void
+nxt_ruby_add_sptr(VALUE hash_env,
+ const char *name, uint32_t name_len, nxt_unit_sptr_t *sptr, uint32_t len)
{
- nxt_int_t rc;
+ char *str;
- rc = nxt_app_msg_read_str(task, rmsg, str);
- if (nxt_slow_path(rc != NXT_OK)) {
- return rc;
- }
+ str = nxt_unit_sptr_get(sptr);
- if (str->start == NULL) {
- rb_hash_aset(hash_env, rb_str_new2(name), Qnil);
- return NXT_OK;
- }
+ rb_hash_aset(hash_env, rb_str_new(name, name_len), rb_str_new(str, len));
+}
- rb_hash_aset(hash_env, rb_str_new2(name),
- rb_str_new((const char *) str->start, (long) str->length));
- return NXT_OK;
+nxt_inline void
+nxt_ruby_add_str(VALUE hash_env,
+ const char *name, uint32_t name_len, char *str, uint32_t len)
+{
+ rb_hash_aset(hash_env, rb_str_new(name, name_len), rb_str_new(str, len));
}
static nxt_int_t
nxt_ruby_rack_result_status(VALUE result)
{
- VALUE status;
- u_char *p;
- size_t len;
- nxt_int_t rc;
- u_char buf[3];
+ VALUE status;
status = rb_ary_entry(result, 0);
if (TYPE(status) == T_FIXNUM) {
- nxt_sprintf(buf, buf + 3, "%03d", FIX2INT(status));
-
- p = buf;
- len = 3;
-
- } else if (TYPE(status) == T_STRING) {
- p = (u_char *) RSTRING_PTR(status);
- len = RSTRING_LEN(status);
-
- } else {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Invalid response 'status' format from application");
-
- return NXT_ERROR;
+ return FIX2INT(status);
}
- rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- (u_char *) "Status: ", nxt_length("Status: "), 0, 0);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
+ if (TYPE(status) == T_STRING) {
+ return nxt_int_parse((u_char *) RSTRING_PTR(status),
+ RSTRING_LEN(status));
}
- rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- p, len, 0, 0);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
-
- rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- (u_char *) "\r\n", nxt_length("\r\n"), 0, 0);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
- }
+ nxt_unit_req_error(nxt_ruby_run_ctx.req, "Ruby: Invalid response 'status' "
+ "format from application");
- return NXT_OK;
+ return -2;
}
-nxt_inline nxt_int_t
-nxt_ruby_write(nxt_task_t *task, nxt_app_wmsg_t *wmsg,
- const u_char *data, size_t len, nxt_bool_t flush, nxt_bool_t last)
-{
- nxt_int_t rc;
-
- rc = nxt_app_msg_write_raw(task, wmsg, data, len);
- if (nxt_slow_path(rc != NXT_OK)) {
- return rc;
- }
-
- if (flush || last) {
- rc = nxt_app_msg_flush(task, wmsg, last);
- }
-
- return rc;
-}
+typedef struct {
+ int rc;
+ uint32_t fields;
+ uint32_t size;
+} nxt_ruby_headers_info_t;
-static nxt_int_t
-nxt_ruby_rack_result_headers(VALUE result)
+static int
+nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status)
{
- VALUE headers;
- nxt_int_t rc;
+ int rc;
+ VALUE headers;
+ nxt_ruby_headers_info_t headers_info;
headers = rb_ary_entry(result, 1);
if (nxt_slow_path(TYPE(headers) != T_HASH)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Invalid response 'headers' format from application");
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Invalid response 'headers' format from "
+ "application");
- return NXT_ERROR;
+ return NXT_UNIT_ERROR;
}
- rc = NXT_OK;
+ rc = NXT_UNIT_OK;
- rb_hash_foreach(headers, nxt_ruby_hash_foreach, (VALUE) (uintptr_t) &rc);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
+ headers_info.rc = NXT_UNIT_OK;
+ headers_info.fields = 0;
+ headers_info.size = 0;
+
+ rb_hash_foreach(headers, nxt_ruby_hash_info,
+ (VALUE) (uintptr_t) &headers_info);
+ if (nxt_slow_path(headers_info.rc != NXT_UNIT_OK)) {
+ return headers_info.rc;
}
- rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- (u_char *) "\r\n", nxt_length("\r\n"), 0, 0);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
+ rc = nxt_unit_response_init(nxt_ruby_run_ctx.req, status,
+ headers_info.fields, headers_info.size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
}
- return NXT_OK;
+ rb_hash_foreach(headers, nxt_ruby_hash_add, (VALUE) (uintptr_t) &rc);
+
+ return rc;
}
static int
-nxt_ruby_hash_foreach(VALUE r_key, VALUE r_value, VALUE arg)
+nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg)
{
- nxt_int_t rc, *rc_p;
- const char *value, *value_end, *pos;
+ const char *value, *value_end, *pos;
+ nxt_ruby_headers_info_t *headers_info;
- rc_p = (nxt_int_t *) (uintptr_t) arg;
+ headers_info = (void *) (uintptr_t) arg;
if (nxt_slow_path(TYPE(r_key) != T_STRING)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Wrong header entry 'key' from application");
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Wrong header entry 'key' from application");
goto fail;
}
if (nxt_slow_path(TYPE(r_value) != T_STRING)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Wrong header entry 'value' from application");
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Wrong header entry 'value' from application");
goto fail;
}
@@ -708,70 +620,86 @@ nxt_ruby_hash_foreach(VALUE r_key, VALUE r_value, VALUE arg)
break;
}
- rc = nxt_ruby_head_send_part(RSTRING_PTR(r_key), RSTRING_LEN(r_key),
- value, pos - value);
- if (nxt_slow_path(rc != NXT_OK)) {
- goto fail;
- }
+ headers_info->fields++;
+ headers_info->size += RSTRING_LEN(r_key) + (pos - value);
pos++;
value = pos;
}
if (value <= value_end) {
- rc = nxt_ruby_head_send_part(RSTRING_PTR(r_key), RSTRING_LEN(r_key),
- value, value_end - value);
- if (nxt_slow_path(rc != NXT_OK)) {
- goto fail;
- }
+ headers_info->fields++;
+ headers_info->size += RSTRING_LEN(r_key) + (value_end - value);
}
- *rc_p = NXT_OK;
-
return ST_CONTINUE;
fail:
- *rc_p = NXT_ERROR;
+ headers_info->rc = NXT_UNIT_ERROR;
return ST_STOP;
}
-static nxt_int_t
-nxt_ruby_head_send_part(const char *key, size_t key_size,
- const char *value, size_t value_size)
+static int
+nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg)
{
- nxt_int_t rc;
+ int *rc;
+ uint32_t key_len;
+ const char *value, *value_end, *pos;
- rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- (u_char *) key, key_size);
- if (nxt_slow_path(rc != NXT_OK)) {
- return rc;
- }
+ rc = (int *) (uintptr_t) arg;
- rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- (u_char *) ": ", nxt_length(": "));
- if (nxt_slow_path(rc != NXT_OK)) {
- return rc;
+ value = RSTRING_PTR(r_value);
+ value_end = value + RSTRING_LEN(r_value);
+
+ key_len = RSTRING_LEN(r_key);
+
+ pos = value;
+
+ for ( ;; ) {
+ pos = strchr(pos, '\n');
+
+ if (pos == NULL) {
+ break;
+ }
+
+ *rc = nxt_unit_response_add_field(nxt_ruby_run_ctx.req,
+ RSTRING_PTR(r_key), key_len,
+ value, pos - value);
+ if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
+ goto fail;
+ }
+
+ pos++;
+ value = pos;
}
- rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- (u_char *) value, value_size);
- if (nxt_slow_path(rc != NXT_OK)) {
- return rc;
+ if (value <= value_end) {
+ *rc = nxt_unit_response_add_field(nxt_ruby_run_ctx.req,
+ RSTRING_PTR(r_key), key_len,
+ value, value_end - value);
+ if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
+ goto fail;
+ }
}
- return nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- (u_char *) "\r\n", nxt_length("\r\n"));
+ return ST_CONTINUE;
+
+fail:
+
+ *rc = NXT_UNIT_ERROR;
+
+ return ST_STOP;
}
-static nxt_int_t
+static int
nxt_ruby_rack_result_body(VALUE result)
{
- VALUE fn, body;
- nxt_int_t rc;
+ int rc;
+ VALUE fn, body;
body = rb_ary_entry(result, 2);
@@ -779,120 +707,134 @@ nxt_ruby_rack_result_body(VALUE result)
fn = rb_funcall(body, rb_intern("to_path"), 0);
if (nxt_slow_path(TYPE(fn) != T_STRING)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Failed to get 'body' file path from application");
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Failed to get 'body' file path from "
+ "application");
- return NXT_ERROR;
+ return NXT_UNIT_ERROR;
}
rc = nxt_ruby_rack_result_body_file_write(fn);
- if (nxt_slow_path(rc != NXT_OK)) {
- return NXT_ERROR;
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
}
} else if (rb_respond_to(body, rb_intern("each"))) {
rb_iterate(rb_each, body, nxt_ruby_rack_result_body_each, 0);
} else {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Invalid response 'body' format from application");
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Invalid response 'body' format "
+ "from application");
- return NXT_ERROR;
+ return NXT_UNIT_ERROR;
}
if (rb_respond_to(body, rb_intern("close"))) {
rb_funcall(body, rb_intern("close"), 0);
}
- return NXT_OK;
+ return NXT_UNIT_OK;
}
-static nxt_int_t
-nxt_ruby_rack_result_body_file_write(VALUE filepath)
+typedef struct {
+ int fd;
+ off_t pos;
+ off_t rest;
+} nxt_ruby_rack_file_t;
+
+
+static ssize_t
+nxt_ruby_rack_file_read(nxt_unit_read_info_t *read_info, void *dst, size_t size)
{
- size_t len;
- ssize_t n;
- nxt_off_t rest;
- nxt_int_t rc;
- nxt_file_t file;
- nxt_file_info_t finfo;
- u_char buf[8192];
+ ssize_t res;
+ nxt_ruby_rack_file_t *file;
- nxt_memzero(&file, sizeof(nxt_file_t));
+ file = read_info->data;
- file.name = (nxt_file_name_t *) RSTRING_PTR(filepath);
+ size = nxt_min(size, (size_t) file->rest);
- rc = nxt_file_open(nxt_ruby_run_ctx.task, &file, NXT_FILE_RDONLY,
- NXT_FILE_OPEN, 0);
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Failed to open 'body' file: %s",
- (const char *) file.name);
+ res = pread(file->fd, dst, size, file->pos);
- return NXT_ERROR;
+ if (res >= 0) {
+ file->pos += res;
+ file->rest -= res;
+
+ if (size > (size_t) res) {
+ file->rest = 0;
+ }
}
- rc = nxt_file_info(&file, &finfo);
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Failed to get 'body' file information: %s",
- (const char *) file.name);
+ read_info->eof = file->rest == 0;
- goto fail;
- }
+ return res;
+}
- rest = nxt_file_size(&finfo);
- while (rest != 0) {
- len = nxt_min(rest, (nxt_off_t) sizeof(buf));
+static int
+nxt_ruby_rack_result_body_file_write(VALUE filepath)
+{
+ int fd, rc;
+ struct stat finfo;
+ nxt_ruby_rack_file_t ruby_file;
+ nxt_unit_read_info_t read_info;
- n = nxt_file_read(&file, buf, len, nxt_file_size(&finfo) - rest);
- if (nxt_slow_path(n != (ssize_t) len)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Failed to read 'body' file");
+ fd = open(RSTRING_PTR(filepath), O_RDONLY, 0);
+ if (nxt_slow_path(fd == -1)) {
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Failed to open content file \"%s\": %s (%d)",
+ RSTRING_PTR(filepath), strerror(errno), errno);
- goto fail;
- }
+ return NXT_UNIT_ERROR;
+ }
- rest -= len;
+ rc = fstat(fd, &finfo);
+ if (nxt_slow_path(rc == -1)) {
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Content file fstat(\"%s\") failed: %s (%d)",
+ RSTRING_PTR(filepath), strerror(errno), errno);
- rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- buf, len);
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Failed to write 'body' from application");
+ close(fd);
- goto fail;
- }
+ return NXT_UNIT_ERROR;
}
- nxt_file_close(nxt_ruby_run_ctx.task, &file);
+ ruby_file.fd = fd;
+ ruby_file.pos = 0;
+ ruby_file.rest = finfo.st_size;
- return NXT_OK;
+ read_info.read = nxt_ruby_rack_file_read;
+ read_info.eof = ruby_file.rest == 0;
+ read_info.buf_size = ruby_file.rest;
+ read_info.data = &ruby_file;
-fail:
+ rc = nxt_unit_response_write_cb(nxt_ruby_run_ctx.req, &read_info);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Failed to write content file.");
+ }
- nxt_file_close(nxt_ruby_run_ctx.task, &file);
+ close(fd);
- return NXT_ERROR;
+ return rc;
}
static VALUE
nxt_ruby_rack_result_body_each(VALUE body)
{
- nxt_int_t rc;
+ int rc;
if (TYPE(body) != T_STRING) {
return Qnil;
}
- rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg,
- (u_char *) RSTRING_PTR(body), RSTRING_LEN(body));
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR,
- "Ruby: Failed to write 'body' from application");
+ rc = nxt_unit_response_write(nxt_ruby_run_ctx.req, RSTRING_PTR(body),
+ RSTRING_LEN(body));
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_error(nxt_ruby_run_ctx.req,
+ "Ruby: Failed to write 'body' from application");
}
return Qnil;
@@ -905,30 +847,51 @@ nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc)
int i;
VALUE err, ary, eclass, msg;
- nxt_log(task, level, "Ruby: %s", desc);
+ if (task != NULL) {
+ nxt_log(task, level, "Ruby: %s", desc);
+
+ } else {
+ nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "Ruby: %s", desc);
+ }
err = rb_errinfo();
- ary = rb_funcall(err, rb_intern("backtrace"), 0);
+ if (nxt_slow_path(err == Qnil)) {
+ return;
+ }
- if (RARRAY_LEN(ary) == 0) {
+ ary = rb_funcall(err, rb_intern("backtrace"), 0);
+ if (nxt_slow_path(RARRAY_LEN(ary) == 0)) {
return;
}
eclass = rb_class_name(rb_class_of(err));
msg = rb_funcall(err, rb_intern("message"), 0);
- nxt_log(task, level, "Ruby: %s: %s (%s)",
- RSTRING_PTR(RARRAY_PTR(ary)[0]),
- RSTRING_PTR(msg), RSTRING_PTR(eclass));
+ if (task != NULL) {
+ nxt_log(task, level, "Ruby: %s: %s (%s)",
+ RSTRING_PTR(RARRAY_PTR(ary)[0]),
+ RSTRING_PTR(msg), RSTRING_PTR(eclass));
+
+ } else {
+ nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "Ruby: %s: %s (%s)",
+ RSTRING_PTR(RARRAY_PTR(ary)[0]),
+ RSTRING_PTR(msg), RSTRING_PTR(eclass));
+ }
for (i = 1; i < RARRAY_LEN(ary); i++) {
- nxt_log(task, level, "from %s", RSTRING_PTR(RARRAY_PTR(ary)[i]));
+ if (task != NULL) {
+ nxt_log(task, level, "from %s", RSTRING_PTR(RARRAY_PTR(ary)[i]));
+
+ } else {
+ nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "from %s",
+ RSTRING_PTR(RARRAY_PTR(ary)[i]));
+ }
}
}
static void
-nxt_ruby_atexit(nxt_task_t *task)
+nxt_ruby_atexit(void)
{
rb_gc_unregister_address(&nxt_ruby_io_input);
rb_gc_unregister_address(&nxt_ruby_io_error);
diff --git a/src/ruby/nxt_ruby.h b/src/ruby/nxt_ruby.h
index 9a6be0d4..77a65894 100644
--- a/src/ruby/nxt_ruby.h
+++ b/src/ruby/nxt_ruby.h
@@ -17,14 +17,12 @@
#include <nxt_router.h>
#include <nxt_runtime.h>
#include <nxt_application.h>
+#include <nxt_unit_typedefs.h>
typedef struct {
- nxt_task_t *task;
- nxt_app_rmsg_t *rmsg;
- nxt_app_wmsg_t *wmsg;
-
- size_t body_preread_size;
+ nxt_unit_ctx_t *unit_ctx;
+ nxt_unit_request_info_t *req;
} nxt_ruby_run_ctx_t;
diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c
index eee11c3c..3f6cac89 100644
--- a/src/ruby/nxt_ruby_stream_io.c
+++ b/src/ruby/nxt_ruby_stream_io.c
@@ -5,12 +5,12 @@
*/
#include <ruby/nxt_ruby.h>
+#include <nxt_unit.h>
static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap);
static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self);
static VALUE nxt_ruby_stream_io_gets(VALUE obj, VALUE args);
-static size_t nxt_ruby_stream_io_read_line(nxt_app_rmsg_t *rmsg, VALUE str);
static VALUE nxt_ruby_stream_io_each(VALUE obj, VALUE args);
static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args);
static VALUE nxt_ruby_stream_io_rewind(VALUE obj, VALUE args);
@@ -85,63 +85,47 @@ nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self)
static VALUE
nxt_ruby_stream_io_gets(VALUE obj, VALUE args)
{
- VALUE buf;
- nxt_ruby_run_ctx_t *run_ctx;
+ VALUE buf;
+ char *p;
+ size_t size, b_size;
+ nxt_unit_buf_t *b;
+ nxt_ruby_run_ctx_t *run_ctx;
+ nxt_unit_request_info_t *req;
Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
- if (run_ctx->body_preread_size == 0) {
- return Qnil;
- }
-
- buf = rb_str_buf_new(1);
+ req = run_ctx->req;
- if (buf == Qnil) {
+ if (req->content_length == 0) {
return Qnil;
}
- run_ctx->body_preread_size -= nxt_ruby_stream_io_read_line(run_ctx->rmsg,
- buf);
-
- return buf;
-}
-
-
-static size_t
-nxt_ruby_stream_io_read_line(nxt_app_rmsg_t *rmsg, VALUE str)
-{
- size_t len, size;
- u_char *p;
- nxt_buf_t *buf;
-
- len = 0;
+ size = 0;
- for (buf = rmsg->buf; buf != NULL; buf = buf->next) {
-
- size = nxt_buf_mem_used_size(&buf->mem);
- p = memchr(buf->mem.pos, '\n', size);
+ for (b = req->content_buf; b; b = nxt_unit_buf_next(b)) {
+ b_size = b->end - b->free;
+ p = memchr(b->free, '\n', b_size);
if (p != NULL) {
p++;
- size = p - buf->mem.pos;
-
- rb_str_cat(str, (const char *) buf->mem.pos, size);
-
- len += size;
- buf->mem.pos = p;
-
+ size += p - b->free;
break;
}
- rb_str_cat(str, (const char *) buf->mem.pos, size);
+ size += b_size;
+ }
+
+ buf = rb_str_buf_new(size);
- len += size;
- buf->mem.pos = buf->mem.free;
+ if (buf == Qnil) {
+ return Qnil;
}
- rmsg->buf = buf;
+ size = nxt_unit_request_read(req, RSTRING_PTR(buf), size);
+
+ rb_str_set_len(buf, size);
- return len;
+ return buf;
}
@@ -173,12 +157,11 @@ nxt_ruby_stream_io_read(VALUE obj, VALUE args)
{
VALUE buf;
long copy_size, u_size;
- size_t len;
nxt_ruby_run_ctx_t *run_ctx;
Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx);
- copy_size = run_ctx->body_preread_size;
+ copy_size = run_ctx->req->content_length;
if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) {
u_size = NUM2LONG(RARRAY_PTR(args)[0]);
@@ -202,8 +185,8 @@ nxt_ruby_stream_io_read(VALUE obj, VALUE args)
return Qnil;
}
- len = nxt_app_msg_read_raw(run_ctx->task, run_ctx->rmsg,
- RSTRING_PTR(buf), (size_t) copy_size);
+ copy_size = nxt_unit_request_read(run_ctx->req, RSTRING_PTR(buf),
+ copy_size);
if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) {
@@ -211,9 +194,7 @@ nxt_ruby_stream_io_read(VALUE obj, VALUE args)
rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size);
}
- rb_str_set_len(buf, (long) len);
-
- run_ctx->body_preread_size -= len;
+ rb_str_set_len(buf, copy_size);
return buf;
}
@@ -276,8 +257,7 @@ nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val)
}
}
- nxt_log_error(NXT_LOG_ERR, run_ctx->task->log, "Ruby: %s",
- RSTRING_PTR(val));
+ nxt_unit_req_error(run_ctx->req, "Ruby: %s", RSTRING_PTR(val));
return RSTRING_LEN(val);
}