diff options
Diffstat (limited to 'src')
48 files changed, 2144 insertions, 522 deletions
diff --git a/src/go/unit/response.go b/src/go/unit/response.go index bb326ea5..767d66b7 100644 --- a/src/go/unit/response.go +++ b/src/go/unit/response.go @@ -63,7 +63,7 @@ func (r *response) WriteHeader(code int) { for k, vv := range r.header { for _, v := range vv { fields++ - fields_size += len(k) + len(v) + 2 + fields_size += len(k) + len(v) } } diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index ac10024c..10875703 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -629,9 +629,6 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) keys_count = napi.get_value_uint32(argv[2]); header_len = napi.get_value_uint32(argv[3]); - /* Need to reserve extra byte for C-string 0-termination. */ - header_len++; - headers = argv[1]; ret = nxt_unit_response_init(req, status_code, keys_count, header_len); @@ -640,6 +637,12 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) return nullptr; } + /* + * Each name and value are 0-terminated by libunit. + * Need to add extra 2 bytes for each header. + */ + header_len += keys_count * 2; + keys = napi.get_property_names(headers); keys_len = napi.get_array_length(keys); @@ -656,8 +659,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) name_len = napi.get_value_string_latin1(name, ptr, header_len); name_ptr = ptr; - ptr += name_len; - header_len -= name_len; + ptr += name_len + 1; + header_len -= name_len + 1; hash = nxt_unit_field_hash(name_ptr, name_len); @@ -689,8 +692,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) nxt_unit_sptr_set(&f->value, ptr); f->value_length = (uint32_t) value_len; - ptr += value_len; - header_len -= value_len; + ptr += value_len + 1; + header_len -= value_len + 1; req->response->fields_count++; } @@ -715,8 +718,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info) nxt_unit_sptr_set(&f->value, ptr); f->value_length = (uint32_t) value_len; - ptr += value_len; - header_len -= value_len; + ptr += value_len + 1; + header_len -= value_len + 1; req->response->fields_count++; } diff --git a/src/nxt_application.c b/src/nxt_application.c index 468bc627..bebe3907 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -327,6 +327,9 @@ nxt_app_start(nxt_task_t *task, void *data) lang->version, lang->file); nxt_app = nxt_app_module_load(task, lang->file); + if (nxt_slow_path(nxt_app == NULL)) { + return NXT_ERROR; + } } if (nxt_app->pre_init != NULL) { diff --git a/src/nxt_buf.c b/src/nxt_buf.c index 91846e4d..83be0fac 100644 --- a/src/nxt_buf.c +++ b/src/nxt_buf.c @@ -183,7 +183,10 @@ nxt_buf_chain_length(nxt_buf_t *b) length = 0; while (b != NULL) { - length += b->mem.free - b->mem.pos; + if (!nxt_buf_is_sync(b)) { + length += b->mem.free - b->mem.pos; + } + b = b->next; } @@ -195,7 +198,7 @@ static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_mp_t *mp; - nxt_buf_t *b, *parent; + nxt_buf_t *b, *next, *parent; b = obj; parent = data; @@ -204,9 +207,23 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_assert(data == b->parent); - mp = b->data; - nxt_mp_free(mp, b); + do { + next = b->next; + parent = b->parent; + mp = b->data; + + nxt_mp_free(mp, b); + nxt_buf_parent_completion(task, parent); + + b = next; + } while (b != NULL); +} + + +void +nxt_buf_parent_completion(nxt_task_t *task, nxt_buf_t *parent) +{ if (parent != NULL) { nxt_debug(task, "parent retain:%uD", parent->retain); @@ -255,7 +272,7 @@ static void nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data) { nxt_mp_t *mp; - nxt_buf_t *b, *parent; + nxt_buf_t *b, *next, *parent; b = obj; parent = data; @@ -268,21 +285,18 @@ nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data) nxt_assert(data == b->parent); - mp = b->data; - nxt_mp_free(mp, b); - nxt_mp_release(mp); + do { + next = b->next; + parent = b->parent; + mp = b->data; - if (parent != NULL) { - nxt_debug(task, "parent retain:%uD", parent->retain); + nxt_mp_free(mp, b); + nxt_mp_release(mp); - parent->retain--; + nxt_buf_parent_completion(task, parent); - if (parent->retain == 0) { - parent->mem.pos = parent->mem.free; - - parent->completion_handler(task, parent, parent->parent); - } - } + b = next; + } while (b != NULL); } diff --git a/src/nxt_buf.h b/src/nxt_buf.h index 9c22d650..25e8499a 100644 --- a/src/nxt_buf.h +++ b/src/nxt_buf.h @@ -77,17 +77,17 @@ struct nxt_buf_s { uint32_t retain; - uint8_t is_file; /* 1 bit */ - - uint16_t is_mmap:1; - uint16_t is_port_mmap:1; - - uint16_t is_sync:1; - uint16_t is_nobuf:1; - uint16_t is_flush:1; - uint16_t is_last:1; - uint16_t is_port_mmap_sent:1; - uint16_t is_ts:1; + uint8_t cache_hint; + + uint8_t is_file:1; + uint8_t is_mmap:1; + uint8_t is_port_mmap:1; + uint8_t is_sync:1; + uint8_t is_nobuf:1; + uint8_t is_flush:1; + uint8_t is_last:1; + uint8_t is_port_mmap_sent:1; + uint8_t is_ts:1; nxt_buf_mem_t mem; @@ -250,6 +250,7 @@ NXT_EXPORT nxt_buf_t *nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags); NXT_EXPORT nxt_int_t nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT void nxt_buf_parent_completion(nxt_task_t *task, nxt_buf_t *parent); NXT_EXPORT nxt_buf_t *nxt_buf_make_plain(nxt_mp_t *mp, nxt_buf_t *src, size_t size); diff --git a/src/nxt_conf.c b/src/nxt_conf.c index 59eddd77..43820d2a 100644 --- a/src/nxt_conf.c +++ b/src/nxt_conf.c @@ -228,6 +228,13 @@ nxt_conf_get_integer(nxt_conf_value_t *value) } +uint8_t +nxt_conf_get_boolean(nxt_conf_value_t *value) +{ + return value->u.boolean; +} + + nxt_uint_t nxt_conf_object_members_count(nxt_conf_value_t *value) { diff --git a/src/nxt_conf.h b/src/nxt_conf.h index 725a6c95..66201fee 100644 --- a/src/nxt_conf.h +++ b/src/nxt_conf.h @@ -115,6 +115,7 @@ NXT_EXPORT void nxt_conf_set_string(nxt_conf_value_t *value, nxt_str_t *str); NXT_EXPORT nxt_int_t nxt_conf_set_string_dup(nxt_conf_value_t *value, nxt_mp_t *mp, nxt_str_t *str); NXT_EXPORT int64_t nxt_conf_get_integer(nxt_conf_value_t *value); +NXT_EXPORT uint8_t nxt_conf_get_boolean(nxt_conf_value_t *value); // FIXME reimplement and reorder functions below nxt_uint_t nxt_conf_object_members_count(nxt_conf_value_t *value); diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index c934b10b..105af675 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -58,8 +58,12 @@ static nxt_int_t nxt_conf_vldt_listener(nxt_conf_validation_t *vldt, static nxt_int_t nxt_conf_vldt_certificate(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); #endif +static nxt_int_t nxt_conf_vldt_action(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_routes_member(nxt_conf_validation_t *vldt, @@ -101,11 +105,9 @@ static nxt_int_t nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value); -static nxt_int_t -nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, - void *data); -static nxt_int_t -nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, +static nxt_int_t nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); #if (NXT_HAVE_CLONE_NEWUSER) @@ -316,6 +318,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_action_members[] = { NULL, NULL }, + { nxt_string("proxy"), + NXT_CONF_VLDT_STRING, + &nxt_conf_vldt_proxy, + NULL }, + NXT_CONF_VLDT_END }; @@ -328,8 +335,8 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_route_members[] = { { nxt_string("action"), NXT_CONF_VLDT_OBJECT, - &nxt_conf_vldt_object, - (void *) &nxt_conf_vldt_action_members }, + &nxt_conf_vldt_action, + NULL }, NXT_CONF_VLDT_END }; @@ -618,7 +625,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = { { nxt_string("classpath"), NXT_CONF_VLDT_ARRAY, &nxt_conf_vldt_array_iterator, - (void *) &nxt_conf_vldt_java_classpath}, + (void *) &nxt_conf_vldt_java_classpath }, { nxt_string("webapp"), NXT_CONF_VLDT_STRING, @@ -628,7 +635,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = { { nxt_string("options"), NXT_CONF_VLDT_ARRAY, &nxt_conf_vldt_array_iterator, - (void *) &nxt_conf_vldt_java_option}, + (void *) &nxt_conf_vldt_java_option }, { nxt_string("unit_jars"), NXT_CONF_VLDT_STRING, @@ -881,6 +888,37 @@ nxt_conf_vldt_listener(nxt_conf_validation_t *vldt, nxt_str_t *name, static nxt_int_t +nxt_conf_vldt_action(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, + void *data) +{ + nxt_int_t ret; + nxt_conf_value_t *pass_value, *share_value, *proxy_value; + + static nxt_str_t pass_str = nxt_string("pass"); + static nxt_str_t share_str = nxt_string("share"); + static nxt_str_t proxy_str = nxt_string("proxy"); + + ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_action_members); + + if (ret != NXT_OK) { + return ret; + } + + pass_value = nxt_conf_get_object_member(value, &pass_str, NULL); + share_value = nxt_conf_get_object_member(value, &share_str, NULL); + proxy_value = nxt_conf_get_object_member(value, &proxy_str, NULL); + + if (pass_value == NULL && share_value == NULL && proxy_value == NULL) { + return nxt_conf_vldt_error(vldt, "The \"action\" object must have " + "either \"pass\" or \"share\" or " + "\"proxy\" option set."); + } + + return NXT_OK; +} + + +static nxt_int_t nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { @@ -964,6 +1002,30 @@ error: static nxt_int_t +nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, + void *data) +{ + nxt_str_t name; + nxt_sockaddr_t *sa; + + nxt_conf_get_string(value, &name); + + if (nxt_str_start(&name, "http://", 7)) { + name.length -= 7; + name.start += 7; + + sa = nxt_sockaddr_parse(vldt->pool, &name); + if (sa != NULL) { + return NXT_OK; + } + } + + return nxt_conf_vldt_error(vldt, "The \"proxy\" address is invalid \"%V\"", + &name); +} + + +static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { @@ -1525,8 +1587,8 @@ nxt_conf_vldt_environment(nxt_conf_validation_t *vldt, nxt_str_t *name, static nxt_int_t -nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, - void *data) +nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data) { return nxt_conf_vldt_object(vldt, value, data); } @@ -1691,7 +1753,8 @@ nxt_conf_vldt_php_option(nxt_conf_validation_t *vldt, nxt_str_t *name, static nxt_int_t -nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, nxt_conf_value_t *value) +nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value) { nxt_str_t str; diff --git a/src/nxt_conn.h b/src/nxt_conn.h index 7284808b..2c1d49a0 100644 --- a/src/nxt_conn.h +++ b/src/nxt_conn.h @@ -8,7 +8,7 @@ #define _NXT_CONN_H_INCLUDED_ -typedef ssize_t (*nxt_conn_io_read_t)(nxt_conn_t *c); +typedef ssize_t (*nxt_conn_io_read_t)(nxt_task_t *task, nxt_conn_t *c); typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data); diff --git a/src/nxt_conn_connect.c b/src/nxt_conn_connect.c index 12b6c80c..d045853f 100644 --- a/src/nxt_conn_connect.c +++ b/src/nxt_conn_connect.c @@ -7,6 +7,9 @@ #include <nxt_main.h> +static nxt_err_t nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c); + + void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data) { @@ -49,7 +52,7 @@ nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data) case NXT_AGAIN: c->socket.write_handler = nxt_conn_connect_test; - c->socket.error_handler = state->error_handler; + c->socket.error_handler = nxt_conn_connect_error; engine = task->thread->engine; @@ -118,8 +121,7 @@ nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c) void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data) { - int ret, err; - socklen_t len; + nxt_err_t err; nxt_conn_t *c; c = obj; @@ -132,48 +134,35 @@ nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data) nxt_timer_disable(task->thread->engine, &c->write_timer); } - err = 0; - len = sizeof(int); - - /* - * Linux and BSDs return 0 and store a pending error in the err argument; - * Solaris returns -1 and sets the errno. - */ - - ret = getsockopt(c->socket.fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len); - - if (nxt_slow_path(ret == -1)) { - err = nxt_errno; - } + err = nxt_conn_connect_test_error(task, c); if (err == 0) { nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, task, c, data); - return; + } else { + nxt_conn_connect_error(task, c, data); } - - c->socket.error = err; - - nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E", - c->socket.fd, (size_t) c->remote->length, - nxt_sockaddr_start(c->remote), err); - - nxt_conn_connect_error(task, c, data); } void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data) { + nxt_err_t err; nxt_conn_t *c; nxt_work_handler_t handler; const nxt_conn_state_t *state; c = obj; + err = c->socket.error; + + if (err == 0) { + err = nxt_conn_connect_test_error(task, c); + } state = c->write_state; - switch (c->socket.error) { + switch (err) { case NXT_ECONNREFUSED: #if (NXT_LINUX) @@ -193,3 +182,22 @@ nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data) nxt_work_queue_add(c->write_work_queue, handler, task, c, data); } + + +static nxt_err_t +nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c) +{ + nxt_err_t err; + + err = nxt_socket_error(c->socket.fd); + + if (err != 0) { + c->socket.error = err; + + nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E", + c->socket.fd, (size_t) c->remote->length, + nxt_sockaddr_start(c->remote), err); + } + + return err; +} diff --git a/src/nxt_conn_read.c b/src/nxt_conn_read.c index 83969b31..3285abcd 100644 --- a/src/nxt_conn_read.c +++ b/src/nxt_conn_read.c @@ -69,7 +69,7 @@ nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) n = c->io->recvbuf(c, c->read); } else { - n = state->io_read_handler(c); + n = state->io_read_handler(task, c); /* The state can be changed by io_read_handler. */ state = c->read_state; } diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c index 9cdaab9b..a944834e 100644 --- a/src/nxt_epoll_engine.c +++ b/src/nxt_epoll_engine.c @@ -944,12 +944,12 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) nxt_work_queue_add(ev->read_work_queue, ev->read_handler, ev->task, ev, ev->data); + error = 0; + } else if (engine->u.epoll.mode == 0) { /* Level-triggered mode. */ nxt_epoll_disable_read(engine, ev); } - - error = 0; } if ((events & EPOLLOUT) != 0) { @@ -964,12 +964,12 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) nxt_work_queue_add(ev->write_work_queue, ev->write_handler, ev->task, ev, ev->data); + error = 0; + } else if (engine->u.epoll.mode == 0) { /* Level-triggered mode. */ nxt_epoll_disable_write(engine, ev); } - - error = 0; } if (!error) { diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index 31a35f6d..6f051067 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -556,19 +556,19 @@ nxt_event_engine_start(nxt_event_engine_t *engine) void * -nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, +nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint, size_t size) { - uint8_t n; + uint32_t n; nxt_uint_t items; nxt_array_t *mem_cache; nxt_mem_cache_t *cache; nxt_mem_cache_block_t *block; mem_cache = engine->mem_cache; - n = *slot; + n = *hint; - if (n == (uint8_t) -1) { + if (n == NXT_EVENT_ENGINE_NO_MEM_HINT) { if (mem_cache == NULL) { /* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */ @@ -607,7 +607,9 @@ nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, found: - *slot = n; + if (n < NXT_EVENT_ENGINE_NO_MEM_HINT) { + *hint = (uint8_t) n; + } } cache = mem_cache->elts; @@ -626,15 +628,39 @@ nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, void -nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p) +nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint, void *p, + size_t size) { + uint32_t n; + nxt_array_t *mem_cache; nxt_mem_cache_t *cache; nxt_mem_cache_block_t *block; block = p; + mem_cache = engine->mem_cache; + cache = mem_cache->elts; + + n = hint; + + if (nxt_slow_path(n == NXT_EVENT_ENGINE_NO_MEM_HINT)) { - cache = engine->mem_cache->elts; - cache = cache + *slot; + if (size != 0) { + for (n = 0; n < mem_cache->nelts; n++) { + if (cache[n].size == size) { + goto found; + } + } + + nxt_alert(&engine->task, + "event engine mem free(%p, %z) not found", p, size); + } + + goto done; + } + +found: + + cache = cache + n; if (cache->count < 16) { cache->count++; @@ -644,10 +670,79 @@ nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p) return; } +done: + nxt_mp_free(engine->mem_pool, p); } +void * +nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size) +{ + nxt_buf_t *b; + uint8_t hint; + + hint = NXT_EVENT_ENGINE_NO_MEM_HINT; + + b = nxt_event_engine_mem_alloc(engine, &hint, NXT_BUF_MEM_SIZE + size); + if (nxt_slow_path(b == NULL)) { + return NULL; + } + + nxt_memzero(b, NXT_BUF_MEM_SIZE); + + b->cache_hint = hint; + b->data = engine; + b->completion_handler = nxt_event_engine_buf_mem_completion; + + if (size != 0) { + b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE); + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start; + b->mem.end = b->mem.start + size; + } + + return b; +} + + +void +nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b) +{ + size_t size; + + size = NXT_BUF_MEM_SIZE + nxt_buf_mem_size(&b->mem); + + nxt_event_engine_mem_free(engine, b->cache_hint, b, size); +} + + +void +nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_engine_t *engine; + nxt_buf_t *b, *next, *parent; + + b = obj; + parent = data; + + nxt_debug(task, "buf completion: %p %p", b, b->mem.start); + + engine = b->data; + + do { + next = b->next; + parent = b->parent; + + nxt_event_engine_buf_mem_free(engine, b); + + nxt_buf_parent_completion(task, parent); + + b = next; + } while (b != NULL); +} + + #if (NXT_DEBUG) void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine) diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index 365d9e89..6b05d510 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -514,10 +514,16 @@ NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine, NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo); -void *nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot, +#define NXT_EVENT_ENGINE_NO_MEM_HINT 255 + +void *nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint, size_t size); -void nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, - void *p); +void nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint, + void *p, size_t size); +void *nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size); +void nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b); +void nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, + void *data); nxt_inline nxt_event_engine_t * diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 541fcb44..b07eaf84 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -18,10 +18,10 @@ */ #if (NXT_TLS) -static ssize_t nxt_http_idle_io_read_handler(nxt_conn_t *c); +static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c); static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data); #endif -static ssize_t nxt_h1p_idle_io_read_handler(nxt_conn_t *c); +static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c); static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, @@ -45,7 +45,7 @@ static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r); static void nxt_h1p_request_header_send(nxt_task_t *task, - nxt_http_request_t *r, nxt_work_handler_t body_handler); + nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data); static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, @@ -78,11 +78,32 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data); static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c); -static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c); static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer); +static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c); +static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, + void *data); +static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, + nxt_buf_mem_t *bm); +static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data); +static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data); +static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer); +static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data); + #if (NXT_TLS) static const nxt_conn_state_t nxt_http_idle_state; static const nxt_conn_state_t nxt_h1p_shutdown_state; @@ -94,6 +115,13 @@ static const nxt_conn_state_t nxt_h1p_request_send_state; static const nxt_conn_state_t nxt_h1p_timeout_response_state; static const nxt_conn_state_t nxt_h1p_keepalive_state; static const nxt_conn_state_t nxt_h1p_close_state; +static const nxt_conn_state_t nxt_h1p_peer_connect_state; +static const nxt_conn_state_t nxt_h1p_peer_header_send_state; +static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state; +static const nxt_conn_state_t nxt_h1p_peer_header_read_state; +static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state; +static const nxt_conn_state_t nxt_h1p_peer_read_state; +static const nxt_conn_state_t nxt_h1p_peer_close_state; const nxt_http_proto_table_t nxt_http_proto[3] = { @@ -106,6 +134,13 @@ const nxt_http_proto_table_t nxt_http_proto[3] = { .body_bytes_sent = nxt_h1p_request_body_bytes_sent, .discard = nxt_h1p_request_discard, .close = nxt_h1p_request_close, + + .peer_connect = nxt_h1p_peer_connect, + .peer_header_send = nxt_h1p_peer_header_send, + .peer_header_read = nxt_h1p_peer_header_read, + .peer_read = nxt_h1p_peer_read, + .peer_close = nxt_h1p_peer_close, + .ws_frame_start = nxt_h1p_websocket_frame_start, }, /* NXT_HTTP_PROTO_H2 */ @@ -113,9 +148,9 @@ const nxt_http_proto_table_t nxt_http_proto[3] = { }; -static nxt_lvlhsh_t nxt_h1p_fields_hash; +static nxt_lvlhsh_t nxt_h1p_fields_hash; -static nxt_http_field_proc_t nxt_h1p_fields[] = { +static nxt_http_field_proc_t nxt_h1p_fields[] = { { nxt_string("Connection"), &nxt_h1p_connection, 0 }, { nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 }, { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 }, @@ -136,11 +171,32 @@ static nxt_http_field_proc_t nxt_h1p_fields[] = { }; +static nxt_lvlhsh_t nxt_h1p_peer_fields_hash; + +static nxt_http_field_proc_t nxt_h1p_peer_fields[] = { + { nxt_string("Connection"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Server"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Date"), &nxt_http_proxy_date, 0 }, + { nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 }, +}; + + nxt_int_t nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt) { - return nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool, - nxt_h1p_fields, nxt_nitems(nxt_h1p_fields)); + nxt_int_t ret; + + ret = nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool, + nxt_h1p_fields, nxt_nitems(nxt_h1p_fields)); + + if (nxt_fast_path(ret == NXT_OK)) { + ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash, + rt->mem_pool, nxt_h1p_peer_fields, + nxt_nitems(nxt_h1p_peer_fields)); + } + + return ret; } @@ -196,7 +252,7 @@ static const nxt_conn_state_t nxt_http_idle_state static ssize_t -nxt_http_idle_io_read_handler(nxt_conn_t *c) +nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c) { size_t size; ssize_t n; @@ -216,7 +272,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c) size = joint->socket_conf->header_buffer_size; - b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); if (nxt_slow_path(b == NULL)) { c->socket.error = NXT_ENOMEM; return NXT_ERROR; @@ -234,7 +290,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c) } else { c->read = NULL; - nxt_mp_free(c->mem_pool, b); + nxt_event_engine_buf_mem_free(task->thread->engine, b); } return n; @@ -248,12 +304,14 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data) nxt_buf_t *b; nxt_conn_t *c; nxt_tls_conf_t *tls; + nxt_event_engine_t *engine; nxt_socket_conf_joint_t *joint; c = obj; nxt_debug(task, "h1p conn https test"); + engine = task->thread->engine; b = c->read; p = b->mem.pos; @@ -262,7 +320,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data) if (p[0] != 0x16) { b->mem.free = b->mem.pos; - nxt_conn_read(task->thread->engine, c); + nxt_conn_read(engine, c); return; } @@ -292,7 +350,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data) #endif c->read = NULL; - nxt_mp_free(c->mem_pool, b); + nxt_event_engine_buf_mem_free(engine, b); joint = c->listen->socket.data; @@ -301,7 +359,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data) * Listening socket had been closed while * connection was in keep-alive state. */ - nxt_h1p_shutdown(task, c); + nxt_h1p_closing(task, c); return; } @@ -330,7 +388,7 @@ static const nxt_conn_state_t nxt_h1p_idle_state static ssize_t -nxt_h1p_idle_io_read_handler(nxt_conn_t *c) +nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c) { size_t size; ssize_t n; @@ -353,7 +411,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c) if (b == NULL) { size = joint->socket_conf->header_buffer_size; - b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); if (nxt_slow_path(b == NULL)) { c->socket.error = NXT_ENOMEM; return NXT_ERROR; @@ -367,7 +425,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c) } else { c->read = NULL; - nxt_mp_free(c->mem_pool, b); + nxt_event_engine_buf_mem_free(task->thread->engine, b); } return n; @@ -386,7 +444,7 @@ nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data) h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t)); if (nxt_slow_path(h1p == NULL)) { - nxt_h1p_shutdown(task, c); + nxt_h1p_closing(task, c); return; } @@ -424,6 +482,12 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data) r->tls = c->u.tls; #endif + r->task = c->task; + task = &r->task; + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool); if (nxt_fast_path(ret == NXT_OK)) { @@ -444,7 +508,7 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data) nxt_mp_release(r->mem_pool); } - nxt_h1p_shutdown(task, c); + nxt_h1p_closing(task, c); } @@ -599,13 +663,15 @@ nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p, } if (nxt_slow_path(h1p->websocket_key == NULL)) { - nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket key"); + nxt_log(task, NXT_LOG_INFO, + "h1p upgrade: bad or absent websocket key"); return NXT_HTTP_BAD_REQUEST; } if (nxt_slow_path(h1p->websocket_version_ok == 0)) { - nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket version"); + nxt_log(task, NXT_LOG_INFO, + "h1p upgrade: bad or absent websocket version"); return NXT_HTTP_UPGRADE_REQUIRED; } @@ -655,16 +721,16 @@ nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c, static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) { - nxt_http_request_t *r; - static const u_char *upgrade = (const u_char *) "upgrade"; + nxt_http_request_t *r; r = ctx; + field->hopbyhop = 1; if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) { r->proto.h1->keepalive = 0; } else if (field->value_length == 7 - && nxt_memcasecmp(field->value, upgrade, 7) == 0) + && nxt_memcasecmp(field->value, "upgrade", 7) == 0) { r->proto.h1->connection_upgrade = 1; } @@ -676,13 +742,12 @@ nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data) { - nxt_http_request_t *r; - static const u_char *websocket = (const u_char *) "websocket"; + nxt_http_request_t *r; r = ctx; if (field->value_length == 9 - && nxt_memcasecmp(field->value, websocket, 9) == 0) + && nxt_memcasecmp(field->value, "websocket", 9) == 0) { r->proto.h1->upgrade_websocket = 1; } @@ -730,6 +795,8 @@ nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data) nxt_http_request_t *r; r = ctx; + field->skip = 1; + field->hopbyhop = 1; if (field->value_length == 7 && nxt_memcmp(field->value, "chunked", 7) == 0) @@ -995,7 +1062,7 @@ static const nxt_str_t nxt_http_server_error[] = { static void nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler) + nxt_work_handler_t body_handler, void *data) { u_char *p; size_t size; @@ -1172,7 +1239,7 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, * in engine->write_work_queue. */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, - body_handler, task, r, NULL); + body_handler, task, r, data); } else { header->next = nxt_http_buf_last(r); @@ -1211,6 +1278,7 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p) while (b != NULL) { next = b->next; + b->next = NULL; nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); @@ -1226,7 +1294,8 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p) size = nxt_buf_mem_used_size(&in->mem); if (size == 0) { - nxt_mp_free(c->mem_pool, in); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + in->completion_handler, task, in, in->parent); c->read = NULL; } @@ -1480,11 +1549,16 @@ nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto, nxt_debug(task, "h1p request close"); h1p = proto.h1; + h1p->keepalive &= !h1p->request->inconsistent; h1p->request = NULL; nxt_router_conf_release(task, joint); c = h1p->conn; + task = &c->task; + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; if (h1p->keepalive) { nxt_h1p_keepalive(task, h1p, c); @@ -1770,14 +1844,31 @@ nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c) } } else { - nxt_h1p_shutdown_(task, c); + nxt_h1p_closing(task, c); } } static void -nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c) +nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data) { + nxt_timer_t *timer; + nxt_h1p_websocket_timer_t *ws_timer; + + nxt_debug(task, "h1p conn ws shutdown"); + + timer = obj; + ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); + + nxt_h1p_closing(task, ws_timer->h1p->conn); +} + + +static void +nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c) +{ + nxt_debug(task, "h1p closing"); + c->socket.data = NULL; #if (NXT_TLS) @@ -1809,21 +1900,6 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state static void -nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *timer; - nxt_h1p_websocket_timer_t *ws_timer; - - nxt_debug(task, "h1p conn ws shutdown"); - - timer = obj; - ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); - - nxt_h1p_shutdown_(task, ws_timer->h1p->conn); -} - - -static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; @@ -1868,3 +1944,673 @@ nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data) nxt_router_listen_event_release(&engine->task, lev, NULL); } + + +static void +nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_mp_t *mp; + nxt_int_t ret; + nxt_conn_t *c, *client; + nxt_h1proto_t *h1p; + nxt_fd_event_t *socket; + nxt_work_queue_t *wq; + nxt_http_request_t *r; + + nxt_debug(task, "h1p peer connect"); + + peer->status = NXT_HTTP_UNSET; + r = peer->request; + + mp = nxt_mp_create(1024, 128, 256, 32); + + if (nxt_slow_path(mp == NULL)) { + goto fail; + } + + h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t)); + if (nxt_slow_path(h1p == NULL)) { + goto fail; + } + + ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + c = nxt_conn_create(mp, task); + if (nxt_slow_path(c == NULL)) { + goto fail; + } + + c->mem_pool = mp; + h1p->conn = c; + + peer->proto.h1 = h1p; + h1p->request = r; + + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + c->socket.data = peer; + c->remote = peer->sockaddr; + + c->socket.write_ready = 1; + c->write_state = &nxt_h1p_peer_connect_state; + + /* + * TODO: queues should be implemented via client proto interface. + */ + client = r->proto.h1->conn; + + socket = &client->socket; + wq = socket->read_work_queue; + c->read_work_queue = wq; + c->socket.read_work_queue = wq; + c->read_timer.work_queue = wq; + + wq = socket->write_work_queue; + c->write_work_queue = wq; + c->socket.write_work_queue = wq; + c->write_timer.work_queue = wq; + /* TODO END */ + + nxt_conn_connect(task->thread->engine, c); + + return; + +fail: + + peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR; + + r->state->error_handler(task, r, peer); +} + + +static const nxt_conn_state_t nxt_h1p_peer_connect_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_connected, + .close_handler = nxt_h1p_peer_refused, + .error_handler = nxt_h1p_peer_error, + + .timer_handler = nxt_h1p_peer_send_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout), +}; + + +static void +nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer connected"); + + r = peer->request; + r->state->ready_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer refused"); + + //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE; + peer->status = NXT_HTTP_BAD_GATEWAY; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer) +{ + u_char *p; + size_t size; + nxt_buf_t *header, *body; + nxt_conn_t *c; + nxt_http_field_t *field; + nxt_http_request_t *r; + + nxt_debug(task, "h1p peer header send"); + + r = peer->request; + + size = r->method->length + sizeof(" ") + r->target.length + + sizeof(" HTTP/1.0\r\n") + + sizeof("\r\n"); + + nxt_list_each(field, r->fields) { + + if (!field->hopbyhop) { + size += field->name_length + field->value_length; + size += nxt_length(": \r\n"); + } + + } nxt_list_loop; + + header = nxt_http_buf_mem(task, r, size); + if (nxt_slow_path(header == NULL)) { + r->state->error_handler(task, r, peer); + return; + } + + p = header->mem.free; + + p = nxt_cpymem(p, r->method->start, r->method->length); + *p++ = ' '; + p = nxt_cpymem(p, r->target.start, r->target.length); + p = nxt_cpymem(p, " HTTP/1.0\r\n", 11); + + nxt_list_each(field, r->fields) { + + if (!field->hopbyhop) { + p = nxt_cpymem(p, field->name, field->name_length); + *p++ = ':'; *p++ = ' '; + p = nxt_cpymem(p, field->value, field->value_length); + *p++ = '\r'; *p++ = '\n'; + } + + } nxt_list_loop; + + *p++ = '\r'; *p++ = '\n'; + header->mem.free = p; + size = p - header->mem.pos; + + c = peer->proto.h1->conn; + c->write = header; + c->write_state = &nxt_h1p_peer_header_send_state; + + if (r->body != NULL) { + body = nxt_buf_mem_alloc(r->mem_pool, 0, 0); + if (nxt_slow_path(body == NULL)) { + r->state->error_handler(task, r, peer); + return; + } + + header->next = body; + + body->mem = r->body->mem; + size += nxt_buf_mem_used_size(&body->mem); + +// nxt_mp_retain(r->mem_pool); + } + + if (size > 16384) { + /* Use proxy_send_timeout instead of proxy_timeout. */ + c->write_state = &nxt_h1p_peer_header_body_send_state; + } + + nxt_conn_write(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_peer_header_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_sent, + .error_handler = nxt_h1p_peer_error, + + .timer_handler = nxt_h1p_peer_send_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout), +}; + + +static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_sent, + .error_handler = nxt_h1p_peer_error, + + .timer_handler = nxt_h1p_peer_send_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + + c = obj; + peer = data; + + nxt_debug(task, "h1p peer header sent"); + + engine = task->thread->engine; + + c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write); + + if (c->write == NULL) { + r = peer->request; + r->state->ready_handler(task, r, peer); + return; + } + + nxt_conn_write(engine, c); +} + + +static void +nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p peer header read"); + + c = peer->proto.h1->conn; + + if (c->write_timer.enabled) { + c->read_state = &nxt_h1p_peer_header_read_state; + + } else { + c->read_state = &nxt_h1p_peer_header_read_timer_state; + } + + nxt_conn_read(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_peer_header_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_read_done, + .close_handler = nxt_h1p_peer_closed, + .error_handler = nxt_h1p_peer_error, + + .io_read_handler = nxt_h1p_peer_io_read_handler, +}; + + +static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_header_read_done, + .close_handler = nxt_h1p_peer_closed, + .error_handler = nxt_h1p_peer_error, + + .io_read_handler = nxt_h1p_peer_io_read_handler, + + .timer_handler = nxt_h1p_peer_read_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout), +}; + + +static ssize_t +nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c) +{ + size_t size; + ssize_t n; + nxt_buf_t *b; + nxt_http_peer_t *peer; + nxt_socket_conf_t *skcf; + nxt_http_request_t *r; + + peer = c->socket.data; + r = peer->request; + b = c->read; + + if (b == NULL) { + skcf = r->conf->socket_conf; + + size = (peer->header_received) ? skcf->proxy_buffer_size + : skcf->proxy_header_buffer_size; + + nxt_debug(task, "h1p peer io read: %z", size); + + b = nxt_http_proxy_buf_mem_alloc(task, r, size); + if (nxt_slow_path(b == NULL)) { + c->socket.error = NXT_ENOMEM; + return NXT_ERROR; + } + } + + n = c->io->recvbuf(c, b); + + if (n > 0) { + c->read = b; + + } else { + c->read = NULL; + nxt_http_proxy_buf_mem_free(task, r, b); + } + + return n; +} + + +static void +nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_int_t ret; + nxt_buf_t *b; + nxt_conn_t *c; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + + c = obj; + peer = data; + + nxt_debug(task, "h1p peer header read done"); + + b = c->read; + + ret = nxt_h1p_peer_header_parse(peer, &b->mem); + + r = peer->request; + + ret = nxt_expect(NXT_DONE, ret); + + if (ret != NXT_AGAIN) { + engine = task->thread->engine; + nxt_timer_disable(engine, &c->write_timer); + nxt_timer_disable(engine, &c->read_timer); + } + + switch (ret) { + + case NXT_DONE: + peer->fields = peer->proto.h1->parser.fields; + + ret = nxt_http_fields_process(peer->fields, + &nxt_h1p_peer_fields_hash, r); + if (nxt_slow_path(ret != NXT_OK)) { + peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR; + break; + } + + c->read = NULL; + + if (nxt_buf_mem_used_size(&b->mem) != 0) { + peer->body = b; + } + + peer->header_received = 1; + + r->state->ready_handler(task, r, peer); + return; + + case NXT_AGAIN: + if (nxt_buf_mem_free_size(&b->mem) != 0) { + nxt_conn_read(task->thread->engine, c); + return; + } + + /* Fall through. */ + + default: + case NXT_ERROR: + case NXT_HTTP_PARSE_INVALID: + case NXT_HTTP_PARSE_UNSUPPORTED_VERSION: + case NXT_HTTP_PARSE_TOO_LARGE_FIELD: + peer->status = NXT_HTTP_BAD_GATEWAY; + break; + } + + nxt_http_proxy_buf_mem_free(task, r, b); + + r->state->error_handler(task, r, peer); +} + + +static nxt_int_t +nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm) +{ + u_char *p; + size_t length; + nxt_int_t status; + + if (peer->status < 0) { + length = nxt_buf_mem_used_size(bm); + + if (nxt_slow_path(length < 12)) { + return NXT_AGAIN; + } + + p = bm->pos; + + if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0 + || (p[7] != '0' && p[7] != '1'))) + { + return NXT_ERROR; + } + + status = nxt_int_parse(&p[9], 3); + + if (nxt_slow_path(status < 0)) { + return NXT_ERROR; + } + + p += 12; + length -= 12; + + p = nxt_memchr(p, '\n', length); + + if (nxt_slow_path(p == NULL)) { + return NXT_AGAIN; + } + + bm->pos = p + 1; + peer->status = status; + } + + return nxt_http_parse_fields(&peer->proto.h1->parser, bm); +} + + +static void +nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p peer read"); + + c = peer->proto.h1->conn; + c->read_state = &nxt_h1p_peer_read_state; + + nxt_conn_read(task->thread->engine, c); +} + + +static const nxt_conn_state_t nxt_h1p_peer_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_read_done, + .close_handler = nxt_h1p_peer_closed, + .error_handler = nxt_h1p_peer_error, + + .io_read_handler = nxt_h1p_peer_io_read_handler, + + .timer_handler = nxt_h1p_peer_read_timeout, + .timer_value = nxt_h1p_peer_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + c = obj; + peer = data; + + nxt_debug(task, "h1p peer read done"); + + peer->body = c->read; + c->read = NULL; + + r = peer->request; + r->state->ready_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer closed"); + + r = peer->request; + + if (peer->header_received) { + peer->body = nxt_http_buf_last(r); + + peer->closed = 1; + + r->state->ready_handler(task, r, peer); + + } else { + peer->status = NXT_HTTP_BAD_GATEWAY; + + r->state->error_handler(task, r, peer); + } +} + + +static void +nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + peer = data; + + nxt_debug(task, "h1p peer error"); + + peer->status = NXT_HTTP_BAD_GATEWAY; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + timer = obj; + + nxt_debug(task, "h1p peer send timeout"); + + c = nxt_write_timer_conn(timer); + c->block_write = 1; + c->block_read = 1; + + peer = c->socket.data; + peer->status = NXT_HTTP_GATEWAY_TIMEOUT; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static void +nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + timer = obj; + + nxt_debug(task, "h1p peer read timeout"); + + c = nxt_read_timer_conn(timer); + c->block_write = 1; + c->block_read = 1; + + peer = c->socket.data; + peer->status = NXT_HTTP_GATEWAY_TIMEOUT; + + r = peer->request; + r->state->error_handler(task, r, peer); +} + + +static nxt_msec_t +nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data) +{ + nxt_http_peer_t *peer; + + peer = c->socket.data; + + return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data); +} + + +static void +nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer) +{ + nxt_conn_t *c; + + nxt_debug(task, "h1p peer close"); + + peer->closed = 1; + + c = peer->proto.h1->conn; + task = &c->task; + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + + if (c->socket.fd != -1) { + c->write_state = &nxt_h1p_peer_close_state; + + nxt_conn_close(task->thread->engine, c); + + } else { + nxt_h1p_peer_free(task, c, NULL); + } +} + + +static const nxt_conn_state_t nxt_h1p_peer_close_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_peer_free, +}; + + +static void +nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + + c = obj; + + nxt_debug(task, "h1p peer free"); + + nxt_conn_free(task, c); +} diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h index c6d3bd53..61da6770 100644 --- a/src/nxt_h1proto.h +++ b/src/nxt_h1proto.h @@ -20,6 +20,8 @@ struct nxt_h1proto_s { nxt_http_request_parse_t parser; uint8_t nbuffers; + uint8_t header_buffer_slot; + uint8_t large_buffer_slot; uint8_t keepalive; /* 1 bit */ uint8_t chunked; /* 1 bit */ uint8_t websocket; /* 1 bit */ diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c index 13754be0..c9ff899c 100644 --- a/src/nxt_h1proto_websocket.c +++ b/src/nxt_h1proto_websocket.c @@ -26,7 +26,7 @@ static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh); static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data); -static ssize_t nxt_h1p_ws_io_read_handler(nxt_conn_t *c); +static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c); static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data); @@ -473,7 +473,7 @@ nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data) static ssize_t -nxt_h1p_ws_io_read_handler(nxt_conn_t *c) +nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c) { size_t size; ssize_t n; @@ -697,6 +697,7 @@ nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data) for (i = 0; i < payload_len; i++) { while (nxt_buf_mem_used_size(&b->mem) == 0) { next = b->next; + b->next = NULL; nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); diff --git a/src/nxt_http.h b/src/nxt_http.h index 560b7310..030d77a7 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -9,6 +9,7 @@ typedef enum { + NXT_HTTP_UNSET = -1, NXT_HTTP_INVALID = 0, NXT_HTTP_CONTINUE = 100, @@ -105,6 +106,21 @@ typedef struct { } nxt_http_response_t; +typedef struct { + nxt_http_proto_t proto; + nxt_http_request_t *request; + nxt_sockaddr_t *sockaddr; + nxt_list_t *fields; + nxt_buf_t *body; + nxt_off_t remainder; + + nxt_http_status_t status:16; + nxt_http_protocol_t protocol:8; /* 2 bits */ + uint8_t header_received; /* 1 bit */ + uint8_t closed; /* 1 bit */ +} nxt_http_peer_t; + + struct nxt_http_request_s { nxt_http_proto_t proto; nxt_socket_conf_joint_t *conf; @@ -137,12 +153,14 @@ struct nxt_http_request_s { nxt_sockaddr_t *remote; nxt_sockaddr_t *local; void *tls; + nxt_task_t task; nxt_timer_t timer; void *timer_data; void *req_rpc_data; + nxt_http_peer_t *peer; nxt_buf_t *last; nxt_http_response_t resp; @@ -153,20 +171,23 @@ struct nxt_http_request_s { nxt_http_protocol_t protocol:8; /* 2 bits */ uint8_t logged; /* 1 bit */ uint8_t header_sent; /* 1 bit */ + uint8_t inconsistent; /* 1 bit */ uint8_t error; /* 1 bit */ uint8_t websocket_handshake; /* 1 bit */ }; typedef struct nxt_http_route_s nxt_http_route_t; +typedef struct nxt_http_upstream_s nxt_http_upstream_t; -struct nxt_http_pass_s { - nxt_http_pass_t *(*handler)(nxt_task_t *task, +struct nxt_http_action_s { + nxt_http_action_t *(*handler)(nxt_task_t *task, nxt_http_request_t *r, - nxt_http_pass_t *pass); + nxt_http_action_t *action); union { nxt_http_route_t *route; + nxt_http_upstream_t *upstream; nxt_app_t *application; } u; @@ -178,12 +199,19 @@ typedef struct { void (*body_read)(nxt_task_t *task, nxt_http_request_t *r); void (*local_addr)(nxt_task_t *task, nxt_http_request_t *r); void (*header_send)(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler); + nxt_work_handler_t body_handler, void *data); void (*send)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out); nxt_off_t (*body_bytes_sent)(nxt_task_t *task, nxt_http_proto_t proto); void (*discard)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *last); void (*close)(nxt_task_t *task, nxt_http_proto_t proto, nxt_socket_conf_joint_t *joint); + + void (*peer_connect)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_header_send)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_header_read)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_read)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*peer_close)(nxt_task_t *task, nxt_http_peer_t *peer); + void (*ws_frame_start)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *ws_frame); } nxt_http_proto_table_t; @@ -218,7 +246,7 @@ void nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, nxt_http_status_t status); void nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r); void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler); + nxt_work_handler_t body_handler, void *data); void nxt_http_request_ws_frame_start(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *ws_frame); void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, @@ -238,24 +266,36 @@ nxt_int_t nxt_http_request_content_length(void *ctx, nxt_http_field_t *field, nxt_http_routes_t *nxt_http_routes_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *routes_conf); -nxt_http_pass_t *nxt_http_pass_create(nxt_task_t *task, +nxt_http_action_t *nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name); void nxt_http_routes_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf); -nxt_http_pass_t *nxt_http_pass_application(nxt_task_t *task, +nxt_http_action_t *nxt_http_pass_application(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name); void nxt_http_routes_cleanup(nxt_task_t *task, nxt_http_routes_t *routes); -void nxt_http_pass_cleanup(nxt_task_t *task, nxt_http_pass_t *pass); +void nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action); -nxt_http_pass_t *nxt_http_static_handler(nxt_task_t *task, - nxt_http_request_t *r, nxt_http_pass_t *pass); +nxt_http_action_t *nxt_http_static_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *action); nxt_int_t nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash); nxt_int_t nxt_http_static_mtypes_hash_add(nxt_mp_t *mp, nxt_lvlhsh_t *hash, nxt_str_t *extension, nxt_str_t *type); nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash, nxt_str_t *extension); -nxt_http_pass_t *nxt_http_request_application(nxt_task_t *task, - nxt_http_request_t *r, nxt_http_pass_t *pass); +nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *action); + +nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action); +nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_int_t nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_int_t nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, + uintptr_t data); +nxt_buf_t *nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, + size_t size); +void nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *b); extern nxt_time_string_t nxt_http_date_cache; diff --git a/src/nxt_http_error.c b/src/nxt_http_error.c index 8e8b80f1..370b12db 100644 --- a/src/nxt_http_error.c +++ b/src/nxt_http_error.c @@ -57,8 +57,8 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r, r->state = &nxt_http_request_send_error_body_state; - nxt_http_request_header_send(task, r, nxt_http_request_send_error_body); - + nxt_http_request_header_send(task, r, + nxt_http_request_send_error_body, NULL); return; fail: diff --git a/src/nxt_http_parse.c b/src/nxt_http_parse.c index e6e91454..4c5d4936 100644 --- a/src/nxt_http_parse.c +++ b/src/nxt_http_parse.c @@ -787,6 +787,7 @@ nxt_http_parse_field_end(nxt_http_request_parse_t *rp, u_char **pos, field->hash = nxt_http_field_hash_end(rp->field_hash); field->skip = 0; + field->hopbyhop = 0; field->name_length = rp->field_name.length; field->value_length = rp->field_value.length; diff --git a/src/nxt_http_parse.h b/src/nxt_http_parse.h index d7ce5e4f..d319c71d 100644 --- a/src/nxt_http_parse.h +++ b/src/nxt_http_parse.h @@ -81,7 +81,8 @@ typedef struct { struct nxt_http_field_s { uint16_t hash; - uint8_t skip; /* 1 bit */ + uint8_t skip:1; + uint8_t hopbyhop:1; uint8_t name_length; uint32_t value_length; u_char *name; diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c new file mode 100644 index 00000000..7f4eeff2 --- /dev/null +++ b/src/nxt_http_proxy.c @@ -0,0 +1,403 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_router.h> +#include <nxt_http.h> + + +typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task, + nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); + + +struct nxt_http_upstream_s { + uint32_t current; + uint32_t n; + uint8_t protocol; + nxt_http_upstream_connect_t connect; + nxt_sockaddr_t *sockaddr[1]; +}; + + +static void nxt_http_upstream_connect(nxt_task_t *task, + nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); +static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *action); +static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_request_send(nxt_task_t *task, + nxt_http_request_t *r, nxt_buf_t *out); +static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data); +static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, + void *data); +static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data); + + +static const nxt_http_request_state_t nxt_http_proxy_header_send_state; +static const nxt_http_request_state_t nxt_http_proxy_header_sent_state; +static const nxt_http_request_state_t nxt_http_proxy_header_read_state; +static const nxt_http_request_state_t nxt_http_proxy_read_state; + + +nxt_int_t +nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) +{ + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_http_upstream_t *upstream; + + sa = NULL; + name = action->name; + + if (nxt_str_start(&name, "http://", 7)) { + name.length -= 7; + name.start += 7; + + sa = nxt_sockaddr_parse(mp, &name); + if (nxt_slow_path(sa == NULL)) { + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + } + + if (sa != NULL) { + upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t)); + if (nxt_slow_path(upstream == NULL)) { + return NXT_ERROR; + } + + upstream->current = 0; + upstream->n = 1; + upstream->protocol = NXT_HTTP_PROTO_H1; + upstream->connect = nxt_http_upstream_connect; + upstream->sockaddr[0] = sa; + + action->u.upstream = upstream; + action->handler = nxt_http_proxy_handler; + } + + return NXT_OK; +} + + +static nxt_http_action_t * +nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_action_t *action) +{ + nxt_http_peer_t *peer; + + peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); + if (nxt_slow_path(peer == NULL)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return NULL; + } + + peer->request = r; + r->peer = peer; + + nxt_mp_retain(r->mem_pool); + + action->u.upstream->connect(task, action->u.upstream, peer); + + return NULL; +} + + +static void +nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, + nxt_http_peer_t *peer) +{ + peer->protocol = upstream->protocol; + peer->sockaddr = upstream->sockaddr[0]; + + peer->request->state = &nxt_http_proxy_header_send_state; + + nxt_http_proto[peer->protocol].peer_connect(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_header_send_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_header_send, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + r->state = &nxt_http_proxy_header_sent_state; + + nxt_http_proto[peer->protocol].peer_header_send(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_header_sent_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_header_sent, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + r->state = &nxt_http_proxy_header_read_state; + + nxt_http_proto[peer->protocol].peer_header_read(task, peer); +} + + +static const nxt_http_request_state_t nxt_http_proxy_header_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_header_read, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_field_t *f, *field; + nxt_http_request_t *r; + + r = obj; + peer = data; + + r->status = peer->status; + + nxt_debug(task, "http proxy status: %d", peer->status); + + if (r->resp.content_length_n > 0) { + peer->remainder = r->resp.content_length_n; + } + + nxt_list_each(field, peer->fields) { + + nxt_debug(task, "http proxy header: \"%*s: %*s\"", + (size_t) field->name_length, field->name, + (size_t) field->value_length, field->value); + + if (!field->skip) { + f = nxt_list_add(r->resp.fields); + if (nxt_slow_path(f == NULL)) { + nxt_http_proxy_error(task, r, peer); + return; + } + + *f = *field; + } + + } nxt_list_loop; + + nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer); +} + + +static void +nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + out = peer->body; + + if (out != NULL) { + peer->body = NULL; + nxt_http_proxy_request_send(task, r, out); + } + + r->state = &nxt_http_proxy_read_state; + + nxt_http_proto[peer->protocol].peer_read(task, peer); +} + + +static void +nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *out) +{ + size_t length; + + if (r->peer->remainder > 0) { + length = nxt_buf_chain_length(out); + r->peer->remainder -= length; + } + + nxt_http_request_send(task, r, out); +} + + +static const nxt_http_request_state_t nxt_http_proxy_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_http_proxy_read, + .error_handler = nxt_http_proxy_error, +}; + + +static void +nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_bool_t last; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = data; + out = peer->body; + peer->body = NULL; + last = nxt_buf_is_last(out); + + nxt_http_proxy_request_send(task, r, out); + + if (!last) { + nxt_http_proto[peer->protocol].peer_read(task, peer); + + } else { + r->inconsistent = (peer->remainder != 0); + + nxt_http_proto[peer->protocol].peer_close(task, peer); + + nxt_mp_release(r->mem_pool); + } +} + + +nxt_buf_t * +nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r, + size_t size) +{ + nxt_buf_t *b; + + b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size); + if (nxt_fast_path(b != NULL)) { + b->completion_handler = nxt_http_proxy_buf_mem_completion; + b->parent = r; + nxt_mp_retain(r->mem_pool); + + } else { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + } + + return b; +} + + +static void +nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b, *next; + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + b = obj; + r = data; + + peer = r->peer; + + do { + next = b->next; + + nxt_http_proxy_buf_mem_free(task, r, b); + + b = next; + } while (b != NULL); + + if (!peer->closed) { + nxt_http_proto[peer->protocol].peer_read(task, peer); + } +} + + +void +nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *b) +{ + nxt_event_engine_buf_mem_free(task->thread->engine, b); + + nxt_mp_release(r->mem_pool); +} + + +static void +nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_peer_t *peer; + nxt_http_request_t *r; + + r = obj; + peer = r->peer; + + nxt_http_proto[peer->protocol].peer_close(task, peer); + + nxt_mp_release(r->mem_pool); + + nxt_http_request_error(task, r, peer->status); +} + + +nxt_int_t +nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + + r->resp.date = field; + + return NXT_OK; +} + + +nxt_int_t +nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field, + uintptr_t data) +{ + nxt_off_t n; + nxt_http_request_t *r; + + r = ctx; + + r->resp.content_length = field; + + n = nxt_off_t_parse(field->value, field->value_length); + + if (nxt_fast_path(n >= 0)) { + r->resp.content_length_n = n; + } + + return NXT_OK; +} + + +nxt_int_t +nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data) +{ + field->skip = 1; + + return NXT_OK; +} diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index a18a02e7..14c75dab 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -10,7 +10,7 @@ static nxt_int_t nxt_http_validate_host(nxt_str_t *host, nxt_mp_t *mp); static void nxt_http_request_start(nxt_task_t *task, void *obj, void *data); -static void nxt_http_request_pass(nxt_task_t *task, void *obj, void *data); +static void nxt_http_request_action(nxt_task_t *task, void *obj, void *data); static void nxt_http_request_proto_info(nxt_task_t *task, nxt_http_request_t *r); static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, @@ -278,33 +278,33 @@ nxt_http_request_start(nxt_task_t *task, void *obj, void *data) static const nxt_http_request_state_t nxt_http_request_body_state nxt_aligned(64) = { - .ready_handler = nxt_http_request_pass, + .ready_handler = nxt_http_request_action, .error_handler = nxt_http_request_close_handler, }; static void -nxt_http_request_pass(nxt_task_t *task, void *obj, void *data) +nxt_http_request_action(nxt_task_t *task, void *obj, void *data) { - nxt_http_pass_t *pass; + nxt_http_action_t *action; nxt_http_request_t *r; r = obj; - pass = r->conf->socket_conf->pass; + action = r->conf->socket_conf->action; - if (nxt_fast_path(pass != NULL)) { + if (nxt_fast_path(action != NULL)) { do { - nxt_debug(task, "http request route: %V", &pass->name); + nxt_debug(task, "http request route: %V", &action->name); - pass = pass->handler(task, r, pass); + action = action->handler(task, r, action); - if (pass == NULL) { + if (action == NULL) { return; } - if (pass == NXT_HTTP_PASS_ERROR) { + if (action == NXT_HTTP_ACTION_ERROR) { break; } @@ -315,13 +315,13 @@ nxt_http_request_pass(nxt_task_t *task, void *obj, void *data) } -nxt_http_pass_t * -nxt_http_request_application(nxt_task_t *task, nxt_http_request_t *r, - nxt_http_pass_t *pass) +nxt_http_action_t * +nxt_http_application_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_action_t *action) { nxt_event_engine_t *engine; - nxt_debug(task, "http request application"); + nxt_debug(task, "http application handler"); nxt_mp_retain(r->mem_pool); @@ -344,7 +344,7 @@ nxt_http_request_application(nxt_task_t *task, nxt_http_request_t *r, nxt_str_set(&r->server_name, "localhost"); } - nxt_router_process_http_request(task, r, pass->u.application); + nxt_router_process_http_request(task, r, action->u.application); return NULL; } @@ -370,7 +370,7 @@ nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r) void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_work_handler_t body_handler) + nxt_work_handler_t body_handler, void *data) { u_char *p, *end; nxt_http_field_t *server, *date, *content_length; @@ -431,7 +431,7 @@ nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r, } if (nxt_fast_path(r->proto.any != NULL)) { - nxt_http_proto[r->protocol].header_send(task, r, body_handler); + nxt_http_proto[r->protocol].header_send(task, r, body_handler, data); } return; @@ -483,15 +483,20 @@ nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size) static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b; + nxt_buf_t *b, *next; nxt_http_request_t *r; b = obj; r = data; - nxt_mp_free(r->mem_pool, b); + do { + next = b->next; - nxt_mp_release(r->mem_pool); + nxt_mp_free(r->mem_pool, b); + nxt_mp_release(r->mem_pool); + + b = next; + } while (b != NULL); } @@ -570,9 +575,9 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data) if (nxt_fast_path(proto.any != NULL)) { protocol = r->protocol; - nxt_mp_release(r->mem_pool); - nxt_http_proto[protocol].close(task, proto, conf); + + nxt_mp_release(r->mem_pool); } } diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index c3c11faa..18b352ea 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -36,6 +36,13 @@ typedef enum { typedef struct { + nxt_conf_value_t *pass; + nxt_conf_value_t *share; + nxt_conf_value_t *proxy; +} nxt_http_route_action_conf_t; + + +typedef struct { nxt_conf_value_t *host; nxt_conf_value_t *uri; nxt_conf_value_t *method; @@ -119,7 +126,7 @@ typedef union { typedef struct { uint32_t items; - nxt_http_pass_t pass; + nxt_http_action_t action; nxt_http_route_test_t test[0]; } nxt_http_route_match_t; @@ -152,6 +159,8 @@ static nxt_http_route_t *nxt_http_route_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv); static nxt_http_route_match_t *nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv); +static nxt_int_t nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, + nxt_conf_value_t *cv, nxt_http_route_match_t *match); static nxt_http_route_table_t *nxt_http_route_table_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *table_cv, nxt_http_route_object_t object, nxt_bool_t case_sensitive); @@ -173,15 +182,15 @@ static u_char *nxt_http_route_pattern_copy(nxt_mp_t *mp, nxt_str_t *test, static void nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route); -static void nxt_http_pass_resolve(nxt_task_t *task, - nxt_router_temp_conf_t *tmcf, nxt_http_pass_t *pass); +static void nxt_http_action_resolve(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, nxt_http_action_t *action); static nxt_http_route_t *nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name); static void nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *routes); -static nxt_http_pass_t *nxt_http_route_pass(nxt_task_t *task, - nxt_http_request_t *r, nxt_http_pass_t *start); -static nxt_http_pass_t *nxt_http_route_match(nxt_http_request_t *r, +static nxt_http_action_t *nxt_http_route_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *start); +static nxt_http_action_t *nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match); static nxt_int_t nxt_http_route_table(nxt_http_request_t *r, nxt_http_route_table_t *table); @@ -367,16 +376,13 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, uint32_t n; nxt_mp_t *mp; nxt_int_t ret; - nxt_str_t pass, *string; - nxt_conf_value_t *match_conf, *pass_conf; + nxt_conf_value_t *match_conf; nxt_http_route_test_t *test; nxt_http_route_rule_t *rule; nxt_http_route_table_t *table; nxt_http_route_match_t *match; nxt_http_route_match_conf_t mtcf; - static nxt_str_t pass_path = nxt_string("/action/pass"); - static nxt_str_t share_path = nxt_string("/action/share"); static nxt_str_t match_path = nxt_string("/match"); match_conf = nxt_conf_get_path(cv, &match_path); @@ -391,25 +397,12 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, return NULL; } - match->pass.u.route = NULL; - match->pass.handler = NULL; + match->action.u.route = NULL; + match->action.handler = NULL; match->items = n; - pass_conf = nxt_conf_get_path(cv, &pass_path); - - if (pass_conf == NULL) { - pass_conf = nxt_conf_get_path(cv, &share_path); - if (nxt_slow_path(pass_conf == NULL)) { - return NULL; - } - - match->pass.handler = nxt_http_static_handler; - } - - nxt_conf_get_string(pass_conf, &pass); - - string = nxt_str_dup(mp, &match->pass.name, &pass); - if (nxt_slow_path(string == NULL)) { + ret = nxt_http_route_action_create(tmcf, cv, match); + if (nxt_slow_path(ret != NXT_OK)) { return NULL; } @@ -516,6 +509,78 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } +static nxt_conf_map_t nxt_http_route_action_conf[] = { + { + nxt_string("pass"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_action_conf_t, pass) + }, + { + nxt_string("share"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_action_conf_t, share) + }, + { + nxt_string("proxy"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_action_conf_t, proxy) + }, +}; + + +static nxt_int_t +nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv, + nxt_http_route_match_t *match) +{ + nxt_mp_t *mp; + nxt_int_t ret; + nxt_str_t name, *string; + nxt_conf_value_t *conf, *action_conf; + nxt_http_route_action_conf_t accf; + + static nxt_str_t action_path = nxt_string("/action"); + + action_conf = nxt_conf_get_path(cv, &action_path); + if (action_conf == NULL) { + return NXT_ERROR; + } + + nxt_memzero(&accf, sizeof(accf)); + + ret = nxt_conf_map_object(tmcf->mem_pool, + action_conf, nxt_http_route_action_conf, + nxt_nitems(nxt_http_route_action_conf), &accf); + if (ret != NXT_OK) { + return ret; + } + + conf = accf.pass; + + if (accf.share != NULL) { + conf = accf.share; + match->action.handler = nxt_http_static_handler; + + } else if (accf.proxy != NULL) { + conf = accf.proxy; + } + + nxt_conf_get_string(conf, &name); + + mp = tmcf->router_conf->mem_pool; + + string = nxt_str_dup(mp, &match->action.name, &name); + if (nxt_slow_path(string == NULL)) { + return NXT_ERROR; + } + + if (accf.proxy != NULL) { + return nxt_http_proxy_create(mp, &match->action); + } + + return NXT_OK; +} + + static nxt_http_route_table_t * nxt_http_route_table_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *table_cv, nxt_http_route_object_t object, @@ -877,17 +942,17 @@ static void nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route) { - nxt_http_pass_t *pass; + nxt_http_action_t *action; nxt_http_route_match_t **match, **end; match = &route->match[0]; end = match + route->items; while (match < end) { - pass = &(*match)->pass; + action = &(*match)->action; - if (pass->handler == NULL) { - nxt_http_pass_resolve(task, tmcf, &(*match)->pass); + if (action->handler == NULL) { + nxt_http_action_resolve(task, tmcf, &(*match)->action); } match++; @@ -896,21 +961,21 @@ nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, static void -nxt_http_pass_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, - nxt_http_pass_t *pass) +nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_http_action_t *action) { nxt_str_t name; - name = pass->name; + name = action->name; if (nxt_str_start(&name, "applications/", 13)) { name.length -= 13; name.start += 13; - pass->u.application = nxt_router_listener_application(tmcf, &name); - nxt_router_app_use(task, pass->u.application, 1); + action->u.application = nxt_router_listener_application(tmcf, &name); + nxt_router_app_use(task, action->u.application, 1); - pass->handler = nxt_http_request_application; + action->handler = nxt_http_application_handler; } else if (nxt_str_start(&name, "routes", 6)) { @@ -923,9 +988,9 @@ nxt_http_pass_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, name.start += 7; } - pass->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name); + action->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name); - pass->handler = nxt_http_route_pass; + action->handler = nxt_http_route_handler; } } @@ -950,46 +1015,49 @@ nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name) } -nxt_http_pass_t * -nxt_http_pass_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, +nxt_http_action_t * +nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name) { - nxt_http_pass_t *pass; + nxt_http_action_t *action; - pass = nxt_mp_alloc(tmcf->router_conf->mem_pool, sizeof(nxt_http_pass_t)); - if (nxt_slow_path(pass == NULL)) { + action = nxt_mp_alloc(tmcf->router_conf->mem_pool, + sizeof(nxt_http_action_t)); + if (nxt_slow_path(action == NULL)) { return NULL; } - pass->name = *name; + action->name = *name; + action->handler = NULL; - nxt_http_pass_resolve(task, tmcf, pass); + nxt_http_action_resolve(task, tmcf, action); - return pass; + return action; } /* COMPATIBILITY: listener application. */ -nxt_http_pass_t * +nxt_http_action_t * nxt_http_pass_application(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name) { - nxt_http_pass_t *pass; + nxt_http_action_t *action; - pass = nxt_mp_alloc(tmcf->router_conf->mem_pool, sizeof(nxt_http_pass_t)); - if (nxt_slow_path(pass == NULL)) { + action = nxt_mp_alloc(tmcf->router_conf->mem_pool, + sizeof(nxt_http_action_t)); + if (nxt_slow_path(action == NULL)) { return NULL; } - pass->name = *name; + action->name = *name; - pass->u.application = nxt_router_listener_application(tmcf, name); - nxt_router_app_use(task, pass->u.application, 1); + action->u.application = nxt_router_listener_application(tmcf, name); + nxt_router_app_use(task, action->u.application, 1); - pass->handler = nxt_http_request_application; + action->handler = nxt_http_application_handler; - return pass; + return action; } @@ -1020,7 +1088,7 @@ nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *route) end = match + route->items; while (match < end) { - nxt_http_pass_cleanup(task, &(*match)->pass); + nxt_http_action_cleanup(task, &(*match)->action); match++; } @@ -1028,20 +1096,20 @@ nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *route) void -nxt_http_pass_cleanup(nxt_task_t *task, nxt_http_pass_t *pass) +nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action) { - if (pass->handler == nxt_http_request_application) { - nxt_router_app_use(task, pass->u.application, -1); + if (action->handler == nxt_http_application_handler) { + nxt_router_app_use(task, action->u.application, -1); } } -static nxt_http_pass_t * -nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r, - nxt_http_pass_t *start) +static nxt_http_action_t * +nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_action_t *start) { - nxt_http_pass_t *pass; nxt_http_route_t *route; + nxt_http_action_t *action; nxt_http_route_match_t **match, **end; route = start->u.route; @@ -1049,9 +1117,9 @@ nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r, end = match + route->items; while (match < end) { - pass = nxt_http_route_match(r, *match); - if (pass != NULL) { - return pass; + action = nxt_http_route_match(r, *match); + if (action != NULL) { + return action; } match++; @@ -1063,7 +1131,7 @@ nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r, } -static nxt_http_pass_t * +static nxt_http_action_t * nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match) { nxt_int_t ret; @@ -1081,14 +1149,14 @@ nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match) } if (ret <= 0) { - /* 0 => NULL, -1 => NXT_HTTP_PASS_ERROR. */ - return (nxt_http_pass_t *) (intptr_t) ret; + /* 0 => NULL, -1 => NXT_HTTP_ACTION_ERROR. */ + return (nxt_http_action_t *) (intptr_t) ret; } test++; } - return &match->pass; + return &match->action; } diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c index 48a989cf..44132859 100644 --- a/src/nxt_http_static.c +++ b/src/nxt_http_static.c @@ -27,9 +27,9 @@ static void nxt_http_static_mtypes_hash_free(void *data, void *p); static const nxt_http_request_state_t nxt_http_static_send_state; -nxt_http_pass_t * +nxt_http_action_t * nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, - nxt_http_pass_t *pass) + nxt_http_action_t *action) { size_t alloc, encode; u_char *p; @@ -76,7 +76,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_str_null(&extension); } - alloc = pass->name.length + r->path->length + index.length + 1; + alloc = action->name.length + r->path->length + index.length + 1; f->name = nxt_mp_nget(r->mem_pool, alloc); if (nxt_slow_path(f->name == NULL)) { @@ -84,7 +84,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, } p = f->name; - p = nxt_cpymem(p, pass->name.start, pass->name.length); + p = nxt_cpymem(p, action->name.start, action->name.length); p = nxt_cpymem(p, r->path->start, r->path->length); p = nxt_cpymem(p, index.start, index.length); *p = '\0'; @@ -272,7 +272,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, body_handler = NULL; } - nxt_http_request_header_send(task, r, body_handler); + nxt_http_request_header_send(task, r, body_handler, NULL); r->state = &nxt_http_static_send_state; return NULL; diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c index d58d615c..fb888f5d 100644 --- a/src/nxt_http_websocket.c +++ b/src/nxt_http_websocket.c @@ -88,6 +88,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) frame_size -= copy_size; next = b->next; + b->next = NULL; if (nxt_buf_mem_used_size(&b->mem) == 0) { nxt_work_queue_add(&task->thread->engine->fast_work_queue, diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 85685fbc..cfe0341f 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -70,8 +70,8 @@ static void nxt_main_port_access_log_handler(nxt_task_t *task, static nxt_int_t nxt_init_set_isolation(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *isolation); -static nxt_int_t nxt_init_set_ns(nxt_task_t *task, - nxt_process_init_t *init, nxt_conf_value_t *ns); +static nxt_int_t nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, + nxt_conf_value_t *ns); const nxt_sig_event_t nxt_main_process_signals[] = { nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), @@ -397,31 +397,19 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) nxt_port_t *port; nxt_process_t *process; - process = nxt_runtime_process_get(rt, nxt_pid); - if (nxt_slow_path(process == NULL)) { - return NXT_ERROR; - } - - port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN); + port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0, + NXT_PROCESS_MAIN); if (nxt_slow_path(port == NULL)) { - nxt_process_use(task, process, -1); return NXT_ERROR; } - nxt_process_port_add(task, process, port); - - nxt_process_use(task, process, -1); + process = port->process; ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_port_use(task, port, -1); return ret; } - nxt_runtime_port_add(task, port); - - nxt_port_use(task, port, -1); - /* * A main process port. A write port is not closed * since it should be inherited by worker processes. @@ -465,13 +453,11 @@ nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_malloc(sizeof(nxt_process_init_t)); + init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - nxt_memzero(init, sizeof(nxt_process_init_t)); - init->start = nxt_controller_start; init->name = "controller"; init->user_cred = &rt->user_cred; @@ -561,13 +547,11 @@ nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_malloc(sizeof(nxt_process_init_t)); + init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - nxt_memzero(init, sizeof(nxt_process_init_t)); - init->start = nxt_discovery_start; init->name = "discovery"; init->user_cred = &rt->user_cred; @@ -587,13 +571,11 @@ nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_malloc(sizeof(nxt_process_init_t)); + init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - nxt_memzero(init, sizeof(nxt_process_init_t)); - init->start = nxt_router_start; init->name = "router"; init->user_cred = &rt->user_cred; @@ -627,13 +609,11 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, + app_conf->group.length + 1; } - init = nxt_malloc(size); + init = nxt_mp_zalloc(rt->mem_pool, size); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - nxt_memzero(init, sizeof(nxt_process_init_t)); - if (rt->capabilities.setid) { init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t)); user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t)); @@ -705,7 +685,7 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, fail: - nxt_free(init); + nxt_mp_free(rt->mem_pool, init); return NXT_ERROR; } @@ -726,6 +706,8 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, process = nxt_runtime_process_new(rt); if (nxt_slow_path(process == NULL)) { + nxt_mp_free(rt->mem_pool, init); + return NXT_ERROR; } @@ -785,8 +767,6 @@ nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) nxt_runtime_process_each(rt, process) { if (nxt_pid != process->pid) { - process->init = NULL; - nxt_process_port_each(process, port) { (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, @@ -1005,10 +985,11 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) if (process) { init = process->init; + process->init = NULL; ptype = nxt_process_type(process); - if (process->ready && init != NULL) { + if (process->ready) { init->stream = 0; } @@ -1057,7 +1038,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) init->restart(task, rt, init); } else { - nxt_free(init); + nxt_mp_free(rt->mem_pool, init); } } } @@ -1540,7 +1521,8 @@ nxt_init_set_isolation(nxt_task_t *task, nxt_process_init_t *init, static nxt_int_t -nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *namespaces) +nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, + nxt_conf_value_t *namespaces) { uint32_t index; nxt_str_t name; @@ -1549,7 +1531,13 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *na index = 0; - while ((value = nxt_conf_next_object_member(namespaces, &name, &index)) != NULL) { + for ( ;; ) { + value = nxt_conf_next_object_member(namespaces, &name, &index); + + if (value == NULL) { + break; + } + flag = 0; #if (NXT_HAVE_CLONE_NEWUSER) @@ -1593,11 +1581,9 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *na return NXT_ERROR; } - if (nxt_conf_get_integer(value) == 0) { - continue; /* process shares everything by default */ + if (nxt_conf_get_boolean(value)) { + init->isolation.clone.flags |= flag; } - - init->isolation.clone.flags |= flag; } return NXT_OK; diff --git a/src/nxt_port.c b/src/nxt_port.c index 9029353a..8d14a5e7 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -238,7 +238,6 @@ void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_t *port; - nxt_process_t *process; nxt_runtime_t *rt; nxt_port_msg_new_port_t *new_port_msg; @@ -261,22 +260,13 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - process = nxt_runtime_process_get(rt, new_port_msg->pid); - if (nxt_slow_path(process == NULL)) { - return; - } - - port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid, - new_port_msg->type); + port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid, + new_port_msg->id, + new_port_msg->type); if (nxt_slow_path(port == NULL)) { - nxt_process_use(task, process, -1); return; } - nxt_process_port_add(task, process, port); - - nxt_process_use(task, process, -1); - nxt_fd_nonblocking(task, msg->fd); port->pair[0] = -1; @@ -286,10 +276,6 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port->socket.task = task; - nxt_runtime_port_add(task, port); - - nxt_port_use(task, port, -1); - nxt_port_write_enable(task, port); msg->u.new_port = port; diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 4edc423a..9c7da970 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -478,7 +478,8 @@ static nxt_buf_t * nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode) { - size_t size; + size_t size; + nxt_buf_t *next; while (b != NULL) { @@ -528,7 +529,9 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - b = b->next; + next = b->next; + b->next = NULL; + b = next; } return b; @@ -796,7 +799,7 @@ static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) { - nxt_buf_t *b, *orig_b; + nxt_buf_t *b, *orig_b, *next; nxt_port_recv_msg_t *fmsg; if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { @@ -915,11 +918,15 @@ fmsg_failed: */ if (msg->buf == b) { /* complete mmap buffers */ - for (; b != NULL; b = b->next) { + while (b != NULL) { nxt_debug(task, "complete buffer %p", b); nxt_work_queue_add(port->socket.read_work_queue, b->completion_handler, task, b, b->parent); + + next = b->next; + b->next = NULL; + b = next; } } @@ -964,7 +971,7 @@ static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) { int use_delta; - nxt_buf_t *b; + nxt_buf_t *b, *next; nxt_port_t *port; nxt_work_queue_t *wq; nxt_port_send_msg_t *msg; @@ -986,7 +993,10 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { - for (b = msg->buf; b != NULL; b = b->next) { + for (b = msg->buf; b != NULL; b = next) { + next = b->next; + b->next = NULL; + if (nxt_buf_is_sync(b)) { continue; } diff --git a/src/nxt_process.c b/src/nxt_process.c index 0cc9ccc4..b246a58c 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -207,7 +207,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) goto fail; } -#if (NXT_HAVE_CLONE_NEWUSER) +#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) if ((init->isolation.clone.flags & CLONE_NEWUSER) == CLONE_NEWUSER) { ret = nxt_clone_proc_map(task, pid, &init->isolation.clone); if (nxt_slow_path(ret != NXT_OK)) { @@ -723,16 +723,35 @@ free: nxt_int_t nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) { - nxt_debug(task, "user cred set: \"%s\" uid:%uL base gid:%uL", - uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid); + nxt_debug(task, "user cred set: \"%s\" uid:%d base gid:%d", + uc->user, uc->uid, uc->base_gid); if (setgid(uc->base_gid) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the " + "application namespace.", uc->base_gid); + return NXT_ERROR; + } +#endif + nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno); return NXT_ERROR; } if (uc->gids != NULL) { if (setgroups(uc->ngroups, uc->gids) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has " + "supplementary group ids not valid in the application " + "namespace.", uc->user, uc->uid); + return NXT_ERROR; + } +#endif + nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno); return NXT_ERROR; } @@ -747,6 +766,15 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) } if (setuid(uc->uid) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't " + "valid in the application namespace.", uc->uid, uc->user); + return NXT_ERROR; + } +#endif + nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno); return NXT_ERROR; } @@ -756,6 +784,17 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) void +nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) +{ + process->use_count += i; + + if (process->use_count == 0) { + nxt_runtime_process_release(task->thread->runtime, process); + } +} + + +void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { nxt_assert(port->process == NULL); diff --git a/src/nxt_process.h b/src/nxt_process.h index df9ca038..d67573f1 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -114,6 +114,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_process_type_t nxt_process_type(nxt_process_t *process); +void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); + void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process); void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port); diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index a6d5f217..5bb2fb2c 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -86,6 +86,7 @@ static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args); +static void nxt_python_print_exception(void); static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes); struct nxt_python_run_ctx_s { @@ -130,58 +131,17 @@ static PyMethodDef nxt_py_input_methods[] = { static PyTypeObject nxt_py_input_type = { PyVarObject_HEAD_INIT(NULL, 0) - "unit._input", /* tp_name */ - (int) sizeof(nxt_py_input_t), /* tp_basicsize */ - 0, /* tp_itemsize */ - (destructor) nxt_py_input_dealloc, /* tp_dealloc */ - 0, /* tp_print */ - 0, /* tp_getattr */ - 0, /* tp_setattr */ - 0, /* tp_compare */ - 0, /* tp_repr */ - 0, /* tp_as_number */ - 0, /* tp_as_sequence */ - 0, /* tp_as_mapping */ - 0, /* tp_hash */ - 0, /* tp_call */ - 0, /* tp_str */ - 0, /* tp_getattro */ - 0, /* tp_setattro */ - 0, /* tp_as_buffer */ - Py_TPFLAGS_DEFAULT, /* tp_flags */ - "unit input object.", /* tp_doc */ - 0, /* tp_traverse */ - 0, /* tp_clear */ - 0, /* tp_richcompare */ - 0, /* tp_weaklistoffset */ - 0, /* tp_iter */ - 0, /* tp_iternext */ - nxt_py_input_methods, /* tp_methods */ - 0, /* tp_members */ - 0, /* tp_getset */ - 0, /* tp_base */ - 0, /* tp_dict */ - 0, /* tp_descr_get */ - 0, /* tp_descr_set */ - 0, /* tp_dictoffset */ - 0, /* tp_init */ - 0, /* tp_alloc */ - 0, /* tp_new */ - 0, /* tp_free */ - 0, /* tp_is_gc */ - 0, /* tp_bases */ - 0, /* tp_mro - method resolution order */ - 0, /* tp_cache */ - 0, /* tp_subclasses */ - 0, /* tp_weaklist */ - 0, /* tp_del */ - 0, /* tp_version_tag */ -#if PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION > 3 - 0, /* tp_finalize */ -#endif + + .tp_name = "unit._input", + .tp_basicsize = sizeof(nxt_py_input_t), + .tp_dealloc = (destructor) nxt_py_input_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit input object.", + .tp_methods = nxt_py_input_methods, }; +static PyObject *nxt_py_stderr_flush; static PyObject *nxt_py_application; static PyObject *nxt_py_start_resp_obj; static PyObject *nxt_py_write_obj; @@ -193,6 +153,7 @@ static wchar_t *nxt_py_home; static char *nxt_py_home; #endif +static PyThreadState *nxt_python_thread_state; static nxt_python_run_ctx_t *nxt_python_run_ctx; @@ -279,6 +240,21 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) module = NULL; + obj = PySys_GetObject((char *) "stderr"); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to get \"sys.stderr\" object"); + goto fail; + } + + nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush"); + if (nxt_slow_path(nxt_py_stderr_flush == NULL)) { + nxt_alert(task, "Python failed to get \"flush\" attribute of " + "\"sys.stderr\" object"); + goto fail; + } + + Py_DECREF(obj); + if (c->path.length > 0) { obj = PyString_FromStringAndSize((char *) c->path.start, c->path.length); @@ -349,7 +325,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) module = PyImport_ImportModule(nxt_py_module); if (nxt_slow_path(module == NULL)) { nxt_alert(task, "Python failed to import module \"%s\"", nxt_py_module); - PyErr_Print(); + nxt_python_print_exception(); goto fail; } @@ -363,7 +339,6 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) if (nxt_slow_path(PyCallable_Check(obj) == 0)) { nxt_alert(task, "\"application\" in module \"%s\" " "is not a callable object", nxt_py_module); - PyErr_Print(); goto fail; } @@ -382,10 +357,14 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) goto fail; } + nxt_python_thread_state = PyEval_SaveThread(); + rc = nxt_unit_run(unit_ctx); nxt_unit_done(unit_ctx); + PyEval_RestoreThread(nxt_python_thread_state); + nxt_python_atexit(); exit(rc); @@ -407,22 +386,26 @@ static void nxt_python_request_handler(nxt_unit_request_info_t *req) { int rc; - PyObject *result, *iterator, *item, *args, *environ; + PyObject *environ, *args, *response, *iterator, *item; + PyObject *close, *result; nxt_python_run_ctx_t run_ctx = {-1, 0, NULL, req}; + PyEval_RestoreThread(nxt_python_thread_state); + environ = nxt_python_get_environ(&run_ctx); if (nxt_slow_path(environ == NULL)) { - nxt_unit_request_done(req, NXT_UNIT_ERROR); - - return; + rc = NXT_UNIT_ERROR; + goto done; } args = PyTuple_New(2); if (nxt_slow_path(args == NULL)) { + Py_DECREF(environ); + nxt_unit_req_error(req, "Python failed to create arguments tuple"); - nxt_unit_request_done(req, NXT_UNIT_ERROR); - return; + rc = NXT_UNIT_ERROR; + goto done; } PyTuple_SET_ITEM(args, 0, environ); @@ -432,103 +415,104 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) nxt_python_run_ctx = &run_ctx; - result = PyObject_CallObject(nxt_py_application, args); + response = PyObject_CallObject(nxt_py_application, args); Py_DECREF(args); - if (nxt_slow_path(result == NULL)) { + if (nxt_slow_path(response == NULL)) { nxt_unit_req_error(req, "Python failed to call the application"); - PyErr_Print(); - - nxt_unit_request_done(req, NXT_UNIT_ERROR); - nxt_python_run_ctx = NULL; + nxt_python_print_exception(); - return; + rc = NXT_UNIT_ERROR; + goto done; } - item = NULL; - iterator = NULL; + /* Shortcut: avoid iterate over response string symbols. */ + if (PyBytes_Check(response)) { + rc = nxt_python_write(&run_ctx, response); - /* Shortcut: avoid iterate over result string symbols. */ - if (PyBytes_Check(result)) { + } else { + iterator = PyObject_GetIter(response); - rc = nxt_python_write(&run_ctx, result); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - goto fail; - } + if (nxt_fast_path(iterator != NULL)) { + rc = NXT_UNIT_OK; - } else { - iterator = PyObject_GetIter(result); + while (run_ctx.bytes_sent < run_ctx.content_length) { + item = PyIter_Next(iterator); - if (nxt_slow_path(iterator == NULL)) { - nxt_unit_req_error(req, "the application returned " - "not an iterable object"); + if (item == NULL) { + if (nxt_slow_path(PyErr_Occurred() != NULL)) { + nxt_unit_req_error(req, "Python failed to iterate over " + "the application response object"); + nxt_python_print_exception(); - goto fail; - } + rc = NXT_UNIT_ERROR; + } - while (run_ctx.bytes_sent < run_ctx.content_length - && (item = PyIter_Next(iterator))) - { - if (nxt_slow_path(!PyBytes_Check(item))) { - nxt_unit_req_error(req, "the application returned " - "not a bytestring object"); + break; + } - goto fail; - } + if (nxt_fast_path(PyBytes_Check(item))) { + rc = nxt_python_write(&run_ctx, item); - rc = nxt_python_write(&run_ctx, item); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - goto fail; - } + } else { + nxt_unit_req_error(req, "the application returned " + "not a bytestring object"); + rc = NXT_UNIT_ERROR; + } - Py_DECREF(item); - } + Py_DECREF(item); - Py_DECREF(iterator); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + break; + } + } - if (PyObject_HasAttrString(result, "close")) { - PyObject_CallMethod(result, (char *) "close", NULL); - } - } + Py_DECREF(iterator); - if (nxt_slow_path(PyErr_Occurred() != NULL)) { - nxt_unit_req_error(req, "an application error occurred"); - PyErr_Print(); - } + } else { + nxt_unit_req_error(req, + "the application returned not an iterable object"); + nxt_python_print_exception(); - nxt_unit_request_done(req, NXT_UNIT_OK); + rc = NXT_UNIT_ERROR; + } - Py_DECREF(result); + close = PyObject_GetAttrString(response, "close"); - nxt_python_run_ctx = NULL; + if (close != NULL) { + result = PyObject_CallFunction(close, NULL); + if (nxt_slow_path(result == NULL)) { + nxt_unit_req_error(req, "Python failed to call the close() " + "method of the application response"); + nxt_python_print_exception(); - return; + } else { + Py_DECREF(result); + } -fail: + Py_DECREF(close); - if (item != NULL) { - Py_DECREF(item); + } else { + PyErr_Clear(); + } } - if (iterator != NULL) { - Py_DECREF(iterator); - } + Py_DECREF(response); - if (PyObject_HasAttrString(result, "close")) { - PyObject_CallMethod(result, (char *) "close", NULL); - } +done: - Py_DECREF(result); - nxt_python_run_ctx = NULL; + nxt_python_thread_state = PyEval_SaveThread(); - nxt_unit_request_done(req, NXT_UNIT_ERROR); + nxt_python_run_ctx = NULL; + nxt_unit_request_done(req, rc); } static void nxt_python_atexit(void) { + Py_XDECREF(nxt_py_stderr_flush); Py_XDECREF(nxt_py_application); Py_XDECREF(nxt_py_start_resp_obj); Py_XDECREF(nxt_py_write_obj); @@ -767,7 +751,7 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, nxt_unit_req_error(ctx->req, "Python failed to create value string \"%.*s\"", (int) size, src); - PyErr_Print(); + nxt_python_print_exception(); return NXT_UNIT_ERROR; } @@ -802,7 +786,7 @@ nxt_python_add_str(nxt_python_run_ctx_t *ctx, const char *name, nxt_unit_req_error(ctx->req, "Python failed to create value string \"%.*s\"", (int) size, str); - PyErr_Print(); + nxt_python_print_exception(); return NXT_UNIT_ERROR; } @@ -1137,6 +1121,28 @@ nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args) } +static void +nxt_python_print_exception(void) +{ + PyErr_Print(); + +#if PY_MAJOR_VERSION == 3 + /* The backtrace may be buffered in sys.stderr file object. */ + { + PyObject *result; + + result = PyObject_CallFunction(nxt_py_stderr_flush, NULL); + if (nxt_slow_path(result == NULL)) { + PyErr_Clear(); + return; + } + + Py_DECREF(result); + } +#endif +} + + static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes) { diff --git a/src/nxt_router.c b/src/nxt_router.c index 28781600..b9f5d921 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -537,6 +537,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, for (b = msg_info->buf; b != NULL; b = next) { next = b->next; + b->next = NULL; b->completion_handler = msg_info->completion_handler; @@ -1172,8 +1173,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { - if (skcf->pass != NULL) { - nxt_http_pass_cleanup(task, skcf->pass); + if (skcf->action != NULL) { + nxt_http_action_cleanup(task, skcf->action); } } nxt_queue_loop; @@ -1458,7 +1459,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, next = 0; for ( ;; ) { - application = nxt_conf_next_object_member(applications, &name, &next); + application = nxt_conf_next_object_member(applications, + &name, &next); if (application == NULL) { break; } @@ -1676,10 +1678,16 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->large_header_buffers = 4; skcf->body_buffer_size = 16 * 1024; skcf->max_body_size = 8 * 1024 * 1024; + skcf->proxy_header_buffer_size = 64 * 1024; + skcf->proxy_buffer_size = 4096; + skcf->proxy_buffers = 256; skcf->idle_timeout = 180 * 1000; skcf->header_read_timeout = 30 * 1000; skcf->body_read_timeout = 30 * 1000; skcf->send_timeout = 30 * 1000; + skcf->proxy_timeout = 60 * 1000; + skcf->proxy_send_timeout = 30 * 1000; + skcf->proxy_read_timeout = 30 * 1000; skcf->websocket_conf.max_frame_size = 1024 * 1024; skcf->websocket_conf.read_timeout = 60 * 1000; @@ -1729,12 +1737,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf->router_conf->count++; if (lscf.pass.length != 0) { - skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass); + skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass); /* COMPATIBILITY: listener application. */ } else if (lscf.application.length > 0) { - skcf->pass = nxt_http_pass_application(task, tmcf, - &lscf.application); + skcf->action = nxt_http_pass_application(task, tmcf, + &lscf.application); } } } @@ -3070,8 +3078,8 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_thread_spin_unlock(lock); if (skcf != NULL) { - if (skcf->pass != NULL) { - nxt_http_pass_cleanup(task, skcf->pass); + if (skcf->action != NULL) { + nxt_http_action_cleanup(task, skcf->action); } #if (NXT_TLS) @@ -3497,7 +3505,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { nxt_int_t ret; - nxt_buf_t *b; + nxt_buf_t *b, *next; nxt_port_t *app_port; nxt_unit_field_t *f; nxt_http_field_t *field; @@ -3580,6 +3588,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, field->hash = f->hash; field->skip = 0; + field->hopbyhop = 0; field->name_length = f->name_length; field->value_length = f->value_length; @@ -3612,17 +3621,20 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } if (nxt_buf_mem_used_size(&b->mem) == 0) { + next = b->next; + b->next = NULL; + nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); - b = b->next; + b = next; } if (b != NULL) { nxt_buf_chain_add(&r->out, b); } - nxt_http_request_header_send(task, r, nxt_http_request_send_body); + nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL); if (r->websocket_handshake && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) @@ -5056,6 +5068,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, if (nxt_slow_path(buf == NULL)) { while (out != NULL) { buf = out->next; + out->next = NULL; out->completion_handler(task, out, out->parent); out = buf; } diff --git a/src/nxt_router.h b/src/nxt_router.h index ec18ff48..1517c14b 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -16,12 +16,12 @@ typedef struct nxt_http_request_s nxt_http_request_t; #include <nxt_application.h> -typedef struct nxt_http_pass_s nxt_http_pass_t; +typedef struct nxt_http_action_s nxt_http_action_t; typedef struct nxt_http_routes_s nxt_http_routes_t; typedef struct nxt_router_access_log_s nxt_router_access_log_t; -#define NXT_HTTP_PASS_ERROR ((nxt_http_pass_t *) -1) +#define NXT_HTTP_ACTION_ERROR ((nxt_http_action_t *) -1) typedef struct { @@ -154,7 +154,7 @@ typedef struct { nxt_queue_link_t link; nxt_router_conf_t *router_conf; - nxt_http_pass_t *pass; + nxt_http_action_t *action; /* * A listen socket time can be shorter than socket configuration life @@ -170,10 +170,17 @@ typedef struct { size_t large_header_buffers; size_t body_buffer_size; size_t max_body_size; + size_t proxy_header_buffer_size; + size_t proxy_buffer_size; + size_t proxy_buffers; + nxt_msec_t idle_timeout; nxt_msec_t header_read_timeout; nxt_msec_t body_read_timeout; nxt_msec_t send_timeout; + nxt_msec_t proxy_timeout; + nxt_msec_t proxy_send_timeout; + nxt_msec_t proxy_read_timeout; nxt_websocket_conf_t websocket_conf; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index de41ba4d..096aabc4 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -37,10 +37,10 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, static void nxt_runtime_thread_pool_init(void); static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data); -static void nxt_runtime_process_destroy(nxt_runtime_t *rt, +static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); +static void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process); -static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, - nxt_pid_t pid); +static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); nxt_int_t @@ -459,7 +459,7 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) idle = &engine->idle_connections; - for (link = nxt_queue_head(idle); + for (link = nxt_queue_first(idle); link != nxt_queue_tail(idle); link = next) { @@ -658,6 +658,8 @@ nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data) if (tp == thread_pools[i]) { nxt_array_remove(rt->thread_pools, &thread_pools[i]); + nxt_free(tp); + if (n == 1) { /* The last thread pool. */ rt->continuation(task, 0); @@ -1296,11 +1298,15 @@ nxt_runtime_process_new(nxt_runtime_t *rt) } -static void -nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) +void +nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) { nxt_port_t *port; + if (process->registered == 1) { + nxt_runtime_process_remove(rt, process); + } + nxt_assert(process->use_count == 0); nxt_assert(process->registered == 0); @@ -1316,6 +1322,10 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) nxt_thread_mutex_destroy(&process->outgoing.mutex); nxt_thread_mutex_destroy(&process->cp_mutex); + if (process->init != NULL) { + nxt_mp_free(rt->mem_pool, process->init); + } + nxt_mp_free(rt->mem_pool, process); } @@ -1379,7 +1389,7 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) } -nxt_process_t * +static nxt_process_t * nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) { nxt_process_t *process; @@ -1489,13 +1499,13 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) } -static nxt_process_t * -nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) +static void +nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) { - nxt_process_t *process; + nxt_pid_t pid; nxt_lvlhsh_query_t lhq; - process = NULL; + pid = process->pid; nxt_runtime_process_lhq_pid(&lhq, &pid); @@ -1521,40 +1531,49 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) } nxt_thread_mutex_unlock(&rt->processes_mutex); - - return process; } -void -nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) +nxt_process_t * +nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) { - nxt_runtime_t *rt; + nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); + + return nxt_runtime_process_next(rt, lhe); +} - process->use_count += i; - if (process->use_count == 0) { - rt = task->thread->runtime; +nxt_port_t * +nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type) +{ + nxt_port_t *port; + nxt_process_t *process; - if (process->registered == 1) { - nxt_runtime_process_remove_pid(rt, process->pid); - } + process = nxt_runtime_process_get(rt, pid); + if (nxt_slow_path(process == NULL)) { + return NULL; + } - nxt_runtime_process_destroy(rt, process); + port = nxt_port_new(task, id, pid, type); + if (nxt_slow_path(port == NULL)) { + nxt_process_use(task, process, -1); + return NULL; } -} + nxt_process_port_add(task, process, port); -nxt_process_t * -nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) -{ - nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); + nxt_process_use(task, process, -1); - return nxt_runtime_process_next(rt, lhe); + nxt_runtime_port_add(task, port); + + nxt_port_use(task, port, -1); + + return port; } -void +static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) { nxt_int_t res; diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 0791f8e7..d5b340b6 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -93,22 +93,20 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); -nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); - void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process); nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid); -void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); - nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe); +void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process); + #define nxt_runtime_process_next(rt, lhe) \ nxt_lvlhsh_each(&rt->processes, lhe) - -void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); +nxt_port_t *nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type); void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port); diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index 16fe4724..94f8e9eb 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -9,6 +9,8 @@ static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied); +static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task, + nxt_work_queue_t *wq, nxt_buf_t *start); nxt_uint_t @@ -380,15 +382,11 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) { while (b != NULL) { - nxt_prefetch(b->next); - if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) { break; } - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - - b = b->next; + b = nxt_sendbuf_coalesce_completion(task, wq, b); } return b; @@ -399,10 +397,49 @@ void nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) { while (b != NULL) { - nxt_prefetch(b->next); + b = nxt_sendbuf_coalesce_completion(task, wq, b); + } +} - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - b = b->next; +static nxt_buf_t * +nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq, + nxt_buf_t *start) +{ + nxt_buf_t *b, *next, **last, *rest, **last_rest; + nxt_work_handler_t handler; + + rest = NULL; + last_rest = &rest; + last = &start->next; + b = start; + handler = b->completion_handler; + + for ( ;; ) { + next = b->next; + if (next == NULL) { + break; + } + + b->next = NULL; + b = next; + + if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) { + *last_rest = b; + break; + } + + if (handler == b->completion_handler) { + *last = b; + last = &b->next; + + } else { + *last_rest = b; + last_rest = &b->next; + } } + + nxt_work_queue_add(wq, handler, task, start, start->parent); + + return rest; } diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c index 99cf54b4..57dfbfa6 100644 --- a/src/nxt_sockaddr.c +++ b/src/nxt_sockaddr.c @@ -23,11 +23,11 @@ static nxt_int_t nxt_job_sockaddr_inet_parse(nxt_job_sockaddr_parse_t *jbs); nxt_sockaddr_t * nxt_sockaddr_cache_alloc(nxt_event_engine_t *engine, nxt_listen_socket_t *ls) { - uint8_t hint; size_t size; + uint8_t hint; nxt_sockaddr_t *sa; - hint = (uint8_t) -1; + hint = NXT_EVENT_ENGINE_NO_MEM_HINT; size = offsetof(nxt_sockaddr_t, u) + ls->socklen + ls->address_length; sa = nxt_event_engine_mem_alloc(engine, &hint, size); @@ -56,7 +56,11 @@ nxt_sockaddr_cache_alloc(nxt_event_engine_t *engine, nxt_listen_socket_t *ls) void nxt_sockaddr_cache_free(nxt_event_engine_t *engine, nxt_conn_t *c) { - nxt_event_engine_mem_free(engine, &c->remote->cache_hint, c->remote); + nxt_sockaddr_t *sa; + + sa = c->remote; + + nxt_event_engine_mem_free(engine, sa->cache_hint, sa, 0); } diff --git a/src/nxt_socket.c b/src/nxt_socket.c index 95a298d8..2a809184 100644 --- a/src/nxt_socket.c +++ b/src/nxt_socket.c @@ -300,6 +300,28 @@ nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s, nxt_uint_t how) } +nxt_err_t +nxt_socket_error(nxt_socket_t s) +{ + int ret, err; + socklen_t len; + + err = 0; + len = sizeof(int); + /* + * Linux and BSDs return 0 and store a pending error in the err argument; + * Solaris returns -1 and sets the errno. + */ + ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (void *) &err, &len); + + if (nxt_slow_path(ret == -1)) { + err = nxt_errno; + } + + return err; +} + + nxt_uint_t nxt_socket_error_level(nxt_err_t err) { @@ -315,6 +337,9 @@ nxt_socket_error_level(nxt_err_t err) case NXT_EHOSTUNREACH: return NXT_LOG_INFO; + case NXT_ECONNREFUSED: + return NXT_LOG_ERR; + default: return NXT_LOG_ALERT; } diff --git a/src/nxt_socket.h b/src/nxt_socket.h index 3f00648d..6a450f83 100644 --- a/src/nxt_socket.h +++ b/src/nxt_socket.h @@ -106,6 +106,7 @@ NXT_EXPORT nxt_int_t nxt_socket_connect(nxt_task_t *task, nxt_socket_t s, nxt_sockaddr_t *sa); NXT_EXPORT void nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s, nxt_uint_t how); +nxt_err_t nxt_socket_error(nxt_socket_t s); nxt_uint_t nxt_socket_error_level(nxt_err_t err); NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_task_t *task, diff --git a/src/nxt_string.c b/src/nxt_string.c index b89e9555..d567883f 100644 --- a/src/nxt_string.c +++ b/src/nxt_string.c @@ -188,10 +188,14 @@ nxt_strncasecmp(const u_char *s1, const u_char *s2, size_t length) nxt_int_t -nxt_memcasecmp(const u_char *s1, const u_char *s2, size_t length) +nxt_memcasecmp(const void *p1, const void *p2, size_t length) { - u_char c1, c2; - nxt_int_t n; + u_char c1, c2; + nxt_int_t n; + const u_char *s1, *s2; + + s1 = p1; + s2 = p2; while (length-- != 0) { c1 = *s1++; diff --git a/src/nxt_string.h b/src/nxt_string.h index 8d7b3b73..de498048 100644 --- a/src/nxt_string.h +++ b/src/nxt_string.h @@ -87,7 +87,7 @@ NXT_EXPORT u_char *nxt_cpystrn(u_char *dst, const u_char *src, size_t length); NXT_EXPORT nxt_int_t nxt_strcasecmp(const u_char *s1, const u_char *s2); NXT_EXPORT nxt_int_t nxt_strncasecmp(const u_char *s1, const u_char *s2, size_t length); -NXT_EXPORT nxt_int_t nxt_memcasecmp(const u_char *s1, const u_char *s2, +NXT_EXPORT nxt_int_t nxt_memcasecmp(const void *p1, const void *p2, size_t length); NXT_EXPORT u_char *nxt_memstrn(const u_char *s, const u_char *end, diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 9ccd1fd9..0cf32916 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -1316,8 +1316,12 @@ nxt_unit_response_init(nxt_unit_request_info_t *req, nxt_unit_req_debug(req, "duplicate response init"); } + /* + * Each field name and value 0-terminated by libunit, + * this is the reason of '+ 2' below. + */ buf_size = sizeof(nxt_unit_response_t) - + max_fields_count * sizeof(nxt_unit_field_t) + + max_fields_count * (sizeof(nxt_unit_field_t) + 2) + max_fields_size; if (nxt_slow_path(req->response_buf != NULL)) { @@ -1391,8 +1395,12 @@ nxt_unit_response_realloc(nxt_unit_request_info_t *req, return NXT_UNIT_ERROR; } + /* + * Each field name and value 0-terminated by libunit, + * this is the reason of '+ 2' below. + */ buf_size = sizeof(nxt_unit_response_t) - + max_fields_count * sizeof(nxt_unit_field_t) + + max_fields_count * (sizeof(nxt_unit_field_t) + 2) + max_fields_size; nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); @@ -1458,7 +1466,8 @@ nxt_unit_response_realloc(nxt_unit_request_info_t *req, goto fail; } - resp->piggyback_content_length = req->response->piggyback_content_length; + resp->piggyback_content_length = + req->response->piggyback_content_length; nxt_unit_sptr_set(&resp->piggyback_content, p); p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), @@ -1953,7 +1962,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, if (hdr != NULL) { m.mmap_msg.mmap_id = hdr->id; - m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); + m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, + (u_char *) buf->start); } nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", diff --git a/src/nxt_unit_field.h b/src/nxt_unit_field.h index d19db0f0..b07d3046 100644 --- a/src/nxt_unit_field.h +++ b/src/nxt_unit_field.h @@ -21,7 +21,8 @@ enum { /* Name and Value field aka HTTP header. */ struct nxt_unit_field_s { uint16_t hash; - uint8_t skip; + uint8_t skip:1; + uint8_t hopbyhop:1; uint8_t name_length; uint32_t value_length; diff --git a/src/nxt_websocket.c b/src/nxt_websocket.c index 9a099760..91002237 100644 --- a/src/nxt_websocket.c +++ b/src/nxt_websocket.c @@ -19,7 +19,7 @@ nxt_inline void nxt_hton16(uint8_t *b, uint16_t v) { b[0] = (v >> 8); - b[1] = (v & 0xFFu); + b[1] = (v & 0xFFu); } diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index ab9f7020..45d7a7aa 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -85,14 +85,16 @@ static nxt_int_t nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { int state, rc; - VALUE dummy, res; + VALUE res; nxt_unit_ctx_t *unit_ctx; nxt_unit_init_t ruby_unit_init; nxt_ruby_rack_init_t rack_init; + static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" }; + + RUBY_INIT_STACK ruby_init(); - Init_stack(&dummy); - ruby_init_loadpath(); + ruby_options(2, argv); ruby_script("NGINX_Unit"); rack_init.task = task; @@ -707,7 +709,8 @@ nxt_ruby_rack_result_body(VALUE result) } } else if (rb_respond_to(body, rb_intern("each"))) { - rb_iterate(rb_each, body, nxt_ruby_rack_result_body_each, 0); + rb_block_call(body, rb_intern("each"), 0, 0, + nxt_ruby_rack_result_body_each, 0); } else { nxt_unit_req_error(nxt_ruby_run_ctx.req, diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c index 3f6cac89..fcfcf5dd 100644 --- a/src/ruby/nxt_ruby_stream_io.c +++ b/src/ruby/nxt_ruby_stream_io.c @@ -31,7 +31,8 @@ nxt_ruby_stream_io_input_init(void) rb_gc_register_address(&stream_io); rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); - rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1); + rb_define_method(stream_io, "initialize", + nxt_ruby_stream_io_initialize, -1); rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0); rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0); rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2); @@ -51,7 +52,8 @@ nxt_ruby_stream_io_error_init(void) rb_gc_register_address(&stream_io); rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1); - rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1); + rb_define_method(stream_io, "initialize", + nxt_ruby_stream_io_initialize, -1); rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2); rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2); rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0); diff --git a/src/test/nxt_unit_websocket_chat.c b/src/test/nxt_unit_websocket_chat.c index ecc9a243..6e274722 100644 --- a/src/test/nxt_unit_websocket_chat.c +++ b/src/test/nxt_unit_websocket_chat.c @@ -104,10 +104,10 @@ ws_chat_root(nxt_unit_request_info_t *req) rc = nxt_unit_response_init(req, 200 /* Status code. */, 2 /* Number of response headers. */, - nxt_length(CONTENT_TYPE) + 1 - + nxt_length(TEXT_HTML) + 1 - + nxt_length(CONTENT_LENGTH) + 1 - + ws_chat_index_content_length_size + 1 + nxt_length(CONTENT_TYPE) + + nxt_length(TEXT_HTML) + + nxt_length(CONTENT_LENGTH) + + ws_chat_index_content_length_size + ws_chat_index_html_size); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; |