diff options
author | Alexander Borisov <alexander.borisov@nginx.com> | 2018-03-21 16:50:07 +0300 |
---|---|---|
committer | Alexander Borisov <alexander.borisov@nginx.com> | 2018-03-21 16:50:07 +0300 |
commit | 37051b6c15cce7d6ab01c50e1086f8ef0b34e93d (patch) | |
tree | d6f7fee987e795613c4088c7fa12709888d0a00e | |
parent | 778a90c319e47f688b749e43df33f3eb9dce3e72 (diff) | |
download | unit-37051b6c15cce7d6ab01c50e1086f8ef0b34e93d.tar.gz unit-37051b6c15cce7d6ab01c50e1086f8ef0b34e93d.tar.bz2 |
Added Ruby support.
Diffstat (limited to '')
-rw-r--r-- | auto/help | 3 | ||||
-rw-r--r-- | auto/modules/conf | 4 | ||||
-rw-r--r-- | auto/modules/ruby | 171 | ||||
-rw-r--r-- | src/nxt_application.c | 3 | ||||
-rw-r--r-- | src/nxt_application.h | 10 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 11 | ||||
-rw-r--r-- | src/nxt_main_process.c | 10 | ||||
-rw-r--r-- | src/nxt_router.c | 88 | ||||
-rw-r--r-- | src/ruby/nxt_ruby.c | 911 | ||||
-rw-r--r-- | src/ruby/nxt_ruby.h | 34 | ||||
-rw-r--r-- | src/ruby/nxt_ruby_stream_io.c | 290 |
11 files changed, 1534 insertions, 1 deletions
@@ -48,4 +48,7 @@ cat << END perl OPTIONS configure Perl module run "./configure perl --help" to see available options + ruby OPTIONS configure Ruby module + run "./configure ruby --help" to see available options + END diff --git a/auto/modules/conf b/auto/modules/conf index df77309e..93d28601 100644 --- a/auto/modules/conf +++ b/auto/modules/conf @@ -21,6 +21,10 @@ case "$nxt_module" in . auto/modules/perl ;; + ruby) + . auto/modules/ruby + ;; + *) echo echo $0: error: invalid module \"$nxt_module\". diff --git a/auto/modules/ruby b/auto/modules/ruby new file mode 100644 index 00000000..eb5474ac --- /dev/null +++ b/auto/modules/ruby @@ -0,0 +1,171 @@ + +# Copyright (C) Alexander Borisov +# Copyright (C) NGINX, Inc. + + +shift + +for nxt_option; do + + case "$nxt_option" in + -*=*) value=`echo "$nxt_option" | sed -e 's/[-_a-zA-Z0-9]*=//'` ;; + *) value="" ;; + esac + + case "$nxt_option" in + --ruby=*) NXT_RUBY="$value" ;; + --module=*) NXT_RUBY_MODULE="$value" ;; + + --help) + cat << END + + --ruby=FILE set ruby executable, default: ruby + --module=NAME set unit ruby module name + +END + exit 0 + ;; + + *) + echo + echo $0: error: invalid Ruby option \"$nxt_option\" + echo + exit 1 + ;; + esac + +done + + +if [ ! -f $NXT_AUTOCONF_DATA ]; then + echo + echo Please run common $0 before configuring module \"$nxt_module\". + echo + exit 1 +fi + +. $NXT_AUTOCONF_DATA + +$echo "configuring Ruby module" +$echo "configuring Ruby module ..." >> $NXT_AUTOCONF_ERR + +NXT_RUBY=${NXT_RUBY=ruby} +NXT_RUBY_MODULE=${NXT_RUBY_MODULE=${NXT_RUBY}} + +nxt_found=no + +if /bin/sh -c "$NXT_RUBY -v" >> $NXT_AUTOCONF_ERR 2>&1; then + + NXT_RUBY_RUBYHDRDIR=`$NXT_RUBY -r rbconfig -e 'printf("%s",RbConfig::CONFIG["rubyhdrdir"])'` + NXT_RUBY_ARCHHDRDIR=`$NXT_RUBY -r rbconfig -e 'printf("%s",RbConfig::CONFIG["rubyarchhdrdir"])'` + NXT_RUBY_INCPATH="-I$NXT_RUBY_ARCHHDRDIR -I$NXT_RUBY_RUBYHDRDIR" + + NXT_RUBY_LIBNAME=`$NXT_RUBY -r rbconfig -e 'printf("%s",RbConfig::CONFIG["RUBY_SO_NAME"])'` + NXT_RUBY_LIBSCONF=`$NXT_RUBY -r rbconfig -e 'printf("%s",RbConfig::CONFIG["LIBS"])'` + NXT_RUBY_LIBPATH=`$NXT_RUBY -r rbconfig -e 'printf("%s",RbConfig::CONFIG["libdir"])'` + NXT_RUBY_LIBS="-L$NXT_RUBY_LIBPATH -Wl,-rpath,${NXT_RUBY_LIBPATH} -l$NXT_RUBY_LIBNAME $NXT_RUBY_LIBSCONF" + + nxt_feature="Ruby" + nxt_feature_name="" + nxt_feature_run=no + nxt_feature_incs="${NXT_RUBY_INCPATH}" + nxt_feature_libs="${NXT_RUBY_LIBS}" + nxt_feature_test=" + #include <ruby.h> + + int main() { + ruby_init(); + return ruby_cleanup(0); + }" + + . auto/feature + +else + $echo "checking for Ruby ... not found" +fi + +if [ $nxt_found = no ]; then + $echo + $echo $0: error: no Ruby found. + $echo + exit 1; +fi + +NXT_RUBY_VERSION=`$NXT_RUBY -r rbconfig -e 'printf("%s",RbConfig::CONFIG["ruby_version"])'` +$echo " + Ruby version: ${NXT_RUBY_VERSION}" + +if grep ^$NXT_RUBY_MODULE: $NXT_MAKEFILE 2>&1 > /dev/null; then + $echo + $echo $0: error: duplicate \"$NXT_RUBY_MODULE\" module configured. + $echo + exit 1; +fi + +$echo " + Ruby module: ${NXT_RUBY_MODULE}.unit.so" + +. auto/cc/deps + +$echo >> $NXT_MAKEFILE + +NXT_RUBY_MODULE_SRCS=" \ + src/ruby/nxt_ruby.c \ + src/ruby/nxt_ruby_stream_io.c +" + +# The Ruby module object files. + +nxt_objs= + +for nxt_src in $NXT_RUBY_MODULE_SRCS; do + + nxt_obj=${nxt_src%.c}-$NXT_RUBY_MODULE.o + nxt_dep=${nxt_src%.c}-$NXT_RUBY_MODULE.dep + nxt_dep_flags=`nxt_gen_dep_flags` + nxt_dep_post=`nxt_gen_dep_post` + nxt_objs="$nxt_objs $NXT_BUILD_DIR/$nxt_obj" + + cat << END >> $NXT_MAKEFILE + +$NXT_BUILD_DIR/$nxt_obj: $nxt_src + mkdir -p $NXT_BUILD_DIR/src/ruby + \$(CC) -c \$(CFLAGS) \$(NXT_INCS) $NXT_RUBY_INCPATH \\ + $nxt_dep_flags \\ + -o $NXT_BUILD_DIR/$nxt_obj $nxt_src + $nxt_dep_post + +-include $NXT_BUILD_DIR/$nxt_dep + +END + +done + +cat << END >> $NXT_MAKEFILE + +.PHONY: ${NXT_RUBY_MODULE} +.PHONY: ${NXT_RUBY_MODULE}-install +.PHONY: ${NXT_RUBY_MODULE}-uninstall + +all: ${NXT_RUBY_MODULE} + +${NXT_RUBY_MODULE}: $NXT_BUILD_DIR/${NXT_RUBY_MODULE}.unit.so + +$NXT_BUILD_DIR/${NXT_RUBY_MODULE}.unit.so: $nxt_objs + \$(NXT_MODULE_LINK) -o $NXT_BUILD_DIR/${NXT_RUBY_MODULE}.unit.so \\ + $nxt_objs $NXT_RUBY_LIBS $NXT_LD_OPT + + +install: ${NXT_RUBY_MODULE}-install + +${NXT_RUBY_MODULE}-install: ${NXT_RUBY_MODULE} + install -d \$(DESTDIR)$NXT_MODULES + install -p $NXT_BUILD_DIR/${NXT_RUBY_MODULE}.unit.so \\ + \$(DESTDIR)$NXT_MODULES/ + + +uninstall: ${NXT_RUBY_MODULE}-uninstall + +${NXT_RUBY_MODULE}-uninstall: + rm -f \$(DESTDIR)$NXT_MODULES/${NXT_RUBY_MODULE}.unit.so + @rmdir -p \$(DESTDIR)$NXT_MODULES 2>/dev/null || true + +END diff --git a/src/nxt_application.c b/src/nxt_application.c index cad4ac8d..8ded36da 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -875,6 +875,9 @@ nxt_app_parse_type(u_char *p, size_t length) } else if (nxt_str_eq(&str, "perl", 4)) { return NXT_APP_PERL; + + } else if (nxt_str_eq(&str, "ruby", 4)) { + return NXT_APP_RUBY; } return NXT_APP_UNKNOWN; diff --git a/src/nxt_application.h b/src/nxt_application.h index b6391149..6c5f0d6e 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -14,6 +14,7 @@ typedef enum { NXT_APP_PHP, NXT_APP_GO, NXT_APP_PERL, + NXT_APP_RUBY, NXT_APP_UNKNOWN, } nxt_app_type_t; @@ -58,6 +59,11 @@ typedef struct { } nxt_perl_app_conf_t; +typedef struct { + nxt_str_t script; +} nxt_ruby_app_conf_t; + + struct nxt_common_app_conf_s { nxt_str_t name; nxt_str_t type; @@ -71,6 +77,7 @@ struct nxt_common_app_conf_s { nxt_php_app_conf_t php; nxt_go_app_conf_t go; nxt_perl_app_conf_t perl; + nxt_ruby_app_conf_t ruby; } u; }; @@ -145,8 +152,9 @@ struct nxt_app_wmsg_s { uint32_t stream; }; + struct nxt_app_rmsg_s { - nxt_buf_t *buf; /* current buffer to read */ + nxt_buf_t *buf; /* current buffer to read */ }; diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index ccca877b..828b2ee2 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -224,6 +224,16 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_perl_members[] = { }; +static nxt_conf_vldt_object_t nxt_conf_vldt_ruby_members[] = { + { nxt_string("script"), + NXT_CONF_VLDT_STRING, + NULL, + NULL }, + + NXT_CONF_VLDT_NEXT(&nxt_conf_vldt_common_members) +}; + + nxt_int_t nxt_conf_validate(nxt_conf_validation_t *vldt) { @@ -413,6 +423,7 @@ nxt_conf_vldt_app(nxt_conf_validation_t *vldt, nxt_str_t *name, nxt_conf_vldt_php_members, nxt_conf_vldt_go_members, nxt_conf_vldt_perl_members, + nxt_conf_vldt_ruby_members, }; ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT); diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index f1f5d6d4..51303e5a 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -182,11 +182,21 @@ static nxt_conf_map_t nxt_perl_app_conf[] = { }; +static nxt_conf_map_t nxt_ruby_app_conf[] = { + { + nxt_string("script"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.ruby.script), + }, +}; + + static nxt_conf_app_map_t nxt_app_maps[] = { { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf }, { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf }, { nxt_nitems(nxt_go_app_conf), nxt_go_app_conf }, { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf }, + { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf }, }; diff --git a/src/nxt_router.c b/src/nxt_router.c index dbc8b283..d2be8e91 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -229,6 +229,8 @@ static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); static nxt_int_t nxt_perl_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); +static nxt_int_t nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, + nxt_app_wmsg_t *wmsg); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); @@ -250,6 +252,7 @@ static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = { nxt_php_prepare_msg, nxt_go_prepare_msg, nxt_perl_prepare_msg, + nxt_ruby_prepare_msg, }; @@ -4127,6 +4130,91 @@ fail: } +static nxt_int_t +nxt_ruby_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, + nxt_app_wmsg_t *wmsg) +{ + nxt_int_t rc; + nxt_str_t str; + nxt_buf_t *b; + nxt_http_field_t *field; + nxt_app_request_header_t *h; + + static const nxt_str_t prefix = nxt_string("HTTP_"); + static const nxt_str_t eof = nxt_null_string; + + h = &r->header; + +#define RC(S) \ + do { \ + rc = (S); \ + if (nxt_slow_path(rc != NXT_OK)) { \ + goto fail; \ + } \ + } while(0) + +#define NXT_WRITE(N) \ + RC(nxt_app_msg_write_str(task, wmsg, N)) + + /* TODO error handle, async mmap buffer assignment */ + + NXT_WRITE(&h->method); + NXT_WRITE(&h->target); + + if (h->query.length) { + str.start = h->target.start; + str.length = (h->target.length - h->query.length) - 1; + + RC(nxt_app_msg_write_str(task, wmsg, &str)); + + } else { + NXT_WRITE(&eof); + } + + if (h->query.start != NULL) { + RC(nxt_app_msg_write_size(task, wmsg, + h->query.start - h->target.start + 1)); + } else { + RC(nxt_app_msg_write_size(task, wmsg, 0)); + } + + NXT_WRITE(&h->version); + + NXT_WRITE(&r->remote); + NXT_WRITE(&r->local); + + NXT_WRITE(&h->host); + NXT_WRITE(&h->content_type); + NXT_WRITE(&h->content_length); + + nxt_list_each(field, h->fields) { + RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, + field->name, field->name_length)); + RC(nxt_app_msg_write(task, wmsg, field->value, field->value_length)); + } nxt_list_loop; + + /* end-of-headers mark */ + NXT_WRITE(&eof); + + RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + + for (b = r->body.buf; b != NULL; b = b->next) { + + RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, + nxt_buf_mem_used_size(&b->mem))); + } + +#undef NXT_WRITE +#undef RC + + return NXT_OK; + +fail: + + return NXT_ERROR; +} + + const nxt_conn_state_t nxt_router_conn_close_state nxt_aligned(64) = { diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c new file mode 100644 index 00000000..abb9640e --- /dev/null +++ b/src/ruby/nxt_ruby.c @@ -0,0 +1,911 @@ +/* + * Copyright (C) Alexander Borisov + * Copyright (C) NGINX, Inc. + */ + +#include <ruby/nxt_ruby.h> + + +#define NXT_RUBY_RACK_API_VERSION_MAJOR 1 +#define NXT_RUBY_RACK_API_VERSION_MINOR 3 + +#define NXT_RUBY_STRINGIZE_HELPER(x) #x +#define NXT_RUBY_STRINGIZE(x) NXT_RUBY_STRINGIZE_HELPER(x) + +#define NXT_RUBY_LIB_VERSION \ + NXT_RUBY_STRINGIZE(RUBY_API_VERSION_MAJOR) \ + "." NXT_RUBY_STRINGIZE(RUBY_API_VERSION_MINOR) \ + "." NXT_RUBY_STRINGIZE(RUBY_API_VERSION_TEENY) + + +typedef struct { + nxt_task_t *task; + nxt_str_t *script; + VALUE builder; +} nxt_ruby_rack_init_t; + + +static nxt_int_t nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf); +static VALUE nxt_ruby_init_basic(VALUE arg); +static nxt_int_t nxt_ruby_init_io(nxt_task_t *task); +static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init); + +static VALUE nxt_ruby_require_rubygems(VALUE arg); +static VALUE nxt_ruby_require_rack(VALUE arg); +static VALUE nxt_ruby_rack_parse_script(VALUE ctx); +static VALUE nxt_ruby_rack_env_create(VALUE arg); +static nxt_int_t nxt_ruby_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, + nxt_app_wmsg_t *wmsg); + +static VALUE nxt_ruby_rack_app_run(VALUE arg); +static nxt_int_t nxt_ruby_read_request(nxt_ruby_run_ctx_t *run_ctx, + VALUE hash_env); +nxt_inline nxt_int_t nxt_ruby_read_add_env(nxt_task_t *task, + nxt_app_rmsg_t *rmsg, VALUE hash_env, const char *name, nxt_str_t *str); +static nxt_int_t nxt_ruby_rack_result_status(VALUE result); +nxt_inline nxt_int_t nxt_ruby_write(nxt_task_t *task, nxt_app_wmsg_t *wmsg, + const u_char *data, size_t len, nxt_bool_t flush, nxt_bool_t last); +static nxt_int_t nxt_ruby_rack_result_headers(VALUE result); +static int nxt_ruby_hash_foreach(VALUE r_key, VALUE r_value, VALUE arg); +static nxt_int_t nxt_ruby_head_send_part(const char *key, size_t key_size, + const char *value, size_t value_size); +static nxt_int_t nxt_ruby_rack_result_body(VALUE result); +static nxt_int_t nxt_ruby_rack_result_body_file_write(VALUE filepath); +static VALUE nxt_ruby_rack_result_body_each(VALUE body); + +static void nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, + const char *desc); + +static void nxt_ruby_atexit(nxt_task_t *task); + + +static uint32_t compat[] = { + NXT_VERNUM, NXT_DEBUG, +}; + +static VALUE nxt_ruby_rackup; +static VALUE nxt_ruby_call; +static VALUE nxt_ruby_env; +static VALUE nxt_ruby_io_input; +static VALUE nxt_ruby_io_error; +static nxt_ruby_run_ctx_t nxt_ruby_run_ctx; + +NXT_EXPORT nxt_application_module_t nxt_app_module = { + sizeof(compat), + compat, + nxt_string("ruby"), + nxt_string(NXT_RUBY_LIB_VERSION), + nxt_ruby_init, + nxt_ruby_run, + nxt_ruby_atexit, +}; + + +static nxt_int_t +nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf) +{ + int state; + VALUE dummy, res; + nxt_ruby_rack_init_t rack_init; + + ruby_init(); + Init_stack(&dummy); + ruby_init_loadpath(); + ruby_script("NGINX_Unit"); + + rack_init.task = task; + rack_init.script = &conf->u.ruby.script; + + res = rb_protect(nxt_ruby_init_basic, + (VALUE) (uintptr_t) &rack_init, &state); + if (nxt_slow_path(res == Qnil || state != 0)) { + nxt_ruby_exception_log(task, NXT_LOG_ALERT, + "Failed to init basic variables"); + return NXT_ERROR; + } + + nxt_ruby_rackup = nxt_ruby_rack_init(&rack_init); + if (nxt_slow_path(nxt_ruby_rackup == Qnil)) { + return NXT_ERROR; + } + + nxt_ruby_call = rb_intern("call"); + if (nxt_slow_path(nxt_ruby_call == Qnil)) { + nxt_alert(task, "Ruby: Unable to find rack entry point"); + + return NXT_ERROR; + } + + nxt_ruby_env = rb_protect(nxt_ruby_rack_env_create, Qnil, &state); + if (nxt_slow_path(state != 0)) { + nxt_ruby_exception_log(task, NXT_LOG_ALERT, + "Failed to create 'environ' variable"); + return NXT_ERROR; + } + + rb_gc_register_address(&nxt_ruby_rackup); + rb_gc_register_address(&nxt_ruby_call); + rb_gc_register_address(&nxt_ruby_env); + + return NXT_OK; +} + + +static VALUE +nxt_ruby_init_basic(VALUE arg) +{ + int state; + nxt_int_t rc; + nxt_ruby_rack_init_t *rack_init; + + rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) arg; + + state = rb_enc_find_index("encdb"); + if (nxt_slow_path(state == 0)) { + nxt_alert(rack_init->task, + "Ruby: Failed to find encoding index 'encdb'"); + + return Qnil; + } + + rc = nxt_ruby_init_io(rack_init->task); + if (nxt_slow_path(rc != NXT_OK)) { + return Qnil; + } + + return arg; +} + + +static nxt_int_t +nxt_ruby_init_io(nxt_task_t *task) +{ + VALUE rb, io_input, io_error; + + io_input = nxt_ruby_stream_io_input_init(); + rb = Data_Wrap_Struct(io_input, 0, 0, &nxt_ruby_run_ctx); + + nxt_ruby_io_input = rb_funcall(io_input, rb_intern("new"), 1, rb); + if (nxt_slow_path(nxt_ruby_io_input == Qnil)) { + nxt_alert(task, "Ruby: Failed to create environment 'rack.input' var"); + + return NXT_ERROR; + } + + io_error = nxt_ruby_stream_io_error_init(); + rb = Data_Wrap_Struct(io_error, 0, 0, &nxt_ruby_run_ctx); + + nxt_ruby_io_error = rb_funcall(io_error, rb_intern("new"), 1, rb); + if (nxt_slow_path(nxt_ruby_io_error == Qnil)) { + nxt_alert(task, "Ruby: Failed to create environment 'rack.error' var"); + + return NXT_ERROR; + } + + rb_gc_register_address(&nxt_ruby_io_input); + rb_gc_register_address(&nxt_ruby_io_error); + + return NXT_OK; +} + + +static VALUE +nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init) +{ + int state; + VALUE rack, rackup; + + rb_protect(nxt_ruby_require_rubygems, Qnil, &state); + if (nxt_slow_path(state != 0)) { + nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT, + "Failed to require 'rubygems' package"); + return Qnil; + } + + rb_protect(nxt_ruby_require_rack, Qnil, &state); + if (nxt_slow_path(state != 0)) { + nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT, + "Failed to require 'rack' package"); + return Qnil; + } + + rack = rb_const_get(rb_cObject, rb_intern("Rack")); + rack_init->builder = rb_const_get(rack, rb_intern("Builder")); + + rackup = rb_protect(nxt_ruby_rack_parse_script, + (VALUE) (uintptr_t) rack_init, &state); + if (nxt_slow_path(TYPE(rackup) != T_ARRAY || state != 0)) { + nxt_ruby_exception_log(rack_init->task, NXT_LOG_ALERT, + "Failed to parse rack script"); + return Qnil; + } + + if (nxt_slow_path(RARRAY_LEN(rackup) < 1)) { + nxt_alert(rack_init->task, "Ruby: Invalid rack config file"); + return Qnil; + } + + return RARRAY_PTR(rackup)[0]; +} + + +static VALUE +nxt_ruby_require_rubygems(VALUE arg) +{ + return rb_funcall(rb_cObject, rb_intern("require"), 1, + rb_str_new2("rubygems")); +} + + +static VALUE +nxt_ruby_require_rack(VALUE arg) +{ + return rb_funcall(rb_cObject, rb_intern("require"), 1, rb_str_new2("rack")); +} + + +static VALUE +nxt_ruby_rack_parse_script(VALUE ctx) +{ + VALUE script, res; + nxt_ruby_rack_init_t *rack_init; + + rack_init = (nxt_ruby_rack_init_t *) (uintptr_t) ctx; + + script = rb_str_new((const char *) rack_init->script->start, + (long) rack_init->script->length); + + res = rb_funcall(rack_init->builder, rb_intern("parse_file"), 1, script); + + rb_str_free(script); + + return res; +} + + +static VALUE +nxt_ruby_rack_env_create(VALUE arg) +{ + VALUE hash_env, version; + + hash_env = rb_hash_new(); + version = rb_ary_new(); + + rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MAJOR)); + rb_ary_push(version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MINOR)); + + rb_hash_aset(hash_env, rb_str_new2("rack.version"), version); + rb_hash_aset(hash_env, rb_str_new2("rack.url_scheme"), rb_str_new2("http")); + rb_hash_aset(hash_env, rb_str_new2("rack.input"), nxt_ruby_io_input); + rb_hash_aset(hash_env, rb_str_new2("rack.errors"), nxt_ruby_io_error); + rb_hash_aset(hash_env, rb_str_new2("rack.multithread"), Qfalse); + rb_hash_aset(hash_env, rb_str_new2("rack.multiprocess"), Qtrue); + rb_hash_aset(hash_env, rb_str_new2("rack.run_once"), Qfalse); + rb_hash_aset(hash_env, rb_str_new2("rack.hijack?"), Qfalse); + rb_hash_aset(hash_env, rb_str_new2("rack.hijack"), Qnil); + rb_hash_aset(hash_env, rb_str_new2("rack.hijack_io"), Qnil); + + return hash_env; +} + + +static nxt_int_t +nxt_ruby_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg) +{ + int state; + VALUE res; + + nxt_ruby_run_ctx.task = task; + nxt_ruby_run_ctx.rmsg = rmsg; + nxt_ruby_run_ctx.wmsg = wmsg; + + res = rb_protect(nxt_ruby_rack_app_run, Qnil, &state); + if (nxt_slow_path(state != 0)) { + nxt_ruby_exception_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Failed to run ruby script"); + return NXT_ERROR; + } + + if (nxt_slow_path(res == Qnil)) { + return NXT_ERROR; + } + + return NXT_OK; +} + + +static VALUE +nxt_ruby_rack_app_run(VALUE arg) +{ + VALUE env, result; + nxt_int_t rc; + + env = rb_hash_dup(nxt_ruby_env); + + rc = nxt_ruby_read_request(&nxt_ruby_run_ctx, env); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_alert(nxt_ruby_run_ctx.task, + "Ruby: Failed to process incoming request"); + + goto fail; + } + + result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env); + if (nxt_slow_path(TYPE(result) != T_ARRAY)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Invalid response format from application"); + + goto fail; + } + + if (nxt_slow_path(RARRAY_LEN(result) != 3)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Invalid response format from application. " + "Need 3 entries [Status, Headers, Body]"); + + goto fail; + } + + rc = nxt_ruby_rack_result_status(result); + if (nxt_slow_path(rc != NXT_OK)) { + goto fail; + } + + rc = nxt_ruby_rack_result_headers(result); + if (nxt_slow_path(rc != NXT_OK)) { + goto fail; + } + + rc = nxt_ruby_rack_result_body(result); + if (nxt_slow_path(rc != NXT_OK)) { + goto fail; + } + + rc = nxt_app_msg_flush(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, 1); + if (nxt_slow_path(rc != NXT_OK)) { + goto fail; + } + + rb_hash_delete(env, rb_obj_id(env)); + + return result; + +fail: + + rb_hash_delete(env, rb_obj_id(env)); + + return Qnil; +} + + +static nxt_int_t +nxt_ruby_read_request(nxt_ruby_run_ctx_t *run_ctx, VALUE hash_env) +{ + u_char *colon; + size_t query_size; + nxt_int_t rc; + nxt_str_t str, value, path, target; + nxt_str_t host, server_name, server_port; + nxt_task_t *task; + nxt_app_rmsg_t *rmsg; + + static nxt_str_t def_host = nxt_string("localhost"); + static nxt_str_t def_port = nxt_string("80"); + + task = run_ctx->task; + rmsg = run_ctx->rmsg; + + rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "REQUEST_METHOD", &str); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "REQUEST_URI", &target); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_app_msg_read_str(task, rmsg, &path); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_app_msg_read_size(task, rmsg, &query_size); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + if (path.start == NULL || path.length == 0) { + path = target; + } + + rb_hash_aset(hash_env, rb_str_new2("PATH_INFO"), + rb_str_new((const char *) path.start, (long) path.length)); + + if (query_size > 0) { + query_size--; + + if (nxt_slow_path(target.length < query_size)) { + return NXT_ERROR; + } + + str.start = &target.start[query_size]; + str.length = target.length - query_size; + + rb_hash_aset(hash_env, rb_str_new2("QUERY_STRING"), + rb_str_new((const char *) str.start, (long) str.length)); + } + + rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "SERVER_PROTOCOL", &str); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "REMOTE_ADDR", &str); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "SERVER_ADDR", &str); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_app_msg_read_str(task, rmsg, &host); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + if (host.length == 0) { + host = def_host; + } + + colon = nxt_memchr(host.start, ':', host.length); + server_name = host; + + if (colon != NULL) { + server_name.length = colon - host.start; + + server_port.start = colon + 1; + server_port.length = host.length - server_name.length - 1; + + } else { + server_port = def_port; + } + + rb_hash_aset(hash_env, rb_str_new2("SERVER_NAME"), + rb_str_new((const char *) server_name.start, + (long) server_name.length)); + + rb_hash_aset(hash_env, rb_str_new2("SERVER_PORT"), + rb_str_new((const char *) server_port.start, + (long) server_port.length)); + + rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "CONTENT_TYPE", &str); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_ruby_read_add_env(task, rmsg, hash_env, "CONTENT_LENGTH", &str); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + for ( ;; ) { + rc = nxt_app_msg_read_str(task, rmsg, &str); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + if (nxt_slow_path(str.length == 0)) { + break; + } + + rc = nxt_app_msg_read_str(task, rmsg, &value); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rb_hash_aset(hash_env, + rb_str_new((char *) str.start, (long) str.length), + rb_str_new((const char *) value.start, + (long) value.length)); + } + + rc = nxt_app_msg_read_size(task, rmsg, &run_ctx->body_preread_size); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + return NXT_OK; +} + + +nxt_inline nxt_int_t +nxt_ruby_read_add_env(nxt_task_t *task, nxt_app_rmsg_t *rmsg, VALUE hash_env, + const char *name, nxt_str_t *str) +{ + nxt_int_t rc; + + rc = nxt_app_msg_read_str(task, rmsg, str); + if (nxt_slow_path(rc != NXT_OK)) { + return rc; + } + + if (str->start == NULL) { + rb_hash_aset(hash_env, rb_str_new2(name), Qnil); + return NXT_OK; + } + + rb_hash_aset(hash_env, rb_str_new2(name), + rb_str_new((const char *) str->start, (long) str->length)); + + return NXT_OK; +} + + +static nxt_int_t +nxt_ruby_rack_result_status(VALUE result) +{ + VALUE status; + u_char *p; + size_t len; + nxt_int_t rc; + u_char buf[3]; + + status = rb_ary_entry(result, 0); + + if (TYPE(status) == T_FIXNUM) { + nxt_sprintf(buf, buf + 3, "%03d", FIX2INT(status)); + + p = buf; + len = 3; + + } else if (TYPE(status) == T_STRING) { + p = (u_char *) RSTRING_PTR(status); + len = RSTRING_LEN(status); + + } else { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Invalid response 'status' format from application"); + + return NXT_ERROR; + } + + rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + (u_char *) "Status: ", (sizeof("Status: ") - 1), 0, 0); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + p, len, 0, 0); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + (u_char *) "\r\n", (sizeof("\r\n") - 1), 0, 0); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + return NXT_OK; +} + + +nxt_inline nxt_int_t +nxt_ruby_write(nxt_task_t *task, nxt_app_wmsg_t *wmsg, + const u_char *data, size_t len, nxt_bool_t flush, nxt_bool_t last) +{ + nxt_int_t rc; + + rc = nxt_app_msg_write_raw(task, wmsg, data, len); + if (nxt_slow_path(rc != NXT_OK)) { + return rc; + } + + if (flush || last) { + rc = nxt_app_msg_flush(task, wmsg, last); + } + + return rc; +} + + +static nxt_int_t +nxt_ruby_rack_result_headers(VALUE result) +{ + VALUE headers; + nxt_int_t rc; + + headers = rb_ary_entry(result, 1); + if (nxt_slow_path(TYPE(headers) != T_HASH)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Invalid response 'headers' format from application"); + + return NXT_ERROR; + } + + rc = NXT_OK; + + rb_hash_foreach(headers, nxt_ruby_hash_foreach, (VALUE) (uintptr_t) &rc); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + rc = nxt_ruby_write(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + (u_char *) "\r\n", (sizeof("\r\n") - 1), 0, 0); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + return NXT_OK; +} + + +static int +nxt_ruby_hash_foreach(VALUE r_key, VALUE r_value, VALUE arg) +{ + nxt_int_t rc, *rc_p; + const char *value, *value_end, *pos; + + rc_p = (nxt_int_t *) (uintptr_t) arg; + + if (nxt_slow_path(TYPE(r_key) != T_STRING)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Wrong header entry 'key' from application"); + + goto fail; + } + + if (nxt_slow_path(TYPE(r_value) != T_STRING)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Wrong header entry 'value' from application"); + + goto fail; + } + + value = RSTRING_PTR(r_value); + value_end = value + RSTRING_LEN(r_value); + + pos = value; + + for ( ;; ) { + pos = strchr(pos, '\n'); + + if (pos == NULL) { + break; + } + + rc = nxt_ruby_head_send_part(RSTRING_PTR(r_key), RSTRING_LEN(r_key), + value, pos - value); + if (nxt_slow_path(rc != NXT_OK)) { + goto fail; + } + + pos++; + value = pos; + } + + if (value <= value_end) { + rc = nxt_ruby_head_send_part(RSTRING_PTR(r_key), RSTRING_LEN(r_key), + value, value_end - value); + if (nxt_slow_path(rc != NXT_OK)) { + goto fail; + } + } + + *rc_p = NXT_OK; + + return ST_CONTINUE; + +fail: + + *rc_p = NXT_ERROR; + + return ST_STOP; +} + + +static nxt_int_t +nxt_ruby_head_send_part(const char *key, size_t key_size, + const char *value, size_t value_size) +{ + nxt_int_t rc; + + rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + (u_char *) key, key_size); + if (nxt_slow_path(rc != NXT_OK)) { + return rc; + } + + rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + (u_char *) ": ", (sizeof(": ") - 1)); + if (nxt_slow_path(rc != NXT_OK)) { + return rc; + } + + rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + (u_char *) value, value_size); + if (nxt_slow_path(rc != NXT_OK)) { + return rc; + } + + return nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + (u_char *) "\r\n", (sizeof("\r\n") - 1)); +} + + +static nxt_int_t +nxt_ruby_rack_result_body(VALUE result) +{ + VALUE fn, body; + nxt_int_t rc; + + body = rb_ary_entry(result, 2); + + if (rb_respond_to(body, rb_intern("to_path"))) { + + fn = rb_funcall(body, rb_intern("to_path"), 0); + if (nxt_slow_path(TYPE(fn) != T_STRING)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Failed to get 'body' file path from application"); + + return NXT_ERROR; + } + + rc = nxt_ruby_rack_result_body_file_write(fn); + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; + } + + } else if (rb_respond_to(body, rb_intern("each"))) { + rb_iterate(rb_each, body, nxt_ruby_rack_result_body_each, 0); + + } else { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Invalid response 'body' format from application"); + + return NXT_ERROR; + } + + if (rb_respond_to(body, rb_intern("close"))) { + rb_funcall(body, rb_intern("close"), 0); + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_ruby_rack_result_body_file_write(VALUE filepath) +{ + size_t len; + ssize_t n; + nxt_off_t rest; + nxt_int_t rc; + nxt_file_t file; + nxt_file_info_t finfo; + u_char buf[8192]; + + nxt_memzero(&file, sizeof(nxt_file_t)); + + file.name = (nxt_file_name_t *) RSTRING_PTR(filepath); + + rc = nxt_file_open(nxt_ruby_run_ctx.task, &file, NXT_FILE_RDONLY, + NXT_FILE_OPEN, 0); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Failed to open 'body' file: %s", + (const char *) file.name); + + return NXT_ERROR; + } + + rc = nxt_file_info(&file, &finfo); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Failed to get 'body' file information: %s", + (const char *) file.name); + + goto fail; + } + + rest = nxt_file_size(&finfo); + + while (rest != 0) { + len = nxt_min(rest, (nxt_off_t) sizeof(buf)); + + n = nxt_file_read(&file, buf, len, nxt_file_size(&finfo) - rest); + if (nxt_slow_path(n != (ssize_t) len)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Failed to read 'body' file"); + + goto fail; + } + + rest -= len; + + rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + buf, len); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Failed to write 'body' from application"); + + goto fail; + } + } + + nxt_file_close(nxt_ruby_run_ctx.task, &file); + + return NXT_OK; + +fail: + + nxt_file_close(nxt_ruby_run_ctx.task, &file); + + return NXT_ERROR; +} + + +static VALUE +nxt_ruby_rack_result_body_each(VALUE body) +{ + nxt_int_t rc; + + if (TYPE(body) != T_STRING) { + return Qnil; + } + + rc = nxt_app_msg_write_raw(nxt_ruby_run_ctx.task, nxt_ruby_run_ctx.wmsg, + (u_char *) RSTRING_PTR(body), RSTRING_LEN(body)); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_log(nxt_ruby_run_ctx.task, NXT_LOG_ERR, + "Ruby: Failed to write 'body' from application"); + } + + return Qnil; +} + + +static void +nxt_ruby_exception_log(nxt_task_t *task, uint32_t level, const char *desc) +{ + int i; + VALUE err, ary, eclass, msg; + + nxt_log(task, level, "Ruby: %s", desc); + + err = rb_errinfo(); + ary = rb_funcall(err, rb_intern("backtrace"), 0); + + if (RARRAY_LEN(ary) == 0) { + return; + } + + eclass = rb_class_name(rb_class_of(err)); + msg = rb_funcall(err, rb_intern("message"), 0); + + nxt_log(task, level, "Ruby: %s: %s (%s)", + RSTRING_PTR(RARRAY_PTR(ary)[0]), + RSTRING_PTR(msg), RSTRING_PTR(eclass)); + + for (i = 1; i < RARRAY_LEN(ary); i++) { + nxt_log(task, level, "from %s", RSTRING_PTR(RARRAY_PTR(ary)[i])); + } +} + + +static void +nxt_ruby_atexit(nxt_task_t *task) +{ + rb_gc_unregister_address(&nxt_ruby_io_input); + rb_gc_unregister_address(&nxt_ruby_io_error); + + rb_gc_unregister_address(&nxt_ruby_rackup); + rb_gc_unregister_address(&nxt_ruby_call); + rb_gc_unregister_address(&nxt_ruby_env); + + ruby_cleanup(0); +} diff --git a/src/ruby/nxt_ruby.h b/src/ruby/nxt_ruby.h new file mode 100644 index 00000000..9a6be0d4 --- /dev/null +++ b/src/ruby/nxt_ruby.h @@ -0,0 +1,34 @@ + +/* + * Copyright (C) Alexander Borisov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_RUBY_H_INCLUDED_ +#define _NXT_RUBY_H_INCLUDED_ + + +#include <ruby.h> +#include <ruby/io.h> +#include <ruby/encoding.h> +#include <ruby/version.h> + +#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_runtime.h> +#include <nxt_application.h> + + +typedef struct { + nxt_task_t *task; + nxt_app_rmsg_t *rmsg; + nxt_app_wmsg_t *wmsg; + + size_t body_preread_size; +} nxt_ruby_run_ctx_t; + + +VALUE nxt_ruby_stream_io_input_init(void); +VALUE nxt_ruby_stream_io_error_init(void); + +#endif /* _NXT_RUBY_H_INCLUDED_ */ diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c new file mode 100644 index 00000000..eee11c3c --- /dev/null +++ b/src/ruby/nxt_ruby_stream_io.c @@ -0,0 +1,290 @@ + +/* + * Copyright (C) Alexander Borisov + * Copyright (C) NGINX, Inc. + */ + +#include <ruby/nxt_ruby.h> + + +static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap); +static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self); +static VALUE nxt_ruby_stream_io_gets(VALUE obj, VALUE args); +static size_t nxt_ruby_stream_io_read_line(nxt_app_rmsg_t *rmsg, VALUE str); +static VALUE nxt_ruby_stream_io_each(VALUE obj, VALUE args); +static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args); +static VALUE nxt_ruby_stream_io_rewind(VALUE obj, VALUE args); +static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args); +static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args); +nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, + VALUE val); +static VALUE nxt_ruby_stream_io_flush(VALUE obj, VALUE args); + + +VALUE +nxt_ruby_stream_io_input_init(void) +{ + VALUE stream_io; + + stream_io = rb_define_class("NGINX_Unit_Stream_IO_Read", rb_cData); + + rb_gc_register_address(&stream_io); + + rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); + rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1); + rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0); + rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0); + rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2); + rb_define_method(stream_io, "rewind", nxt_ruby_stream_io_rewind, 0); + + return stream_io; +} + + +VALUE +nxt_ruby_stream_io_error_init(void) +{ + VALUE stream_io; + + stream_io = rb_define_class("NGINX_Unit_Stream_IO_Error", rb_cData); + + rb_gc_register_address(&stream_io); + + rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); + rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1); + rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2); + rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2); + rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0); + + return stream_io; +} + + +static VALUE +nxt_ruby_stream_io_new(VALUE class, VALUE wrap) +{ + VALUE self; + nxt_ruby_run_ctx_t *run_ctx; + + Data_Get_Struct(wrap, nxt_ruby_run_ctx_t, run_ctx); + self = Data_Wrap_Struct(class, 0, 0, run_ctx); + + rb_obj_call_init(self, 0, NULL); + + return self; +} + + +static VALUE +nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self) +{ + return self; +} + + +static VALUE +nxt_ruby_stream_io_gets(VALUE obj, VALUE args) +{ + VALUE buf; + nxt_ruby_run_ctx_t *run_ctx; + + Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); + + if (run_ctx->body_preread_size == 0) { + return Qnil; + } + + buf = rb_str_buf_new(1); + + if (buf == Qnil) { + return Qnil; + } + + run_ctx->body_preread_size -= nxt_ruby_stream_io_read_line(run_ctx->rmsg, + buf); + + return buf; +} + + +static size_t +nxt_ruby_stream_io_read_line(nxt_app_rmsg_t *rmsg, VALUE str) +{ + size_t len, size; + u_char *p; + nxt_buf_t *buf; + + len = 0; + + for (buf = rmsg->buf; buf != NULL; buf = buf->next) { + + size = nxt_buf_mem_used_size(&buf->mem); + p = memchr(buf->mem.pos, '\n', size); + + if (p != NULL) { + p++; + size = p - buf->mem.pos; + + rb_str_cat(str, (const char *) buf->mem.pos, size); + + len += size; + buf->mem.pos = p; + + break; + } + + rb_str_cat(str, (const char *) buf->mem.pos, size); + + len += size; + buf->mem.pos = buf->mem.free; + } + + rmsg->buf = buf; + + return len; +} + + +static VALUE +nxt_ruby_stream_io_each(VALUE obj, VALUE args) +{ + VALUE chunk; + + if (rb_block_given_p() == 0) { + rb_raise(rb_eArgError, "Expected block on rack.input 'each' method"); + } + + for ( ;; ) { + chunk = nxt_ruby_stream_io_gets(obj, Qnil); + + if (chunk == Qnil) { + return Qnil; + } + + rb_yield(chunk); + } + + return Qnil; +} + + +static VALUE +nxt_ruby_stream_io_read(VALUE obj, VALUE args) +{ + VALUE buf; + long copy_size, u_size; + size_t len; + nxt_ruby_run_ctx_t *run_ctx; + + Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); + + copy_size = run_ctx->body_preread_size; + + if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) { + u_size = NUM2LONG(RARRAY_PTR(args)[0]); + + if (u_size < 0 || copy_size == 0) { + return Qnil; + } + + if (copy_size > u_size) { + copy_size = u_size; + } + } + + if (copy_size == 0) { + return rb_str_new_cstr(""); + } + + buf = rb_str_buf_new(copy_size); + + if (nxt_slow_path(buf == Qnil)) { + return Qnil; + } + + len = nxt_app_msg_read_raw(run_ctx->task, run_ctx->rmsg, + RSTRING_PTR(buf), (size_t) copy_size); + + if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) { + + rb_str_set_len(RARRAY_PTR(args)[1], 0); + rb_str_cat(RARRAY_PTR(args)[1], RSTRING_PTR(buf), copy_size); + } + + rb_str_set_len(buf, (long) len); + + run_ctx->body_preread_size -= len; + + return buf; +} + + +static VALUE +nxt_ruby_stream_io_rewind(VALUE obj, VALUE args) +{ + return Qnil; +} + + +static VALUE +nxt_ruby_stream_io_puts(VALUE obj, VALUE args) +{ + nxt_ruby_run_ctx_t *run_ctx; + + if (RARRAY_LEN(args) != 1) { + return Qnil; + } + + Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); + + nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); + + return Qnil; +} + + +static VALUE +nxt_ruby_stream_io_write(VALUE obj, VALUE args) +{ + long len; + nxt_ruby_run_ctx_t *run_ctx; + + if (RARRAY_LEN(args) != 1) { + return Qnil; + } + + Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); + + len = nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); + + return LONG2FIX(len); +} + + +nxt_inline long +nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val) +{ + if (nxt_slow_path(val == Qnil)) { + return 0; + } + + if (TYPE(val) != T_STRING) { + val = rb_funcall(val, rb_intern("to_s"), 0); + + if (TYPE(val) != T_STRING) { + return 0; + } + } + + nxt_log_error(NXT_LOG_ERR, run_ctx->task->log, "Ruby: %s", + RSTRING_PTR(val)); + + return RSTRING_LEN(val); +} + + +static VALUE +nxt_ruby_stream_io_flush(VALUE obj, VALUE args) +{ + return Qnil; +} |