diff options
Diffstat (limited to '')
-rw-r--r-- | src/ruby/nxt_ruby.c | 693 | ||||
-rw-r--r-- | src/ruby/nxt_ruby.h | 8 | ||||
-rw-r--r-- | src/ruby/nxt_ruby_stream_io.c | 78 |
3 files changed, 360 insertions, 419 deletions
diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index ea05b133..a08b8189 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -5,6 +5,9 @@ #include <ruby/nxt_ruby.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> + #define NXT_RUBY_RACK_API_VERSION_MAJOR 1 #define NXT_RUBY_RACK_API_VERSION_MINOR 3 @@ -35,29 +38,26 @@ static VALUE nxt_ruby_bundler_setup(VALUE arg); static VALUE nxt_ruby_require_rack(VALUE arg); static VALUE nxt_ruby_rack_parse_script(VALUE ctx); static VALUE nxt_ruby_rack_env_create(VALUE arg); -static nxt_int_t nxt_ruby_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, - nxt_app_wmsg_t *wmsg); +static void nxt_ruby_request_handler(nxt_unit_request_info_t *req); static VALUE nxt_ruby_rack_app_run(VALUE arg); -static nxt_int_t nxt_ruby_read_request(nxt_ruby_run_ctx_t *run_ctx, - VALUE hash_env); -nxt_inline nxt_int_t nxt_ruby_read_add_env(nxt_task_t *task, - nxt_app_rmsg_t *rmsg, VALUE hash_env, const char *name, nxt_str_t *str); +static int nxt_ruby_read_request(VALUE hash_env); +nxt_inline void nxt_ruby_add_sptr(VALUE hash_env, + const char *name, uint32_t name_len, nxt_unit_sptr_t *sptr, uint32_t len); +nxt_inline void nxt_ruby_add_str(VALUE hash_env, + const char *name, uint32_t name_len, char *str, uint32_t len); static nxt_int_t nxt_ruby_rack_result_status(VALUE result); -nxt_inline nxt_int_t nxt_ruby_write(nxt_task_t *task, nxt_app_wmsg_t *wmsg, - const u_char *data, size_t len, nxt_bool_t flush, nxt_bool_t last); -static nxt_int_t nxt_ruby_rack_result_headers(VALUE result); -static int nxt_ruby_hash_foreach(VALUE r_key, VALUE r_value, VALUE arg); -static nxt_int_t nxt_ruby_head_send_part(const char *key, size_t key_size, - const char *value, size_t value_size); -static nxt_int_t nxt_ruby_rack_result_body(VALUE result); -static nxt_int_t nxt_ruby_rack_result_body_file_write(VALUE filepath); +static int nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status); +static int nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg); +static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg); +static int nxt_ruby_rack_result_body(VALUE result); +static int nxt_ruby_rack_result_body_file_write(VALUE filepath); static VALUE nxt_ruby_rack_result_body_each(VALUE body); static void nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc); -static void nxt_ruby_atexit(nxt_task_t *task); +static void nxt_ruby_atexit(void); static uint32_t compat[] = { @@ -71,22 +71,22 @@ static VALUE nxt_ruby_io_input; static VALUE nxt_ruby_io_error; static nxt_ruby_run_ctx_t nxt_ruby_run_ctx; -NXT_EXPORT nxt_application_module_t nxt_app_module = { +NXT_EXPORT nxt_app_module_t nxt_app_module = { sizeof(compat), compat, nxt_string("ruby"), ruby_version, nxt_ruby_init, - nxt_ruby_run, - nxt_ruby_atexit, }; static nxt_int_t nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { - int state; + int state, rc; VALUE dummy, res; + nxt_unit_ctx_t *unit_ctx; + nxt_unit_init_t ruby_unit_init; nxt_ruby_rack_init_t rack_init; ruby_init(); @@ -128,6 +128,27 @@ nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf) rb_gc_register_address(&nxt_ruby_call); rb_gc_register_address(&nxt_ruby_env); + nxt_unit_default_init(task, &ruby_unit_init); + + ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler; + + unit_ctx = nxt_unit_init(&ruby_unit_init); + if (nxt_slow_path(unit_ctx == NULL)) { + return NXT_ERROR; + } + + nxt_ruby_run_ctx.unit_ctx = unit_ctx; + + rc = nxt_unit_run(unit_ctx); + + nxt_ruby_atexit(); + + nxt_ruby_run_ctx.unit_ctx = NULL; + + nxt_unit_done(unit_ctx); + + exit(rc); + return NXT_OK; } @@ -319,82 +340,75 @@ nxt_ruby_rack_env_create(VALUE arg) } -static nxt_int_t -nxt_ruby_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg) +static void +nxt_ruby_request_handler(nxt_unit_request_info_t *req) { int state; VALUE res; - nxt_ruby_run_ctx.task = task; - nxt_ruby_run_ctx.rmsg = rmsg; - nxt_ruby_run_ctx.wmsg = wmsg; + nxt_ruby_run_ctx.req = req; res = rb_protect(nxt_ruby_rack_app_run, Qnil, &state); - if (nxt_slow_path(state != 0)) { - nxt_ruby_exception_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + if (nxt_slow_path(res == Qnil || state != 0)) { + nxt_ruby_exception_log(NULL, NXT_LOG_ERR, "Failed to run ruby script"); - return NXT_ERROR; - } - - if (nxt_slow_path(res == Qnil)) { - return NXT_ERROR; } - - return NXT_OK; } static VALUE nxt_ruby_rack_app_run(VALUE arg) { + int rc; VALUE env, result; - nxt_int_t rc; + nxt_int_t status; env = rb_hash_dup(nxt_ruby_env); - rc = nxt_ruby_read_request(&nxt_ruby_run_ctx, env); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_alert(nxt_ruby_run_ctx.task, - "Ruby: Failed to process incoming request"); + rc = nxt_ruby_read_request(env); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_alert(nxt_ruby_run_ctx.req, + "Ruby: Failed to process incoming request"); goto fail; } result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env); if (nxt_slow_path(TYPE(result) != T_ARRAY)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Invalid response format from application"); + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Invalid response format from application"); goto fail; } if (nxt_slow_path(RARRAY_LEN(result) != 3)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Invalid response format from application. " - "Need 3 entries [Status, Headers, Body]"); + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Invalid response format from application. " + "Need 3 entries [Status, Headers, Body]"); goto fail; } - rc = nxt_ruby_rack_result_status(result); - if (nxt_slow_path(rc != NXT_OK)) { + status = nxt_ruby_rack_result_status(result); + if (nxt_slow_path(status < 0)) { + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Invalid response status from application."); + goto fail; } - rc = nxt_ruby_rack_result_headers(result); - if (nxt_slow_path(rc != NXT_OK)) { + rc = nxt_ruby_rack_result_headers(result, status); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } rc = nxt_ruby_rack_result_body(result); - if (nxt_slow_path(rc != NXT_OK)) { + if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } - rc = nxt_app_msg_flush(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, 1); - if (nxt_slow_path(rc != NXT_OK)) { - goto fail; - } + nxt_unit_request_done(nxt_ruby_run_ctx.req, rc); + nxt_ruby_run_ctx.req = NULL; rb_hash_delete(env, rb_obj_id(env)); @@ -402,296 +416,194 @@ nxt_ruby_rack_app_run(VALUE arg) fail: + nxt_unit_request_done(nxt_ruby_run_ctx.req, NXT_UNIT_ERROR); + nxt_ruby_run_ctx.req = NULL; + rb_hash_delete(env, rb_obj_id(env)); return Qnil; } -static nxt_int_t -nxt_ruby_read_request(nxt_ruby_run_ctx_t *run_ctx, VALUE hash_env) +static int +nxt_ruby_read_request(VALUE hash_env) { - u_char *colon; - size_t query_size; - nxt_int_t rc; - nxt_str_t str, value, path, target; - nxt_str_t host, server_name, server_port; - nxt_task_t *task; - nxt_app_rmsg_t *rmsg; + char *host_start, *port_start; + uint32_t i, host_length, port_length; + nxt_unit_field_t *f; + nxt_unit_request_t *r; - static nxt_str_t def_host = nxt_string("localhost"); - static nxt_str_t def_port = nxt_string("80"); + r = nxt_ruby_run_ctx.req->request; - task = run_ctx->task; - rmsg = run_ctx->rmsg; +#define NL(S) (S), sizeof(S)-1 - rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "REQUEST_METHOD", &str); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; + nxt_ruby_add_sptr(hash_env, NL("REQUEST_METHOD"), &r->method, + r->method_length); + nxt_ruby_add_sptr(hash_env, NL("REQUEST_URI"), &r->target, + r->target_length); + nxt_ruby_add_sptr(hash_env, NL("PATH_INFO"), &r->path, r->path_length); + if (r->query.offset) { + nxt_ruby_add_sptr(hash_env, NL("QUERY_STRING"), &r->query, + r->query_length); } + nxt_ruby_add_sptr(hash_env, NL("SERVER_PROTOCOL"), &r->version, + r->version_length); + nxt_ruby_add_sptr(hash_env, NL("REMOTE_ADDR"), &r->remote, + r->remote_length); + nxt_ruby_add_sptr(hash_env, NL("SERVER_ADDR"), &r->local, r->local_length); - rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "REQUEST_URI", &target); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } + for (i = 0; i < r->fields_count; i++) { + f = r->fields + i; - rc = nxt_app_msg_read_str(task, rmsg, &path); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; + nxt_ruby_add_sptr(hash_env, nxt_unit_sptr_get(&f->name), f->name_length, + &f->value, f->value_length); } - rc = nxt_app_msg_read_size(task, rmsg, &query_size); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } + if (r->content_length_field != NXT_UNIT_NONE_FIELD) { + f = r->fields + r->content_length_field; - if (path.start == NULL || path.length == 0) { - path = target; + nxt_ruby_add_sptr(hash_env, NL("CONTENT_LENGTH"), + &f->value, f->value_length); } - rb_hash_aset(hash_env, rb_str_new2("PATH_INFO"), - rb_str_new((const char *) path.start, (long) path.length)); + if (r->content_type_field != NXT_UNIT_NONE_FIELD) { + f = r->fields + r->content_type_field; - if (query_size > 0) { - query_size--; - - if (nxt_slow_path(target.length < query_size)) { - return NXT_ERROR; - } - - str.start = &target.start[query_size]; - str.length = target.length - query_size; - - rb_hash_aset(hash_env, rb_str_new2("QUERY_STRING"), - rb_str_new((const char *) str.start, (long) str.length)); + nxt_ruby_add_sptr(hash_env, NL("CONTENT_TYPE"), + &f->value, f->value_length); } - rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "SERVER_PROTOCOL", &str); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } + if (r->host_field != NXT_UNIT_NONE_FIELD) { + f = r->fields + r->host_field; - rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "REMOTE_ADDR", &str); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } - - rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "SERVER_ADDR", &str); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } - - rc = nxt_app_msg_read_str(task, rmsg, &host); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } - - if (host.length == 0) { - host = def_host; - } - - colon = nxt_memchr(host.start, ':', host.length); - server_name = host; - - if (colon != NULL) { - server_name.length = colon - host.start; - - server_port.start = colon + 1; - server_port.length = host.length - server_name.length - 1; + host_start = nxt_unit_sptr_get(&f->value); + host_length = f->value_length; } else { - server_port = def_port; - } - - rb_hash_aset(hash_env, rb_str_new2("SERVER_NAME"), - rb_str_new((const char *) server_name.start, - (long) server_name.length)); - - rb_hash_aset(hash_env, rb_str_new2("SERVER_PORT"), - rb_str_new((const char *) server_port.start, - (long) server_port.length)); - - rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "CONTENT_TYPE", &str); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } - - rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "CONTENT_LENGTH", &str); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; + host_start = NULL; + host_length = 0; } - for ( ;; ) { - rc = nxt_app_msg_read_str(task, rmsg, &str); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } + nxt_unit_split_host(host_start, host_length, &host_start, &host_length, + &port_start, &port_length); - if (nxt_slow_path(str.length == 0)) { - break; - } + nxt_ruby_add_str(hash_env, NL("SERVER_NAME"), host_start, host_length); + nxt_ruby_add_str(hash_env, NL("SERVER_PORT"), port_start, port_length); - rc = nxt_app_msg_read_str(task, rmsg, &value); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } - - rb_hash_aset(hash_env, - rb_str_new((char *) str.start, (long) str.length), - rb_str_new((const char *) value.start, - (long) value.length)); - } +#undef NL - rc = nxt_app_msg_read_size(task, rmsg, &run_ctx->body_preread_size); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } - - return NXT_OK; + return NXT_UNIT_OK; } -nxt_inline nxt_int_t -nxt_ruby_read_add_env(nxt_task_t *task, nxt_app_rmsg_t *rmsg, VALUE hash_env, - const char *name, nxt_str_t *str) +nxt_inline void +nxt_ruby_add_sptr(VALUE hash_env, + const char *name, uint32_t name_len, nxt_unit_sptr_t *sptr, uint32_t len) { - nxt_int_t rc; + char *str; - rc = nxt_app_msg_read_str(task, rmsg, str); - if (nxt_slow_path(rc != NXT_OK)) { - return rc; - } + str = nxt_unit_sptr_get(sptr); - if (str->start == NULL) { - rb_hash_aset(hash_env, rb_str_new2(name), Qnil); - return NXT_OK; - } + rb_hash_aset(hash_env, rb_str_new(name, name_len), rb_str_new(str, len)); +} - rb_hash_aset(hash_env, rb_str_new2(name), - rb_str_new((const char *) str->start, (long) str->length)); - return NXT_OK; +nxt_inline void +nxt_ruby_add_str(VALUE hash_env, + const char *name, uint32_t name_len, char *str, uint32_t len) +{ + rb_hash_aset(hash_env, rb_str_new(name, name_len), rb_str_new(str, len)); } static nxt_int_t nxt_ruby_rack_result_status(VALUE result) { - VALUE status; - u_char *p; - size_t len; - nxt_int_t rc; - u_char buf[3]; + VALUE status; status = rb_ary_entry(result, 0); if (TYPE(status) == T_FIXNUM) { - nxt_sprintf(buf, buf + 3, "%03d", FIX2INT(status)); - - p = buf; - len = 3; - - } else if (TYPE(status) == T_STRING) { - p = (u_char *) RSTRING_PTR(status); - len = RSTRING_LEN(status); - - } else { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Invalid response 'status' format from application"); - - return NXT_ERROR; + return FIX2INT(status); } - rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - (u_char *) "Status: ", nxt_length("Status: "), 0, 0); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; + if (TYPE(status) == T_STRING) { + return nxt_int_parse((u_char *) RSTRING_PTR(status), + RSTRING_LEN(status)); } - rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - p, len, 0, 0); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } - - rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - (u_char *) "\r\n", nxt_length("\r\n"), 0, 0); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; - } + nxt_unit_req_error(nxt_ruby_run_ctx.req, "Ruby: Invalid response 'status' " + "format from application"); - return NXT_OK; + return -2; } -nxt_inline nxt_int_t -nxt_ruby_write(nxt_task_t *task, nxt_app_wmsg_t *wmsg, - const u_char *data, size_t len, nxt_bool_t flush, nxt_bool_t last) -{ - nxt_int_t rc; - - rc = nxt_app_msg_write_raw(task, wmsg, data, len); - if (nxt_slow_path(rc != NXT_OK)) { - return rc; - } - - if (flush || last) { - rc = nxt_app_msg_flush(task, wmsg, last); - } - - return rc; -} +typedef struct { + int rc; + uint32_t fields; + uint32_t size; +} nxt_ruby_headers_info_t; -static nxt_int_t -nxt_ruby_rack_result_headers(VALUE result) +static int +nxt_ruby_rack_result_headers(VALUE result, nxt_int_t status) { - VALUE headers; - nxt_int_t rc; + int rc; + VALUE headers; + nxt_ruby_headers_info_t headers_info; headers = rb_ary_entry(result, 1); if (nxt_slow_path(TYPE(headers) != T_HASH)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Invalid response 'headers' format from application"); + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Invalid response 'headers' format from " + "application"); - return NXT_ERROR; + return NXT_UNIT_ERROR; } - rc = NXT_OK; + rc = NXT_UNIT_OK; - rb_hash_foreach(headers, nxt_ruby_hash_foreach, (VALUE) (uintptr_t) &rc); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; + headers_info.rc = NXT_UNIT_OK; + headers_info.fields = 0; + headers_info.size = 0; + + rb_hash_foreach(headers, nxt_ruby_hash_info, + (VALUE) (uintptr_t) &headers_info); + if (nxt_slow_path(headers_info.rc != NXT_UNIT_OK)) { + return headers_info.rc; } - rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - (u_char *) "\r\n", nxt_length("\r\n"), 0, 0); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; + rc = nxt_unit_response_init(nxt_ruby_run_ctx.req, status, + headers_info.fields, headers_info.size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } - return NXT_OK; + rb_hash_foreach(headers, nxt_ruby_hash_add, (VALUE) (uintptr_t) &rc); + + return rc; } static int -nxt_ruby_hash_foreach(VALUE r_key, VALUE r_value, VALUE arg) +nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg) { - nxt_int_t rc, *rc_p; - const char *value, *value_end, *pos; + const char *value, *value_end, *pos; + nxt_ruby_headers_info_t *headers_info; - rc_p = (nxt_int_t *) (uintptr_t) arg; + headers_info = (void *) (uintptr_t) arg; if (nxt_slow_path(TYPE(r_key) != T_STRING)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Wrong header entry 'key' from application"); + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Wrong header entry 'key' from application"); goto fail; } if (nxt_slow_path(TYPE(r_value) != T_STRING)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Wrong header entry 'value' from application"); + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Wrong header entry 'value' from application"); goto fail; } @@ -708,70 +620,86 @@ nxt_ruby_hash_foreach(VALUE r_key, VALUE r_value, VALUE arg) break; } - rc = nxt_ruby_head_send_part(RSTRING_PTR(r_key), RSTRING_LEN(r_key), - value, pos - value); - if (nxt_slow_path(rc != NXT_OK)) { - goto fail; - } + headers_info->fields++; + headers_info->size += RSTRING_LEN(r_key) + (pos - value); pos++; value = pos; } if (value <= value_end) { - rc = nxt_ruby_head_send_part(RSTRING_PTR(r_key), RSTRING_LEN(r_key), - value, value_end - value); - if (nxt_slow_path(rc != NXT_OK)) { - goto fail; - } + headers_info->fields++; + headers_info->size += RSTRING_LEN(r_key) + (value_end - value); } - *rc_p = NXT_OK; - return ST_CONTINUE; fail: - *rc_p = NXT_ERROR; + headers_info->rc = NXT_UNIT_ERROR; return ST_STOP; } -static nxt_int_t -nxt_ruby_head_send_part(const char *key, size_t key_size, - const char *value, size_t value_size) +static int +nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg) { - nxt_int_t rc; + int *rc; + uint32_t key_len; + const char *value, *value_end, *pos; - rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - (u_char *) key, key_size); - if (nxt_slow_path(rc != NXT_OK)) { - return rc; - } + rc = (int *) (uintptr_t) arg; - rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - (u_char *) ": ", nxt_length(": ")); - if (nxt_slow_path(rc != NXT_OK)) { - return rc; + value = RSTRING_PTR(r_value); + value_end = value + RSTRING_LEN(r_value); + + key_len = RSTRING_LEN(r_key); + + pos = value; + + for ( ;; ) { + pos = strchr(pos, '\n'); + + if (pos == NULL) { + break; + } + + *rc = nxt_unit_response_add_field(nxt_ruby_run_ctx.req, + RSTRING_PTR(r_key), key_len, + value, pos - value); + if (nxt_slow_path(*rc != NXT_UNIT_OK)) { + goto fail; + } + + pos++; + value = pos; } - rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - (u_char *) value, value_size); - if (nxt_slow_path(rc != NXT_OK)) { - return rc; + if (value <= value_end) { + *rc = nxt_unit_response_add_field(nxt_ruby_run_ctx.req, + RSTRING_PTR(r_key), key_len, + value, value_end - value); + if (nxt_slow_path(*rc != NXT_UNIT_OK)) { + goto fail; + } } - return nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - (u_char *) "\r\n", nxt_length("\r\n")); + return ST_CONTINUE; + +fail: + + *rc = NXT_UNIT_ERROR; + + return ST_STOP; } -static nxt_int_t +static int nxt_ruby_rack_result_body(VALUE result) { - VALUE fn, body; - nxt_int_t rc; + int rc; + VALUE fn, body; body = rb_ary_entry(result, 2); @@ -779,120 +707,134 @@ nxt_ruby_rack_result_body(VALUE result) fn = rb_funcall(body, rb_intern("to_path"), 0); if (nxt_slow_path(TYPE(fn) != T_STRING)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Failed to get 'body' file path from application"); + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Failed to get 'body' file path from " + "application"); - return NXT_ERROR; + return NXT_UNIT_ERROR; } rc = nxt_ruby_rack_result_body_file_write(fn); - if (nxt_slow_path(rc != NXT_OK)) { - return NXT_ERROR; + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } } else if (rb_respond_to(body, rb_intern("each"))) { rb_iterate(rb_each, body, nxt_ruby_rack_result_body_each, 0); } else { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Invalid response 'body' format from application"); + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Invalid response 'body' format " + "from application"); - return NXT_ERROR; + return NXT_UNIT_ERROR; } if (rb_respond_to(body, rb_intern("close"))) { rb_funcall(body, rb_intern("close"), 0); } - return NXT_OK; + return NXT_UNIT_OK; } -static nxt_int_t -nxt_ruby_rack_result_body_file_write(VALUE filepath) +typedef struct { + int fd; + off_t pos; + off_t rest; +} nxt_ruby_rack_file_t; + + +static ssize_t +nxt_ruby_rack_file_read(nxt_unit_read_info_t *read_info, void *dst, size_t size) { - size_t len; - ssize_t n; - nxt_off_t rest; - nxt_int_t rc; - nxt_file_t file; - nxt_file_info_t finfo; - u_char buf[8192]; + ssize_t res; + nxt_ruby_rack_file_t *file; - nxt_memzero(&file, sizeof(nxt_file_t)); + file = read_info->data; - file.name = (nxt_file_name_t *) RSTRING_PTR(filepath); + size = nxt_min(size, (size_t) file->rest); - rc = nxt_file_open(nxt_ruby_run_ctx.task, &file, NXT_FILE_RDONLY, - NXT_FILE_OPEN, 0); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Failed to open 'body' file: %s", - (const char *) file.name); + res = pread(file->fd, dst, size, file->pos); - return NXT_ERROR; + if (res >= 0) { + file->pos += res; + file->rest -= res; + + if (size > (size_t) res) { + file->rest = 0; + } } - rc = nxt_file_info(&file, &finfo); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Failed to get 'body' file information: %s", - (const char *) file.name); + read_info->eof = file->rest == 0; - goto fail; - } + return res; +} - rest = nxt_file_size(&finfo); - while (rest != 0) { - len = nxt_min(rest, (nxt_off_t) sizeof(buf)); +static int +nxt_ruby_rack_result_body_file_write(VALUE filepath) +{ + int fd, rc; + struct stat finfo; + nxt_ruby_rack_file_t ruby_file; + nxt_unit_read_info_t read_info; - n = nxt_file_read(&file, buf, len, nxt_file_size(&finfo) - rest); - if (nxt_slow_path(n != (ssize_t) len)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Failed to read 'body' file"); + fd = open(RSTRING_PTR(filepath), O_RDONLY, 0); + if (nxt_slow_path(fd == -1)) { + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Failed to open content file \"%s\": %s (%d)", + RSTRING_PTR(filepath), strerror(errno), errno); - goto fail; - } + return NXT_UNIT_ERROR; + } - rest -= len; + rc = fstat(fd, &finfo); + if (nxt_slow_path(rc == -1)) { + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Content file fstat(\"%s\") failed: %s (%d)", + RSTRING_PTR(filepath), strerror(errno), errno); - rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - buf, len); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Failed to write 'body' from application"); + close(fd); - goto fail; - } + return NXT_UNIT_ERROR; } - nxt_file_close(nxt_ruby_run_ctx.task, &file); + ruby_file.fd = fd; + ruby_file.pos = 0; + ruby_file.rest = finfo.st_size; - return NXT_OK; + read_info.read = nxt_ruby_rack_file_read; + read_info.eof = ruby_file.rest == 0; + read_info.buf_size = ruby_file.rest; + read_info.data = &ruby_file; -fail: + rc = nxt_unit_response_write_cb(nxt_ruby_run_ctx.req, &read_info); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Failed to write content file."); + } - nxt_file_close(nxt_ruby_run_ctx.task, &file); + close(fd); - return NXT_ERROR; + return rc; } static VALUE nxt_ruby_rack_result_body_each(VALUE body) { - nxt_int_t rc; + int rc; if (TYPE(body) != T_STRING) { return Qnil; } - rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, - (u_char *) RSTRING_PTR(body), RSTRING_LEN(body)); - if (nxt_slow_path(rc != NXT_OK)) { - nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, - "Ruby: Failed to write 'body' from application"); + rc = nxt_unit_response_write(nxt_ruby_run_ctx.req, RSTRING_PTR(body), + RSTRING_LEN(body)); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_error(nxt_ruby_run_ctx.req, + "Ruby: Failed to write 'body' from application"); } return Qnil; @@ -905,30 +847,51 @@ nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc) int i; VALUE err, ary, eclass, msg; - nxt_log(task, level, "Ruby: %s", desc); + if (task != NULL) { + nxt_log(task, level, "Ruby: %s", desc); + + } else { + nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "Ruby: %s", desc); + } err = rb_errinfo(); - ary = rb_funcall(err, rb_intern("backtrace"), 0); + if (nxt_slow_path(err == Qnil)) { + return; + } - if (RARRAY_LEN(ary) == 0) { + ary = rb_funcall(err, rb_intern("backtrace"), 0); + if (nxt_slow_path(RARRAY_LEN(ary) == 0)) { return; } eclass = rb_class_name(rb_class_of(err)); msg = rb_funcall(err, rb_intern("message"), 0); - nxt_log(task, level, "Ruby: %s: %s (%s)", - RSTRING_PTR(RARRAY_PTR(ary)[0]), - RSTRING_PTR(msg), RSTRING_PTR(eclass)); + if (task != NULL) { + nxt_log(task, level, "Ruby: %s: %s (%s)", + RSTRING_PTR(RARRAY_PTR(ary)[0]), + RSTRING_PTR(msg), RSTRING_PTR(eclass)); + + } else { + nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "Ruby: %s: %s (%s)", + RSTRING_PTR(RARRAY_PTR(ary)[0]), + RSTRING_PTR(msg), RSTRING_PTR(eclass)); + } for (i = 1; i < RARRAY_LEN(ary); i++) { - nxt_log(task, level, "from %s", RSTRING_PTR(RARRAY_PTR(ary)[i])); + if (task != NULL) { + nxt_log(task, level, "from %s", RSTRING_PTR(RARRAY_PTR(ary)[i])); + + } else { + nxt_unit_log(nxt_ruby_run_ctx.unit_ctx, level, "from %s", + RSTRING_PTR(RARRAY_PTR(ary)[i])); + } } } static void -nxt_ruby_atexit(nxt_task_t *task) +nxt_ruby_atexit(void) { rb_gc_unregister_address(&nxt_ruby_io_input); rb_gc_unregister_address(&nxt_ruby_io_error); diff --git a/src/ruby/nxt_ruby.h b/src/ruby/nxt_ruby.h index 9a6be0d4..77a65894 100644 --- a/src/ruby/nxt_ruby.h +++ b/src/ruby/nxt_ruby.h @@ -17,14 +17,12 @@ #include <nxt_router.h> #include <nxt_runtime.h> #include <nxt_application.h> +#include <nxt_unit_typedefs.h> typedef struct { - nxt_task_t *task; - nxt_app_rmsg_t *rmsg; - nxt_app_wmsg_t *wmsg; - - size_t body_preread_size; + nxt_unit_ctx_t *unit_ctx; + nxt_unit_request_info_t *req; } nxt_ruby_run_ctx_t; diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c index eee11c3c..3f6cac89 100644 --- a/src/ruby/nxt_ruby_stream_io.c +++ b/src/ruby/nxt_ruby_stream_io.c @@ -5,12 +5,12 @@ */ #include <ruby/nxt_ruby.h> +#include <nxt_unit.h> static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap); static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self); static VALUE nxt_ruby_stream_io_gets(VALUE obj, VALUE args); -static size_t nxt_ruby_stream_io_read_line(nxt_app_rmsg_t *rmsg, VALUE str); static VALUE nxt_ruby_stream_io_each(VALUE obj, VALUE args); static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args); static VALUE nxt_ruby_stream_io_rewind(VALUE obj, VALUE args); @@ -85,63 +85,47 @@ nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self) static VALUE nxt_ruby_stream_io_gets(VALUE obj, VALUE args) { - VALUE buf; - nxt_ruby_run_ctx_t *run_ctx; + VALUE buf; + char *p; + size_t size, b_size; + nxt_unit_buf_t *b; + nxt_ruby_run_ctx_t *run_ctx; + nxt_unit_request_info_t *req; Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); - if (run_ctx->body_preread_size == 0) { - return Qnil; - } - - buf = rb_str_buf_new(1); + req = run_ctx->req; - if (buf == Qnil) { + if (req->content_length == 0) { return Qnil; } - run_ctx->body_preread_size -= nxt_ruby_stream_io_read_line(run_ctx->rmsg, - buf); - - return buf; -} - - -static size_t -nxt_ruby_stream_io_read_line(nxt_app_rmsg_t *rmsg, VALUE str) -{ - size_t len, size; - u_char *p; - nxt_buf_t *buf; - - len = 0; + size = 0; - for (buf = rmsg->buf; buf != NULL; buf = buf->next) { - - size = nxt_buf_mem_used_size(&buf->mem); - p = memchr(buf->mem.pos, '\n', size); + for (b = req->content_buf; b; b = nxt_unit_buf_next(b)) { + b_size = b->end - b->free; + p = memchr(b->free, '\n', b_size); if (p != NULL) { p++; - size = p - buf->mem.pos; - - rb_str_cat(str, (const char *) buf->mem.pos, size); - - len += size; - buf->mem.pos = p; - + size += p - b->free; break; } - rb_str_cat(str, (const char *) buf->mem.pos, size); + size += b_size; + } + + buf = rb_str_buf_new(size); - len += size; - buf->mem.pos = buf->mem.free; + if (buf == Qnil) { + return Qnil; } - rmsg->buf = buf; + size = nxt_unit_request_read(req, RSTRING_PTR(buf), size); + + rb_str_set_len(buf, size); - return len; + return buf; } @@ -173,12 +157,11 @@ nxt_ruby_stream_io_read(VALUE obj, VALUE args) { VALUE buf; long copy_size, u_size; - size_t len; nxt_ruby_run_ctx_t *run_ctx; Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); - copy_size = run_ctx->body_preread_size; + copy_size = run_ctx->req->content_length; if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) { u_size = NUM2LONG(RARRAY_PTR(args)[0]); @@ -202,8 +185,8 @@ nxt_ruby_stream_io_read(VALUE obj, VALUE args) return Qnil; } - len = nxt_app_msg_read_raw(run_ctx->task, run_ctx->rmsg, - RSTRING_PTR(buf), (size_t) copy_size); + copy_size = nxt_unit_request_read(run_ctx->req, RSTRING_PTR(buf), + copy_size); if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) { @@ -211,9 +194,7 @@ nxt_ruby_stream_io_read(VALUE obj, VALUE args) rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size); } - rb_str_set_len(buf, (long) len); - - run_ctx->body_preread_size -= len; + rb_str_set_len(buf, copy_size); return buf; } @@ -276,8 +257,7 @@ nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val) } } - nxt_log_error(NXT_LOG_ERR, run_ctx->task->log, "Ruby: %s", - RSTRING_PTR(val)); + nxt_unit_req_error(run_ctx->req, "Ruby: %s", RSTRING_PTR(val)); return RSTRING_LEN(val); } |