summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/go/unit/response.go2
-rw-r--r--src/nodejs/unit-http/unit.cpp21
-rw-r--r--src/nxt_application.c3
-rw-r--r--src/nxt_buf.c48
-rw-r--r--src/nxt_buf.h23
-rw-r--r--src/nxt_conf.c7
-rw-r--r--src/nxt_conf.h1
-rw-r--r--src/nxt_conf_validation.c87
-rw-r--r--src/nxt_conn.h2
-rw-r--r--src/nxt_conn_connect.c60
-rw-r--r--src/nxt_conn_read.c2
-rw-r--r--src/nxt_epoll_engine.c8
-rw-r--r--src/nxt_event_engine.c111
-rw-r--r--src/nxt_event_engine.h12
-rw-r--r--src/nxt_h1proto.c840
-rw-r--r--src/nxt_h1proto.h2
-rw-r--r--src/nxt_h1proto_websocket.c5
-rw-r--r--src/nxt_http.h64
-rw-r--r--src/nxt_http_error.c4
-rw-r--r--src/nxt_http_parse.c1
-rw-r--r--src/nxt_http_parse.h3
-rw-r--r--src/nxt_http_proxy.c403
-rw-r--r--src/nxt_http_request.c49
-rw-r--r--src/nxt_http_route.c210
-rw-r--r--src/nxt_http_static.c10
-rw-r--r--src/nxt_http_websocket.c1
-rw-r--r--src/nxt_main_process.c66
-rw-r--r--src/nxt_port.c20
-rw-r--r--src/nxt_port_socket.c22
-rw-r--r--src/nxt_process.c45
-rw-r--r--src/nxt_process.h2
-rw-r--r--src/nxt_python_wsgi.c246
-rw-r--r--src/nxt_router.c35
-rw-r--r--src/nxt_router.h13
-rw-r--r--src/nxt_runtime.c79
-rw-r--r--src/nxt_runtime.h10
-rw-r--r--src/nxt_sendbuf.c53
-rw-r--r--src/nxt_sockaddr.c10
-rw-r--r--src/nxt_socket.c25
-rw-r--r--src/nxt_socket.h1
-rw-r--r--src/nxt_string.c10
-rw-r--r--src/nxt_string.h2
-rw-r--r--src/nxt_unit.c18
-rw-r--r--src/nxt_unit_field.h3
-rw-r--r--src/nxt_websocket.c2
-rw-r--r--src/ruby/nxt_ruby.c11
-rw-r--r--src/ruby/nxt_ruby_stream_io.c6
-rw-r--r--src/test/nxt_unit_websocket_chat.c8
48 files changed, 2144 insertions, 522 deletions
diff --git a/src/go/unit/response.go b/src/go/unit/response.go
index bb326ea5..767d66b7 100644
--- a/src/go/unit/response.go
+++ b/src/go/unit/response.go
@@ -63,7 +63,7 @@ func (r *response) WriteHeader(code int) {
for k, vv := range r.header {
for _, v := range vv {
fields++
- fields_size += len(k) + len(v) + 2
+ fields_size += len(k) + len(v)
}
}
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp
index ac10024c..10875703 100644
--- a/src/nodejs/unit-http/unit.cpp
+++ b/src/nodejs/unit-http/unit.cpp
@@ -629,9 +629,6 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
keys_count = napi.get_value_uint32(argv[2]);
header_len = napi.get_value_uint32(argv[3]);
- /* Need to reserve extra byte for C-string 0-termination. */
- header_len++;
-
headers = argv[1];
ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
@@ -640,6 +637,12 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
return nullptr;
}
+ /*
+ * Each name and value are 0-terminated by libunit.
+ * Need to add extra 2 bytes for each header.
+ */
+ header_len += keys_count * 2;
+
keys = napi.get_property_names(headers);
keys_len = napi.get_array_length(keys);
@@ -656,8 +659,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
name_len = napi.get_value_string_latin1(name, ptr, header_len);
name_ptr = ptr;
- ptr += name_len;
- header_len -= name_len;
+ ptr += name_len + 1;
+ header_len -= name_len + 1;
hash = nxt_unit_field_hash(name_ptr, name_len);
@@ -689,8 +692,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
nxt_unit_sptr_set(&f->value, ptr);
f->value_length = (uint32_t) value_len;
- ptr += value_len;
- header_len -= value_len;
+ ptr += value_len + 1;
+ header_len -= value_len + 1;
req->response->fields_count++;
}
@@ -715,8 +718,8 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
nxt_unit_sptr_set(&f->value, ptr);
f->value_length = (uint32_t) value_len;
- ptr += value_len;
- header_len -= value_len;
+ ptr += value_len + 1;
+ header_len -= value_len + 1;
req->response->fields_count++;
}
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 468bc627..bebe3907 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -327,6 +327,9 @@ nxt_app_start(nxt_task_t *task, void *data)
lang->version, lang->file);
nxt_app = nxt_app_module_load(task, lang->file);
+ if (nxt_slow_path(nxt_app == NULL)) {
+ return NXT_ERROR;
+ }
}
if (nxt_app->pre_init != NULL) {
diff --git a/src/nxt_buf.c b/src/nxt_buf.c
index 91846e4d..83be0fac 100644
--- a/src/nxt_buf.c
+++ b/src/nxt_buf.c
@@ -183,7 +183,10 @@ nxt_buf_chain_length(nxt_buf_t *b)
length = 0;
while (b != NULL) {
- length += b->mem.free - b->mem.pos;
+ if (!nxt_buf_is_sync(b)) {
+ length += b->mem.free - b->mem.pos;
+ }
+
b = b->next;
}
@@ -195,7 +198,7 @@ static void
nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
- nxt_buf_t *b, *parent;
+ nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -204,9 +207,23 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_assert(data == b->parent);
- mp = b->data;
- nxt_mp_free(mp, b);
+ do {
+ next = b->next;
+ parent = b->parent;
+ mp = b->data;
+
+ nxt_mp_free(mp, b);
+ nxt_buf_parent_completion(task, parent);
+
+ b = next;
+ } while (b != NULL);
+}
+
+
+void
+nxt_buf_parent_completion(nxt_task_t *task, nxt_buf_t *parent)
+{
if (parent != NULL) {
nxt_debug(task, "parent retain:%uD", parent->retain);
@@ -255,7 +272,7 @@ static void
nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
- nxt_buf_t *b, *parent;
+ nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -268,21 +285,18 @@ nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
nxt_assert(data == b->parent);
- mp = b->data;
- nxt_mp_free(mp, b);
- nxt_mp_release(mp);
+ do {
+ next = b->next;
+ parent = b->parent;
+ mp = b->data;
- if (parent != NULL) {
- nxt_debug(task, "parent retain:%uD", parent->retain);
+ nxt_mp_free(mp, b);
+ nxt_mp_release(mp);
- parent->retain--;
+ nxt_buf_parent_completion(task, parent);
- if (parent->retain == 0) {
- parent->mem.pos = parent->mem.free;
-
- parent->completion_handler(task, parent, parent->parent);
- }
- }
+ b = next;
+ } while (b != NULL);
}
diff --git a/src/nxt_buf.h b/src/nxt_buf.h
index 9c22d650..25e8499a 100644
--- a/src/nxt_buf.h
+++ b/src/nxt_buf.h
@@ -77,17 +77,17 @@ struct nxt_buf_s {
uint32_t retain;
- uint8_t is_file; /* 1 bit */
-
- uint16_t is_mmap:1;
- uint16_t is_port_mmap:1;
-
- uint16_t is_sync:1;
- uint16_t is_nobuf:1;
- uint16_t is_flush:1;
- uint16_t is_last:1;
- uint16_t is_port_mmap_sent:1;
- uint16_t is_ts:1;
+ uint8_t cache_hint;
+
+ uint8_t is_file:1;
+ uint8_t is_mmap:1;
+ uint8_t is_port_mmap:1;
+ uint8_t is_sync:1;
+ uint8_t is_nobuf:1;
+ uint8_t is_flush:1;
+ uint8_t is_last:1;
+ uint8_t is_port_mmap_sent:1;
+ uint8_t is_ts:1;
nxt_buf_mem_t mem;
@@ -250,6 +250,7 @@ NXT_EXPORT nxt_buf_t *nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags);
NXT_EXPORT nxt_int_t nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data);
+NXT_EXPORT void nxt_buf_parent_completion(nxt_task_t *task, nxt_buf_t *parent);
NXT_EXPORT nxt_buf_t *nxt_buf_make_plain(nxt_mp_t *mp, nxt_buf_t *src,
size_t size);
diff --git a/src/nxt_conf.c b/src/nxt_conf.c
index 59eddd77..43820d2a 100644
--- a/src/nxt_conf.c
+++ b/src/nxt_conf.c
@@ -228,6 +228,13 @@ nxt_conf_get_integer(nxt_conf_value_t *value)
}
+uint8_t
+nxt_conf_get_boolean(nxt_conf_value_t *value)
+{
+ return value->u.boolean;
+}
+
+
nxt_uint_t
nxt_conf_object_members_count(nxt_conf_value_t *value)
{
diff --git a/src/nxt_conf.h b/src/nxt_conf.h
index 725a6c95..66201fee 100644
--- a/src/nxt_conf.h
+++ b/src/nxt_conf.h
@@ -115,6 +115,7 @@ NXT_EXPORT void nxt_conf_set_string(nxt_conf_value_t *value, nxt_str_t *str);
NXT_EXPORT nxt_int_t nxt_conf_set_string_dup(nxt_conf_value_t *value,
nxt_mp_t *mp, nxt_str_t *str);
NXT_EXPORT int64_t nxt_conf_get_integer(nxt_conf_value_t *value);
+NXT_EXPORT uint8_t nxt_conf_get_boolean(nxt_conf_value_t *value);
// FIXME reimplement and reorder functions below
nxt_uint_t nxt_conf_object_members_count(nxt_conf_value_t *value);
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index c934b10b..105af675 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -58,8 +58,12 @@ static nxt_int_t nxt_conf_vldt_listener(nxt_conf_validation_t *vldt,
static nxt_int_t nxt_conf_vldt_certificate(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
#endif
+static nxt_int_t nxt_conf_vldt_action(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_pass(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
+static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_routes(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_routes_member(nxt_conf_validation_t *vldt,
@@ -101,11 +105,9 @@ static nxt_int_t nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt,
static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
-static nxt_int_t
-nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
- void *data);
-static nxt_int_t
-nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt,
+static nxt_int_t nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
+static nxt_int_t nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
#if (NXT_HAVE_CLONE_NEWUSER)
@@ -316,6 +318,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_action_members[] = {
NULL,
NULL },
+ { nxt_string("proxy"),
+ NXT_CONF_VLDT_STRING,
+ &nxt_conf_vldt_proxy,
+ NULL },
+
NXT_CONF_VLDT_END
};
@@ -328,8 +335,8 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_route_members[] = {
{ nxt_string("action"),
NXT_CONF_VLDT_OBJECT,
- &nxt_conf_vldt_object,
- (void *) &nxt_conf_vldt_action_members },
+ &nxt_conf_vldt_action,
+ NULL },
NXT_CONF_VLDT_END
};
@@ -618,7 +625,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = {
{ nxt_string("classpath"),
NXT_CONF_VLDT_ARRAY,
&nxt_conf_vldt_array_iterator,
- (void *) &nxt_conf_vldt_java_classpath},
+ (void *) &nxt_conf_vldt_java_classpath },
{ nxt_string("webapp"),
NXT_CONF_VLDT_STRING,
@@ -628,7 +635,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = {
{ nxt_string("options"),
NXT_CONF_VLDT_ARRAY,
&nxt_conf_vldt_array_iterator,
- (void *) &nxt_conf_vldt_java_option},
+ (void *) &nxt_conf_vldt_java_option },
{ nxt_string("unit_jars"),
NXT_CONF_VLDT_STRING,
@@ -881,6 +888,37 @@ nxt_conf_vldt_listener(nxt_conf_validation_t *vldt, nxt_str_t *name,
static nxt_int_t
+nxt_conf_vldt_action(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
+ void *data)
+{
+ nxt_int_t ret;
+ nxt_conf_value_t *pass_value, *share_value, *proxy_value;
+
+ static nxt_str_t pass_str = nxt_string("pass");
+ static nxt_str_t share_str = nxt_string("share");
+ static nxt_str_t proxy_str = nxt_string("proxy");
+
+ ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_action_members);
+
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ pass_value = nxt_conf_get_object_member(value, &pass_str, NULL);
+ share_value = nxt_conf_get_object_member(value, &share_str, NULL);
+ proxy_value = nxt_conf_get_object_member(value, &proxy_str, NULL);
+
+ if (pass_value == NULL && share_value == NULL && proxy_value == NULL) {
+ return nxt_conf_vldt_error(vldt, "The \"action\" object must have "
+ "either \"pass\" or \"share\" or "
+ "\"proxy\" option set.");
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
void *data)
{
@@ -964,6 +1002,30 @@ error:
static nxt_int_t
+nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
+ void *data)
+{
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+
+ nxt_conf_get_string(value, &name);
+
+ if (nxt_str_start(&name, "http://", 7)) {
+ name.length -= 7;
+ name.start += 7;
+
+ sa = nxt_sockaddr_parse(vldt->pool, &name);
+ if (sa != NULL) {
+ return NXT_OK;
+ }
+ }
+
+ return nxt_conf_vldt_error(vldt, "The \"proxy\" address is invalid \"%V\"",
+ &name);
+}
+
+
+static nxt_int_t
nxt_conf_vldt_routes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
void *data)
{
@@ -1525,8 +1587,8 @@ nxt_conf_vldt_environment(nxt_conf_validation_t *vldt, nxt_str_t *name,
static nxt_int_t
-nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
- void *data)
+nxt_conf_vldt_clone_namespaces(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data)
{
return nxt_conf_vldt_object(vldt, value, data);
}
@@ -1691,7 +1753,8 @@ nxt_conf_vldt_php_option(nxt_conf_validation_t *vldt, nxt_str_t *name,
static nxt_int_t
-nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, nxt_conf_value_t *value)
+nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value)
{
nxt_str_t str;
diff --git a/src/nxt_conn.h b/src/nxt_conn.h
index 7284808b..2c1d49a0 100644
--- a/src/nxt_conn.h
+++ b/src/nxt_conn.h
@@ -8,7 +8,7 @@
#define _NXT_CONN_H_INCLUDED_
-typedef ssize_t (*nxt_conn_io_read_t)(nxt_conn_t *c);
+typedef ssize_t (*nxt_conn_io_read_t)(nxt_task_t *task, nxt_conn_t *c);
typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
diff --git a/src/nxt_conn_connect.c b/src/nxt_conn_connect.c
index 12b6c80c..d045853f 100644
--- a/src/nxt_conn_connect.c
+++ b/src/nxt_conn_connect.c
@@ -7,6 +7,9 @@
#include <nxt_main.h>
+static nxt_err_t nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c);
+
+
void
nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data)
{
@@ -49,7 +52,7 @@ nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data)
case NXT_AGAIN:
c->socket.write_handler = nxt_conn_connect_test;
- c->socket.error_handler = state->error_handler;
+ c->socket.error_handler = nxt_conn_connect_error;
engine = task->thread->engine;
@@ -118,8 +121,7 @@ nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c)
void
nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data)
{
- int ret, err;
- socklen_t len;
+ nxt_err_t err;
nxt_conn_t *c;
c = obj;
@@ -132,48 +134,35 @@ nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data)
nxt_timer_disable(task->thread->engine, &c->write_timer);
}
- err = 0;
- len = sizeof(int);
-
- /*
- * Linux and BSDs return 0 and store a pending error in the err argument;
- * Solaris returns -1 and sets the errno.
- */
-
- ret = getsockopt(c->socket.fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len);
-
- if (nxt_slow_path(ret == -1)) {
- err = nxt_errno;
- }
+ err = nxt_conn_connect_test_error(task, c);
if (err == 0) {
nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
task, c, data);
- return;
+ } else {
+ nxt_conn_connect_error(task, c, data);
}
-
- c->socket.error = err;
-
- nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E",
- c->socket.fd, (size_t) c->remote->length,
- nxt_sockaddr_start(c->remote), err);
-
- nxt_conn_connect_error(task, c, data);
}
void
nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data)
{
+ nxt_err_t err;
nxt_conn_t *c;
nxt_work_handler_t handler;
const nxt_conn_state_t *state;
c = obj;
+ err = c->socket.error;
+
+ if (err == 0) {
+ err = nxt_conn_connect_test_error(task, c);
+ }
state = c->write_state;
- switch (c->socket.error) {
+ switch (err) {
case NXT_ECONNREFUSED:
#if (NXT_LINUX)
@@ -193,3 +182,22 @@ nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data)
nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
+
+
+static nxt_err_t
+nxt_conn_connect_test_error(nxt_task_t *task, nxt_conn_t *c)
+{
+ nxt_err_t err;
+
+ err = nxt_socket_error(c->socket.fd);
+
+ if (err != 0) {
+ c->socket.error = err;
+
+ nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E",
+ c->socket.fd, (size_t) c->remote->length,
+ nxt_sockaddr_start(c->remote), err);
+ }
+
+ return err;
+}
diff --git a/src/nxt_conn_read.c b/src/nxt_conn_read.c
index 83969b31..3285abcd 100644
--- a/src/nxt_conn_read.c
+++ b/src/nxt_conn_read.c
@@ -69,7 +69,7 @@ nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
n = c->io->recvbuf(c, c->read);
} else {
- n = state->io_read_handler(c);
+ n = state->io_read_handler(task, c);
/* The state can be changed by io_read_handler. */
state = c->read_state;
}
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index 9cdaab9b..a944834e 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -944,12 +944,12 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
ev->task, ev, ev->data);
+ error = 0;
+
} else if (engine->u.epoll.mode == 0) {
/* Level-triggered mode. */
nxt_epoll_disable_read(engine, ev);
}
-
- error = 0;
}
if ((events & EPOLLOUT) != 0) {
@@ -964,12 +964,12 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
ev->task, ev, ev->data);
+ error = 0;
+
} else if (engine->u.epoll.mode == 0) {
/* Level-triggered mode. */
nxt_epoll_disable_write(engine, ev);
}
-
- error = 0;
}
if (!error) {
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index 31a35f6d..6f051067 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -556,19 +556,19 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
void *
-nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
+nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint,
size_t size)
{
- uint8_t n;
+ uint32_t n;
nxt_uint_t items;
nxt_array_t *mem_cache;
nxt_mem_cache_t *cache;
nxt_mem_cache_block_t *block;
mem_cache = engine->mem_cache;
- n = *slot;
+ n = *hint;
- if (n == (uint8_t) -1) {
+ if (n == NXT_EVENT_ENGINE_NO_MEM_HINT) {
if (mem_cache == NULL) {
/* IPv4 nxt_sockaddr_t and HTTP/1 and HTTP/2 buffers. */
@@ -607,7 +607,9 @@ nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
found:
- *slot = n;
+ if (n < NXT_EVENT_ENGINE_NO_MEM_HINT) {
+ *hint = (uint8_t) n;
+ }
}
cache = mem_cache->elts;
@@ -626,15 +628,39 @@ nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
void
-nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p)
+nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint, void *p,
+ size_t size)
{
+ uint32_t n;
+ nxt_array_t *mem_cache;
nxt_mem_cache_t *cache;
nxt_mem_cache_block_t *block;
block = p;
+ mem_cache = engine->mem_cache;
+ cache = mem_cache->elts;
+
+ n = hint;
+
+ if (nxt_slow_path(n == NXT_EVENT_ENGINE_NO_MEM_HINT)) {
- cache = engine->mem_cache->elts;
- cache = cache + *slot;
+ if (size != 0) {
+ for (n = 0; n < mem_cache->nelts; n++) {
+ if (cache[n].size == size) {
+ goto found;
+ }
+ }
+
+ nxt_alert(&engine->task,
+ "event engine mem free(%p, %z) not found", p, size);
+ }
+
+ goto done;
+ }
+
+found:
+
+ cache = cache + n;
if (cache->count < 16) {
cache->count++;
@@ -644,10 +670,79 @@ nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot, void *p)
return;
}
+done:
+
nxt_mp_free(engine->mem_pool, p);
}
+void *
+nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size)
+{
+ nxt_buf_t *b;
+ uint8_t hint;
+
+ hint = NXT_EVENT_ENGINE_NO_MEM_HINT;
+
+ b = nxt_event_engine_mem_alloc(engine, &hint, NXT_BUF_MEM_SIZE + size);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
+
+ nxt_memzero(b, NXT_BUF_MEM_SIZE);
+
+ b->cache_hint = hint;
+ b->data = engine;
+ b->completion_handler = nxt_event_engine_buf_mem_completion;
+
+ if (size != 0) {
+ b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start;
+ b->mem.end = b->mem.start + size;
+ }
+
+ return b;
+}
+
+
+void
+nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b)
+{
+ size_t size;
+
+ size = NXT_BUF_MEM_SIZE + nxt_buf_mem_size(&b->mem);
+
+ nxt_event_engine_mem_free(engine, b->cache_hint, b, size);
+}
+
+
+void
+nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_engine_t *engine;
+ nxt_buf_t *b, *next, *parent;
+
+ b = obj;
+ parent = data;
+
+ nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
+
+ engine = b->data;
+
+ do {
+ next = b->next;
+ parent = b->parent;
+
+ nxt_event_engine_buf_mem_free(engine, b);
+
+ nxt_buf_parent_completion(task, parent);
+
+ b = next;
+ } while (b != NULL);
+}
+
+
#if (NXT_DEBUG)
void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h
index 365d9e89..6b05d510 100644
--- a/src/nxt_event_engine.h
+++ b/src/nxt_event_engine.h
@@ -514,10 +514,16 @@ NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine,
NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine,
nxt_uint_t signo);
-void *nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *slot,
+#define NXT_EVENT_ENGINE_NO_MEM_HINT 255
+
+void *nxt_event_engine_mem_alloc(nxt_event_engine_t *engine, uint8_t *hint,
size_t size);
-void nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t *slot,
- void *p);
+void nxt_event_engine_mem_free(nxt_event_engine_t *engine, uint8_t hint,
+ void *p, size_t size);
+void *nxt_event_engine_buf_mem_alloc(nxt_event_engine_t *engine, size_t size);
+void nxt_event_engine_buf_mem_free(nxt_event_engine_t *engine, nxt_buf_t *b);
+void nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj,
+ void *data);
nxt_inline nxt_event_engine_t *
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index 541fcb44..b07eaf84 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -18,10 +18,10 @@
*/
#if (NXT_TLS)
-static ssize_t nxt_http_idle_io_read_handler(nxt_conn_t *c);
+static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data);
#endif
-static ssize_t nxt_h1p_idle_io_read_handler(nxt_conn_t *c);
+static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj,
@@ -45,7 +45,7 @@ static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
void *data);
static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
static void nxt_h1p_request_header_send(nxt_task_t *task,
- nxt_http_request_t *r, nxt_work_handler_t body_handler);
+ nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data);
static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
@@ -78,11 +78,32 @@ static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
uintptr_t data);
static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
-static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c);
+static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer);
+static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
+static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj,
+ void *data);
+static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
+ nxt_buf_mem_t *bm);
+static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
+static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
+static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
+static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
+
#if (NXT_TLS)
static const nxt_conn_state_t nxt_http_idle_state;
static const nxt_conn_state_t nxt_h1p_shutdown_state;
@@ -94,6 +115,13 @@ static const nxt_conn_state_t nxt_h1p_request_send_state;
static const nxt_conn_state_t nxt_h1p_timeout_response_state;
static const nxt_conn_state_t nxt_h1p_keepalive_state;
static const nxt_conn_state_t nxt_h1p_close_state;
+static const nxt_conn_state_t nxt_h1p_peer_connect_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_send_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_read_state;
+static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state;
+static const nxt_conn_state_t nxt_h1p_peer_read_state;
+static const nxt_conn_state_t nxt_h1p_peer_close_state;
const nxt_http_proto_table_t nxt_http_proto[3] = {
@@ -106,6 +134,13 @@ const nxt_http_proto_table_t nxt_http_proto[3] = {
.body_bytes_sent = nxt_h1p_request_body_bytes_sent,
.discard = nxt_h1p_request_discard,
.close = nxt_h1p_request_close,
+
+ .peer_connect = nxt_h1p_peer_connect,
+ .peer_header_send = nxt_h1p_peer_header_send,
+ .peer_header_read = nxt_h1p_peer_header_read,
+ .peer_read = nxt_h1p_peer_read,
+ .peer_close = nxt_h1p_peer_close,
+
.ws_frame_start = nxt_h1p_websocket_frame_start,
},
/* NXT_HTTP_PROTO_H2 */
@@ -113,9 +148,9 @@ const nxt_http_proto_table_t nxt_http_proto[3] = {
};
-static nxt_lvlhsh_t nxt_h1p_fields_hash;
+static nxt_lvlhsh_t nxt_h1p_fields_hash;
-static nxt_http_field_proc_t nxt_h1p_fields[] = {
+static nxt_http_field_proc_t nxt_h1p_fields[] = {
{ nxt_string("Connection"), &nxt_h1p_connection, 0 },
{ nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 },
{ nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
@@ -136,11 +171,32 @@ static nxt_http_field_proc_t nxt_h1p_fields[] = {
};
+static nxt_lvlhsh_t nxt_h1p_peer_fields_hash;
+
+static nxt_http_field_proc_t nxt_h1p_peer_fields[] = {
+ { nxt_string("Connection"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Server"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Date"), &nxt_http_proxy_date, 0 },
+ { nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 },
+};
+
+
nxt_int_t
nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt)
{
- return nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
- nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
+ nxt_int_t ret;
+
+ ret = nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
+ nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash,
+ rt->mem_pool, nxt_h1p_peer_fields,
+ nxt_nitems(nxt_h1p_peer_fields));
+ }
+
+ return ret;
}
@@ -196,7 +252,7 @@ static const nxt_conn_state_t nxt_http_idle_state
static ssize_t
-nxt_http_idle_io_read_handler(nxt_conn_t *c)
+nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
size_t size;
ssize_t n;
@@ -216,7 +272,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c)
size = joint->socket_conf->header_buffer_size;
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
if (nxt_slow_path(b == NULL)) {
c->socket.error = NXT_ENOMEM;
return NXT_ERROR;
@@ -234,7 +290,7 @@ nxt_http_idle_io_read_handler(nxt_conn_t *c)
} else {
c->read = NULL;
- nxt_mp_free(c->mem_pool, b);
+ nxt_event_engine_buf_mem_free(task->thread->engine, b);
}
return n;
@@ -248,12 +304,14 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
nxt_buf_t *b;
nxt_conn_t *c;
nxt_tls_conf_t *tls;
+ nxt_event_engine_t *engine;
nxt_socket_conf_joint_t *joint;
c = obj;
nxt_debug(task, "h1p conn https test");
+ engine = task->thread->engine;
b = c->read;
p = b->mem.pos;
@@ -262,7 +320,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
if (p[0] != 0x16) {
b->mem.free = b->mem.pos;
- nxt_conn_read(task->thread->engine, c);
+ nxt_conn_read(engine, c);
return;
}
@@ -292,7 +350,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
#endif
c->read = NULL;
- nxt_mp_free(c->mem_pool, b);
+ nxt_event_engine_buf_mem_free(engine, b);
joint = c->listen->socket.data;
@@ -301,7 +359,7 @@ nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
* Listening socket had been closed while
* connection was in keep-alive state.
*/
- nxt_h1p_shutdown(task, c);
+ nxt_h1p_closing(task, c);
return;
}
@@ -330,7 +388,7 @@ static const nxt_conn_state_t nxt_h1p_idle_state
static ssize_t
-nxt_h1p_idle_io_read_handler(nxt_conn_t *c)
+nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
size_t size;
ssize_t n;
@@ -353,7 +411,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c)
if (b == NULL) {
size = joint->socket_conf->header_buffer_size;
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
if (nxt_slow_path(b == NULL)) {
c->socket.error = NXT_ENOMEM;
return NXT_ERROR;
@@ -367,7 +425,7 @@ nxt_h1p_idle_io_read_handler(nxt_conn_t *c)
} else {
c->read = NULL;
- nxt_mp_free(c->mem_pool, b);
+ nxt_event_engine_buf_mem_free(task->thread->engine, b);
}
return n;
@@ -386,7 +444,7 @@ nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
if (nxt_slow_path(h1p == NULL)) {
- nxt_h1p_shutdown(task, c);
+ nxt_h1p_closing(task, c);
return;
}
@@ -424,6 +482,12 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
r->tls = c->u.tls;
#endif
+ r->task = c->task;
+ task = &r->task;
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+
ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
if (nxt_fast_path(ret == NXT_OK)) {
@@ -444,7 +508,7 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
nxt_mp_release(r->mem_pool);
}
- nxt_h1p_shutdown(task, c);
+ nxt_h1p_closing(task, c);
}
@@ -599,13 +663,15 @@ nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
}
if (nxt_slow_path(h1p->websocket_key == NULL)) {
- nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket key");
+ nxt_log(task, NXT_LOG_INFO,
+ "h1p upgrade: bad or absent websocket key");
return NXT_HTTP_BAD_REQUEST;
}
if (nxt_slow_path(h1p->websocket_version_ok == 0)) {
- nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad or absent websocket version");
+ nxt_log(task, NXT_LOG_INFO,
+ "h1p upgrade: bad or absent websocket version");
return NXT_HTTP_UPGRADE_REQUIRED;
}
@@ -655,16 +721,16 @@ nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c,
static nxt_int_t
nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
{
- nxt_http_request_t *r;
- static const u_char *upgrade = (const u_char *) "upgrade";
+ nxt_http_request_t *r;
r = ctx;
+ field->hopbyhop = 1;
if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) {
r->proto.h1->keepalive = 0;
} else if (field->value_length == 7
- && nxt_memcasecmp(field->value, upgrade, 7) == 0)
+ && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
{
r->proto.h1->connection_upgrade = 1;
}
@@ -676,13 +742,12 @@ nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
static nxt_int_t
nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data)
{
- nxt_http_request_t *r;
- static const u_char *websocket = (const u_char *) "websocket";
+ nxt_http_request_t *r;
r = ctx;
if (field->value_length == 9
- && nxt_memcasecmp(field->value, websocket, 9) == 0)
+ && nxt_memcasecmp(field->value, "websocket", 9) == 0)
{
r->proto.h1->upgrade_websocket = 1;
}
@@ -730,6 +795,8 @@ nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
nxt_http_request_t *r;
r = ctx;
+ field->skip = 1;
+ field->hopbyhop = 1;
if (field->value_length == 7
&& nxt_memcmp(field->value, "chunked", 7) == 0)
@@ -995,7 +1062,7 @@ static const nxt_str_t nxt_http_server_error[] = {
static void
nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
- nxt_work_handler_t body_handler)
+ nxt_work_handler_t body_handler, void *data)
{
u_char *p;
size_t size;
@@ -1172,7 +1239,7 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
* in engine->write_work_queue.
*/
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- body_handler, task, r, NULL);
+ body_handler, task, r, data);
} else {
header->next = nxt_http_buf_last(r);
@@ -1211,6 +1278,7 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
while (b != NULL) {
next = b->next;
+ b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
@@ -1226,7 +1294,8 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
size = nxt_buf_mem_used_size(&in->mem);
if (size == 0) {
- nxt_mp_free(c->mem_pool, in);
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ in->completion_handler, task, in, in->parent);
c->read = NULL;
}
@@ -1480,11 +1549,16 @@ nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
nxt_debug(task, "h1p request close");
h1p = proto.h1;
+ h1p->keepalive &= !h1p->request->inconsistent;
h1p->request = NULL;
nxt_router_conf_release(task, joint);
c = h1p->conn;
+ task = &c->task;
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
if (h1p->keepalive) {
nxt_h1p_keepalive(task, h1p, c);
@@ -1770,14 +1844,31 @@ nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c)
}
} else {
- nxt_h1p_shutdown_(task, c);
+ nxt_h1p_closing(task, c);
}
}
static void
-nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c)
+nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
{
+ nxt_timer_t *timer;
+ nxt_h1p_websocket_timer_t *ws_timer;
+
+ nxt_debug(task, "h1p conn ws shutdown");
+
+ timer = obj;
+ ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
+
+ nxt_h1p_closing(task, ws_timer->h1p->conn);
+}
+
+
+static void
+nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c)
+{
+ nxt_debug(task, "h1p closing");
+
c->socket.data = NULL;
#if (NXT_TLS)
@@ -1809,21 +1900,6 @@ static const nxt_conn_state_t nxt_h1p_shutdown_state
static void
-nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *timer;
- nxt_h1p_websocket_timer_t *ws_timer;
-
- nxt_debug(task, "h1p conn ws shutdown");
-
- timer = obj;
- ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
-
- nxt_h1p_shutdown_(task, ws_timer->h1p->conn);
-}
-
-
-static void
nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
@@ -1868,3 +1944,673 @@ nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data)
nxt_router_listen_event_release(&engine->task, lev, NULL);
}
+
+
+static void
+nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_conn_t *c, *client;
+ nxt_h1proto_t *h1p;
+ nxt_fd_event_t *socket;
+ nxt_work_queue_t *wq;
+ nxt_http_request_t *r;
+
+ nxt_debug(task, "h1p peer connect");
+
+ peer->status = NXT_HTTP_UNSET;
+ r = peer->request;
+
+ mp = nxt_mp_create(1024, 128, 256, 32);
+
+ if (nxt_slow_path(mp == NULL)) {
+ goto fail;
+ }
+
+ h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t));
+ if (nxt_slow_path(h1p == NULL)) {
+ goto fail;
+ }
+
+ ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+
+ c = nxt_conn_create(mp, task);
+ if (nxt_slow_path(c == NULL)) {
+ goto fail;
+ }
+
+ c->mem_pool = mp;
+ h1p->conn = c;
+
+ peer->proto.h1 = h1p;
+ h1p->request = r;
+
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+ c->socket.data = peer;
+ c->remote = peer->sockaddr;
+
+ c->socket.write_ready = 1;
+ c->write_state = &nxt_h1p_peer_connect_state;
+
+ /*
+ * TODO: queues should be implemented via client proto interface.
+ */
+ client = r->proto.h1->conn;
+
+ socket = &client->socket;
+ wq = socket->read_work_queue;
+ c->read_work_queue = wq;
+ c->socket.read_work_queue = wq;
+ c->read_timer.work_queue = wq;
+
+ wq = socket->write_work_queue;
+ c->write_work_queue = wq;
+ c->socket.write_work_queue = wq;
+ c->write_timer.work_queue = wq;
+ /* TODO END */
+
+ nxt_conn_connect(task->thread->engine, c);
+
+ return;
+
+fail:
+
+ peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
+
+ r->state->error_handler(task, r, peer);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_connect_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_connected,
+ .close_handler = nxt_h1p_peer_refused,
+ .error_handler = nxt_h1p_peer_error,
+
+ .timer_handler = nxt_h1p_peer_send_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
+};
+
+
+static void
+nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer connected");
+
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer refused");
+
+ //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE;
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ u_char *p;
+ size_t size;
+ nxt_buf_t *header, *body;
+ nxt_conn_t *c;
+ nxt_http_field_t *field;
+ nxt_http_request_t *r;
+
+ nxt_debug(task, "h1p peer header send");
+
+ r = peer->request;
+
+ size = r->method->length + sizeof(" ") + r->target.length
+ + sizeof(" HTTP/1.0\r\n")
+ + sizeof("\r\n");
+
+ nxt_list_each(field, r->fields) {
+
+ if (!field->hopbyhop) {
+ size += field->name_length + field->value_length;
+ size += nxt_length(": \r\n");
+ }
+
+ } nxt_list_loop;
+
+ header = nxt_http_buf_mem(task, r, size);
+ if (nxt_slow_path(header == NULL)) {
+ r->state->error_handler(task, r, peer);
+ return;
+ }
+
+ p = header->mem.free;
+
+ p = nxt_cpymem(p, r->method->start, r->method->length);
+ *p++ = ' ';
+ p = nxt_cpymem(p, r->target.start, r->target.length);
+ p = nxt_cpymem(p, " HTTP/1.0\r\n", 11);
+
+ nxt_list_each(field, r->fields) {
+
+ if (!field->hopbyhop) {
+ p = nxt_cpymem(p, field->name, field->name_length);
+ *p++ = ':'; *p++ = ' ';
+ p = nxt_cpymem(p, field->value, field->value_length);
+ *p++ = '\r'; *p++ = '\n';
+ }
+
+ } nxt_list_loop;
+
+ *p++ = '\r'; *p++ = '\n';
+ header->mem.free = p;
+ size = p - header->mem.pos;
+
+ c = peer->proto.h1->conn;
+ c->write = header;
+ c->write_state = &nxt_h1p_peer_header_send_state;
+
+ if (r->body != NULL) {
+ body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
+ if (nxt_slow_path(body == NULL)) {
+ r->state->error_handler(task, r, peer);
+ return;
+ }
+
+ header->next = body;
+
+ body->mem = r->body->mem;
+ size += nxt_buf_mem_used_size(&body->mem);
+
+// nxt_mp_retain(r->mem_pool);
+ }
+
+ if (size > 16384) {
+ /* Use proxy_send_timeout instead of proxy_timeout. */
+ c->write_state = &nxt_h1p_peer_header_body_send_state;
+ }
+
+ nxt_conn_write(task->thread->engine, c);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_send_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_sent,
+ .error_handler = nxt_h1p_peer_error,
+
+ .timer_handler = nxt_h1p_peer_send_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
+};
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_sent,
+ .error_handler = nxt_h1p_peer_error,
+
+ .timer_handler = nxt_h1p_peer_send_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+ nxt_event_engine_t *engine;
+
+ c = obj;
+ peer = data;
+
+ nxt_debug(task, "h1p peer header sent");
+
+ engine = task->thread->engine;
+
+ c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
+
+ if (c->write == NULL) {
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
+ return;
+ }
+
+ nxt_conn_write(engine, c);
+}
+
+
+static void
+nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p peer header read");
+
+ c = peer->proto.h1->conn;
+
+ if (c->write_timer.enabled) {
+ c->read_state = &nxt_h1p_peer_header_read_state;
+
+ } else {
+ c->read_state = &nxt_h1p_peer_header_read_timer_state;
+ }
+
+ nxt_conn_read(task->thread->engine, c);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_read_done,
+ .close_handler = nxt_h1p_peer_closed,
+ .error_handler = nxt_h1p_peer_error,
+
+ .io_read_handler = nxt_h1p_peer_io_read_handler,
+};
+
+
+static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_header_read_done,
+ .close_handler = nxt_h1p_peer_closed,
+ .error_handler = nxt_h1p_peer_error,
+
+ .io_read_handler = nxt_h1p_peer_io_read_handler,
+
+ .timer_handler = nxt_h1p_peer_read_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
+};
+
+
+static ssize_t
+nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
+{
+ size_t size;
+ ssize_t n;
+ nxt_buf_t *b;
+ nxt_http_peer_t *peer;
+ nxt_socket_conf_t *skcf;
+ nxt_http_request_t *r;
+
+ peer = c->socket.data;
+ r = peer->request;
+ b = c->read;
+
+ if (b == NULL) {
+ skcf = r->conf->socket_conf;
+
+ size = (peer->header_received) ? skcf->proxy_buffer_size
+ : skcf->proxy_header_buffer_size;
+
+ nxt_debug(task, "h1p peer io read: %z", size);
+
+ b = nxt_http_proxy_buf_mem_alloc(task, r, size);
+ if (nxt_slow_path(b == NULL)) {
+ c->socket.error = NXT_ENOMEM;
+ return NXT_ERROR;
+ }
+ }
+
+ n = c->io->recvbuf(c, b);
+
+ if (n > 0) {
+ c->read = b;
+
+ } else {
+ c->read = NULL;
+ nxt_http_proxy_buf_mem_free(task, r, b);
+ }
+
+ return n;
+}
+
+
+static void
+nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+ nxt_event_engine_t *engine;
+
+ c = obj;
+ peer = data;
+
+ nxt_debug(task, "h1p peer header read done");
+
+ b = c->read;
+
+ ret = nxt_h1p_peer_header_parse(peer, &b->mem);
+
+ r = peer->request;
+
+ ret = nxt_expect(NXT_DONE, ret);
+
+ if (ret != NXT_AGAIN) {
+ engine = task->thread->engine;
+ nxt_timer_disable(engine, &c->write_timer);
+ nxt_timer_disable(engine, &c->read_timer);
+ }
+
+ switch (ret) {
+
+ case NXT_DONE:
+ peer->fields = peer->proto.h1->parser.fields;
+
+ ret = nxt_http_fields_process(peer->fields,
+ &nxt_h1p_peer_fields_hash, r);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
+ break;
+ }
+
+ c->read = NULL;
+
+ if (nxt_buf_mem_used_size(&b->mem) != 0) {
+ peer->body = b;
+ }
+
+ peer->header_received = 1;
+
+ r->state->ready_handler(task, r, peer);
+ return;
+
+ case NXT_AGAIN:
+ if (nxt_buf_mem_free_size(&b->mem) != 0) {
+ nxt_conn_read(task->thread->engine, c);
+ return;
+ }
+
+ /* Fall through. */
+
+ default:
+ case NXT_ERROR:
+ case NXT_HTTP_PARSE_INVALID:
+ case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
+ case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+ break;
+ }
+
+ nxt_http_proxy_buf_mem_free(task, r, b);
+
+ r->state->error_handler(task, r, peer);
+}
+
+
+static nxt_int_t
+nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm)
+{
+ u_char *p;
+ size_t length;
+ nxt_int_t status;
+
+ if (peer->status < 0) {
+ length = nxt_buf_mem_used_size(bm);
+
+ if (nxt_slow_path(length < 12)) {
+ return NXT_AGAIN;
+ }
+
+ p = bm->pos;
+
+ if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0
+ || (p[7] != '0' && p[7] != '1')))
+ {
+ return NXT_ERROR;
+ }
+
+ status = nxt_int_parse(&p[9], 3);
+
+ if (nxt_slow_path(status < 0)) {
+ return NXT_ERROR;
+ }
+
+ p += 12;
+ length -= 12;
+
+ p = nxt_memchr(p, '\n', length);
+
+ if (nxt_slow_path(p == NULL)) {
+ return NXT_AGAIN;
+ }
+
+ bm->pos = p + 1;
+ peer->status = status;
+ }
+
+ return nxt_http_parse_fields(&peer->proto.h1->parser, bm);
+}
+
+
+static void
+nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p peer read");
+
+ c = peer->proto.h1->conn;
+ c->read_state = &nxt_h1p_peer_read_state;
+
+ nxt_conn_read(task->thread->engine, c);
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_read_done,
+ .close_handler = nxt_h1p_peer_closed,
+ .error_handler = nxt_h1p_peer_error,
+
+ .io_read_handler = nxt_h1p_peer_io_read_handler,
+
+ .timer_handler = nxt_h1p_peer_read_timeout,
+ .timer_value = nxt_h1p_peer_timer_value,
+ .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ c = obj;
+ peer = data;
+
+ nxt_debug(task, "h1p peer read done");
+
+ peer->body = c->read;
+ c->read = NULL;
+
+ r = peer->request;
+ r->state->ready_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer closed");
+
+ r = peer->request;
+
+ if (peer->header_received) {
+ peer->body = nxt_http_buf_last(r);
+
+ peer->closed = 1;
+
+ r->state->ready_handler(task, r, peer);
+
+ } else {
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+
+ r->state->error_handler(task, r, peer);
+ }
+}
+
+
+static void
+nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ peer = data;
+
+ nxt_debug(task, "h1p peer error");
+
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ timer = obj;
+
+ nxt_debug(task, "h1p peer send timeout");
+
+ c = nxt_write_timer_conn(timer);
+ c->block_write = 1;
+ c->block_read = 1;
+
+ peer = c->socket.data;
+ peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static void
+nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ timer = obj;
+
+ nxt_debug(task, "h1p peer read timeout");
+
+ c = nxt_read_timer_conn(timer);
+ c->block_write = 1;
+ c->block_read = 1;
+
+ peer = c->socket.data;
+ peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
+
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+}
+
+
+static nxt_msec_t
+nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data)
+{
+ nxt_http_peer_t *peer;
+
+ peer = c->socket.data;
+
+ return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data);
+}
+
+
+static void
+nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer)
+{
+ nxt_conn_t *c;
+
+ nxt_debug(task, "h1p peer close");
+
+ peer->closed = 1;
+
+ c = peer->proto.h1->conn;
+ task = &c->task;
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+
+ if (c->socket.fd != -1) {
+ c->write_state = &nxt_h1p_peer_close_state;
+
+ nxt_conn_close(task->thread->engine, c);
+
+ } else {
+ nxt_h1p_peer_free(task, c, NULL);
+ }
+}
+
+
+static const nxt_conn_state_t nxt_h1p_peer_close_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_h1p_peer_free,
+};
+
+
+static void
+nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+
+ c = obj;
+
+ nxt_debug(task, "h1p peer free");
+
+ nxt_conn_free(task, c);
+}
diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h
index c6d3bd53..61da6770 100644
--- a/src/nxt_h1proto.h
+++ b/src/nxt_h1proto.h
@@ -20,6 +20,8 @@ struct nxt_h1proto_s {
nxt_http_request_parse_t parser;
uint8_t nbuffers;
+ uint8_t header_buffer_slot;
+ uint8_t large_buffer_slot;
uint8_t keepalive; /* 1 bit */
uint8_t chunked; /* 1 bit */
uint8_t websocket; /* 1 bit */
diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c
index 13754be0..c9ff899c 100644
--- a/src/nxt_h1proto_websocket.c
+++ b/src/nxt_h1proto_websocket.c
@@ -26,7 +26,7 @@ static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
-static ssize_t nxt_h1p_ws_io_read_handler(nxt_conn_t *c);
+static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
void *data);
@@ -473,7 +473,7 @@ nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
static ssize_t
-nxt_h1p_ws_io_read_handler(nxt_conn_t *c)
+nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
size_t size;
ssize_t n;
@@ -697,6 +697,7 @@ nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
for (i = 0; i < payload_len; i++) {
while (nxt_buf_mem_used_size(&b->mem) == 0) {
next = b->next;
+ b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
diff --git a/src/nxt_http.h b/src/nxt_http.h
index 560b7310..030d77a7 100644
--- a/src/nxt_http.h
+++ b/src/nxt_http.h
@@ -9,6 +9,7 @@
typedef enum {
+ NXT_HTTP_UNSET = -1,
NXT_HTTP_INVALID = 0,
NXT_HTTP_CONTINUE = 100,
@@ -105,6 +106,21 @@ typedef struct {
} nxt_http_response_t;
+typedef struct {
+ nxt_http_proto_t proto;
+ nxt_http_request_t *request;
+ nxt_sockaddr_t *sockaddr;
+ nxt_list_t *fields;
+ nxt_buf_t *body;
+ nxt_off_t remainder;
+
+ nxt_http_status_t status:16;
+ nxt_http_protocol_t protocol:8; /* 2 bits */
+ uint8_t header_received; /* 1 bit */
+ uint8_t closed; /* 1 bit */
+} nxt_http_peer_t;
+
+
struct nxt_http_request_s {
nxt_http_proto_t proto;
nxt_socket_conf_joint_t *conf;
@@ -137,12 +153,14 @@ struct nxt_http_request_s {
nxt_sockaddr_t *remote;
nxt_sockaddr_t *local;
void *tls;
+ nxt_task_t task;
nxt_timer_t timer;
void *timer_data;
void *req_rpc_data;
+ nxt_http_peer_t *peer;
nxt_buf_t *last;
nxt_http_response_t resp;
@@ -153,20 +171,23 @@ struct nxt_http_request_s {
nxt_http_protocol_t protocol:8; /* 2 bits */
uint8_t logged; /* 1 bit */
uint8_t header_sent; /* 1 bit */
+ uint8_t inconsistent; /* 1 bit */
uint8_t error; /* 1 bit */
uint8_t websocket_handshake; /* 1 bit */
};
typedef struct nxt_http_route_s nxt_http_route_t;
+typedef struct nxt_http_upstream_s nxt_http_upstream_t;
-struct nxt_http_pass_s {
- nxt_http_pass_t *(*handler)(nxt_task_t *task,
+struct nxt_http_action_s {
+ nxt_http_action_t *(*handler)(nxt_task_t *task,
nxt_http_request_t *r,
- nxt_http_pass_t *pass);
+ nxt_http_action_t *action);
union {
nxt_http_route_t *route;
+ nxt_http_upstream_t *upstream;
nxt_app_t *application;
} u;
@@ -178,12 +199,19 @@ typedef struct {
void (*body_read)(nxt_task_t *task, nxt_http_request_t *r);
void (*local_addr)(nxt_task_t *task, nxt_http_request_t *r);
void (*header_send)(nxt_task_t *task, nxt_http_request_t *r,
- nxt_work_handler_t body_handler);
+ nxt_work_handler_t body_handler, void *data);
void (*send)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out);
nxt_off_t (*body_bytes_sent)(nxt_task_t *task, nxt_http_proto_t proto);
void (*discard)(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *last);
void (*close)(nxt_task_t *task, nxt_http_proto_t proto,
nxt_socket_conf_joint_t *joint);
+
+ void (*peer_connect)(nxt_task_t *task, nxt_http_peer_t *peer);
+ void (*peer_header_send)(nxt_task_t *task, nxt_http_peer_t *peer);
+ void (*peer_header_read)(nxt_task_t *task, nxt_http_peer_t *peer);
+ void (*peer_read)(nxt_task_t *task, nxt_http_peer_t *peer);
+ void (*peer_close)(nxt_task_t *task, nxt_http_peer_t *peer);
+
void (*ws_frame_start)(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *ws_frame);
} nxt_http_proto_table_t;
@@ -218,7 +246,7 @@ void nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r,
nxt_http_status_t status);
void nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r);
void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
- nxt_work_handler_t body_handler);
+ nxt_work_handler_t body_handler, void *data);
void nxt_http_request_ws_frame_start(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *ws_frame);
void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r,
@@ -238,24 +266,36 @@ nxt_int_t nxt_http_request_content_length(void *ctx, nxt_http_field_t *field,
nxt_http_routes_t *nxt_http_routes_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *routes_conf);
-nxt_http_pass_t *nxt_http_pass_create(nxt_task_t *task,
+nxt_http_action_t *nxt_http_action_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
void nxt_http_routes_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf);
-nxt_http_pass_t *nxt_http_pass_application(nxt_task_t *task,
+nxt_http_action_t *nxt_http_pass_application(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
void nxt_http_routes_cleanup(nxt_task_t *task, nxt_http_routes_t *routes);
-void nxt_http_pass_cleanup(nxt_task_t *task, nxt_http_pass_t *pass);
+void nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action);
-nxt_http_pass_t *nxt_http_static_handler(nxt_task_t *task,
- nxt_http_request_t *r, nxt_http_pass_t *pass);
+nxt_http_action_t *nxt_http_static_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *action);
nxt_int_t nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash);
nxt_int_t nxt_http_static_mtypes_hash_add(nxt_mp_t *mp, nxt_lvlhsh_t *hash,
nxt_str_t *extension, nxt_str_t *type);
nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash,
nxt_str_t *extension);
-nxt_http_pass_t *nxt_http_request_application(nxt_task_t *task,
- nxt_http_request_t *r, nxt_http_pass_t *pass);
+nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *action);
+
+nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action);
+nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field,
+ uintptr_t data);
+nxt_int_t nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
+ uintptr_t data);
+nxt_int_t nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field,
+ uintptr_t data);
+nxt_buf_t *nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
+ size_t size);
+void nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *b);
extern nxt_time_string_t nxt_http_date_cache;
diff --git a/src/nxt_http_error.c b/src/nxt_http_error.c
index 8e8b80f1..370b12db 100644
--- a/src/nxt_http_error.c
+++ b/src/nxt_http_error.c
@@ -57,8 +57,8 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r,
r->state = &nxt_http_request_send_error_body_state;
- nxt_http_request_header_send(task, r, nxt_http_request_send_error_body);
-
+ nxt_http_request_header_send(task, r,
+ nxt_http_request_send_error_body, NULL);
return;
fail:
diff --git a/src/nxt_http_parse.c b/src/nxt_http_parse.c
index e6e91454..4c5d4936 100644
--- a/src/nxt_http_parse.c
+++ b/src/nxt_http_parse.c
@@ -787,6 +787,7 @@ nxt_http_parse_field_end(nxt_http_request_parse_t *rp, u_char **pos,
field->hash = nxt_http_field_hash_end(rp->field_hash);
field->skip = 0;
+ field->hopbyhop = 0;
field->name_length = rp->field_name.length;
field->value_length = rp->field_value.length;
diff --git a/src/nxt_http_parse.h b/src/nxt_http_parse.h
index d7ce5e4f..d319c71d 100644
--- a/src/nxt_http_parse.h
+++ b/src/nxt_http_parse.h
@@ -81,7 +81,8 @@ typedef struct {
struct nxt_http_field_s {
uint16_t hash;
- uint8_t skip; /* 1 bit */
+ uint8_t skip:1;
+ uint8_t hopbyhop:1;
uint8_t name_length;
uint32_t value_length;
u_char *name;
diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c
new file mode 100644
index 00000000..7f4eeff2
--- /dev/null
+++ b/src/nxt_http_proxy.c
@@ -0,0 +1,403 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_router.h>
+#include <nxt_http.h>
+
+
+typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task,
+ nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
+
+
+struct nxt_http_upstream_s {
+ uint32_t current;
+ uint32_t n;
+ uint8_t protocol;
+ nxt_http_upstream_connect_t connect;
+ nxt_sockaddr_t *sockaddr[1];
+};
+
+
+static void nxt_http_upstream_connect(nxt_task_t *task,
+ nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
+static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *action);
+static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_request_send(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_buf_t *out);
+static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data);
+
+
+static const nxt_http_request_state_t nxt_http_proxy_header_send_state;
+static const nxt_http_request_state_t nxt_http_proxy_header_sent_state;
+static const nxt_http_request_state_t nxt_http_proxy_header_read_state;
+static const nxt_http_request_state_t nxt_http_proxy_read_state;
+
+
+nxt_int_t
+nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
+{
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+ nxt_http_upstream_t *upstream;
+
+ sa = NULL;
+ name = action->name;
+
+ if (nxt_str_start(&name, "http://", 7)) {
+ name.length -= 7;
+ name.start += 7;
+
+ sa = nxt_sockaddr_parse(mp, &name);
+ if (nxt_slow_path(sa == NULL)) {
+ return NXT_ERROR;
+ }
+
+ sa->type = SOCK_STREAM;
+ }
+
+ if (sa != NULL) {
+ upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t));
+ if (nxt_slow_path(upstream == NULL)) {
+ return NXT_ERROR;
+ }
+
+ upstream->current = 0;
+ upstream->n = 1;
+ upstream->protocol = NXT_HTTP_PROTO_H1;
+ upstream->connect = nxt_http_upstream_connect;
+ upstream->sockaddr[0] = sa;
+
+ action->u.upstream = upstream;
+ action->handler = nxt_http_proxy_handler;
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_http_action_t *
+nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_http_action_t *action)
+{
+ nxt_http_peer_t *peer;
+
+ peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
+ if (nxt_slow_path(peer == NULL)) {
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ return NULL;
+ }
+
+ peer->request = r;
+ r->peer = peer;
+
+ nxt_mp_retain(r->mem_pool);
+
+ action->u.upstream->connect(task, action->u.upstream, peer);
+
+ return NULL;
+}
+
+
+static void
+nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
+ nxt_http_peer_t *peer)
+{
+ peer->protocol = upstream->protocol;
+ peer->sockaddr = upstream->sockaddr[0];
+
+ peer->request->state = &nxt_http_proxy_header_send_state;
+
+ nxt_http_proto[peer->protocol].peer_connect(task, peer);
+}
+
+
+static const nxt_http_request_state_t nxt_http_proxy_header_send_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_http_proxy_header_send,
+ .error_handler = nxt_http_proxy_error,
+};
+
+
+static void
+nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+ r->state = &nxt_http_proxy_header_sent_state;
+
+ nxt_http_proto[peer->protocol].peer_header_send(task, peer);
+}
+
+
+static const nxt_http_request_state_t nxt_http_proxy_header_sent_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_http_proxy_header_sent,
+ .error_handler = nxt_http_proxy_error,
+};
+
+
+static void
+nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+ r->state = &nxt_http_proxy_header_read_state;
+
+ nxt_http_proto[peer->protocol].peer_header_read(task, peer);
+}
+
+
+static const nxt_http_request_state_t nxt_http_proxy_header_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_http_proxy_header_read,
+ .error_handler = nxt_http_proxy_error,
+};
+
+
+static void
+nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_field_t *f, *field;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+
+ r->status = peer->status;
+
+ nxt_debug(task, "http proxy status: %d", peer->status);
+
+ if (r->resp.content_length_n > 0) {
+ peer->remainder = r->resp.content_length_n;
+ }
+
+ nxt_list_each(field, peer->fields) {
+
+ nxt_debug(task, "http proxy header: \"%*s: %*s\"",
+ (size_t) field->name_length, field->name,
+ (size_t) field->value_length, field->value);
+
+ if (!field->skip) {
+ f = nxt_list_add(r->resp.fields);
+ if (nxt_slow_path(f == NULL)) {
+ nxt_http_proxy_error(task, r, peer);
+ return;
+ }
+
+ *f = *field;
+ }
+
+ } nxt_list_loop;
+
+ nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
+}
+
+
+static void
+nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *out;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+ out = peer->body;
+
+ if (out != NULL) {
+ peer->body = NULL;
+ nxt_http_proxy_request_send(task, r, out);
+ }
+
+ r->state = &nxt_http_proxy_read_state;
+
+ nxt_http_proto[peer->protocol].peer_read(task, peer);
+}
+
+
+static void
+nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *out)
+{
+ size_t length;
+
+ if (r->peer->remainder > 0) {
+ length = nxt_buf_chain_length(out);
+ r->peer->remainder -= length;
+ }
+
+ nxt_http_request_send(task, r, out);
+}
+
+
+static const nxt_http_request_state_t nxt_http_proxy_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_http_proxy_read,
+ .error_handler = nxt_http_proxy_error,
+};
+
+
+static void
+nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *out;
+ nxt_bool_t last;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = data;
+ out = peer->body;
+ peer->body = NULL;
+ last = nxt_buf_is_last(out);
+
+ nxt_http_proxy_request_send(task, r, out);
+
+ if (!last) {
+ nxt_http_proto[peer->protocol].peer_read(task, peer);
+
+ } else {
+ r->inconsistent = (peer->remainder != 0);
+
+ nxt_http_proto[peer->protocol].peer_close(task, peer);
+
+ nxt_mp_release(r->mem_pool);
+ }
+}
+
+
+nxt_buf_t *
+nxt_http_proxy_buf_mem_alloc(nxt_task_t *task, nxt_http_request_t *r,
+ size_t size)
+{
+ nxt_buf_t *b;
+
+ b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
+ if (nxt_fast_path(b != NULL)) {
+ b->completion_handler = nxt_http_proxy_buf_mem_completion;
+ b->parent = r;
+ nxt_mp_retain(r->mem_pool);
+
+ } else {
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ }
+
+ return b;
+}
+
+
+static void
+nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b, *next;
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ b = obj;
+ r = data;
+
+ peer = r->peer;
+
+ do {
+ next = b->next;
+
+ nxt_http_proxy_buf_mem_free(task, r, b);
+
+ b = next;
+ } while (b != NULL);
+
+ if (!peer->closed) {
+ nxt_http_proto[peer->protocol].peer_read(task, peer);
+ }
+}
+
+
+void
+nxt_http_proxy_buf_mem_free(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *b)
+{
+ nxt_event_engine_buf_mem_free(task->thread->engine, b);
+
+ nxt_mp_release(r->mem_pool);
+}
+
+
+static void
+nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_peer_t *peer;
+ nxt_http_request_t *r;
+
+ r = obj;
+ peer = r->peer;
+
+ nxt_http_proto[peer->protocol].peer_close(task, peer);
+
+ nxt_mp_release(r->mem_pool);
+
+ nxt_http_request_error(task, r, peer->status);
+}
+
+
+nxt_int_t
+nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, uintptr_t data)
+{
+ nxt_http_request_t *r;
+
+ r = ctx;
+
+ r->resp.date = field;
+
+ return NXT_OK;
+}
+
+
+nxt_int_t
+nxt_http_proxy_content_length(void *ctx, nxt_http_field_t *field,
+ uintptr_t data)
+{
+ nxt_off_t n;
+ nxt_http_request_t *r;
+
+ r = ctx;
+
+ r->resp.content_length = field;
+
+ n = nxt_off_t_parse(field->value, field->value_length);
+
+ if (nxt_fast_path(n >= 0)) {
+ r->resp.content_length_n = n;
+ }
+
+ return NXT_OK;
+}
+
+
+nxt_int_t
+nxt_http_proxy_skip(void *ctx, nxt_http_field_t *field, uintptr_t data)
+{
+ field->skip = 1;
+
+ return NXT_OK;
+}
diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c
index a18a02e7..14c75dab 100644
--- a/src/nxt_http_request.c
+++ b/src/nxt_http_request.c
@@ -10,7 +10,7 @@
static nxt_int_t nxt_http_validate_host(nxt_str_t *host, nxt_mp_t *mp);
static void nxt_http_request_start(nxt_task_t *task, void *obj, void *data);
-static void nxt_http_request_pass(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_request_action(nxt_task_t *task, void *obj, void *data);
static void nxt_http_request_proto_info(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj,
@@ -278,33 +278,33 @@ nxt_http_request_start(nxt_task_t *task, void *obj, void *data)
static const nxt_http_request_state_t nxt_http_request_body_state
nxt_aligned(64) =
{
- .ready_handler = nxt_http_request_pass,
+ .ready_handler = nxt_http_request_action,
.error_handler = nxt_http_request_close_handler,
};
static void
-nxt_http_request_pass(nxt_task_t *task, void *obj, void *data)
+nxt_http_request_action(nxt_task_t *task, void *obj, void *data)
{
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
nxt_http_request_t *r;
r = obj;
- pass = r->conf->socket_conf->pass;
+ action = r->conf->socket_conf->action;
- if (nxt_fast_path(pass != NULL)) {
+ if (nxt_fast_path(action != NULL)) {
do {
- nxt_debug(task, "http request route: %V", &pass->name);
+ nxt_debug(task, "http request route: %V", &action->name);
- pass = pass->handler(task, r, pass);
+ action = action->handler(task, r, action);
- if (pass == NULL) {
+ if (action == NULL) {
return;
}
- if (pass == NXT_HTTP_PASS_ERROR) {
+ if (action == NXT_HTTP_ACTION_ERROR) {
break;
}
@@ -315,13 +315,13 @@ nxt_http_request_pass(nxt_task_t *task, void *obj, void *data)
}
-nxt_http_pass_t *
-nxt_http_request_application(nxt_task_t *task, nxt_http_request_t *r,
- nxt_http_pass_t *pass)
+nxt_http_action_t *
+nxt_http_application_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_http_action_t *action)
{
nxt_event_engine_t *engine;
- nxt_debug(task, "http request application");
+ nxt_debug(task, "http application handler");
nxt_mp_retain(r->mem_pool);
@@ -344,7 +344,7 @@ nxt_http_request_application(nxt_task_t *task, nxt_http_request_t *r,
nxt_str_set(&r->server_name, "localhost");
}
- nxt_router_process_http_request(task, r, pass->u.application);
+ nxt_router_process_http_request(task, r, action->u.application);
return NULL;
}
@@ -370,7 +370,7 @@ nxt_http_request_read_body(nxt_task_t *task, nxt_http_request_t *r)
void
nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
- nxt_work_handler_t body_handler)
+ nxt_work_handler_t body_handler, void *data)
{
u_char *p, *end;
nxt_http_field_t *server, *date, *content_length;
@@ -431,7 +431,7 @@ nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
}
if (nxt_fast_path(r->proto.any != NULL)) {
- nxt_http_proto[r->protocol].header_send(task, r, body_handler);
+ nxt_http_proto[r->protocol].header_send(task, r, body_handler, data);
}
return;
@@ -483,15 +483,20 @@ nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size)
static void
nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b;
+ nxt_buf_t *b, *next;
nxt_http_request_t *r;
b = obj;
r = data;
- nxt_mp_free(r->mem_pool, b);
+ do {
+ next = b->next;
- nxt_mp_release(r->mem_pool);
+ nxt_mp_free(r->mem_pool, b);
+ nxt_mp_release(r->mem_pool);
+
+ b = next;
+ } while (b != NULL);
}
@@ -570,9 +575,9 @@ nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data)
if (nxt_fast_path(proto.any != NULL)) {
protocol = r->protocol;
- nxt_mp_release(r->mem_pool);
-
nxt_http_proto[protocol].close(task, proto, conf);
+
+ nxt_mp_release(r->mem_pool);
}
}
diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c
index c3c11faa..18b352ea 100644
--- a/src/nxt_http_route.c
+++ b/src/nxt_http_route.c
@@ -36,6 +36,13 @@ typedef enum {
typedef struct {
+ nxt_conf_value_t *pass;
+ nxt_conf_value_t *share;
+ nxt_conf_value_t *proxy;
+} nxt_http_route_action_conf_t;
+
+
+typedef struct {
nxt_conf_value_t *host;
nxt_conf_value_t *uri;
nxt_conf_value_t *method;
@@ -119,7 +126,7 @@ typedef union {
typedef struct {
uint32_t items;
- nxt_http_pass_t pass;
+ nxt_http_action_t action;
nxt_http_route_test_t test[0];
} nxt_http_route_match_t;
@@ -152,6 +159,8 @@ static nxt_http_route_t *nxt_http_route_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv);
static nxt_http_route_match_t *nxt_http_route_match_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv);
+static nxt_int_t nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf,
+ nxt_conf_value_t *cv, nxt_http_route_match_t *match);
static nxt_http_route_table_t *nxt_http_route_table_create(nxt_task_t *task,
nxt_mp_t *mp, nxt_conf_value_t *table_cv, nxt_http_route_object_t object,
nxt_bool_t case_sensitive);
@@ -173,15 +182,15 @@ static u_char *nxt_http_route_pattern_copy(nxt_mp_t *mp, nxt_str_t *test,
static void nxt_http_route_resolve(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route);
-static void nxt_http_pass_resolve(nxt_task_t *task,
- nxt_router_temp_conf_t *tmcf, nxt_http_pass_t *pass);
+static void nxt_http_action_resolve(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, nxt_http_action_t *action);
static nxt_http_route_t *nxt_http_route_find(nxt_http_routes_t *routes,
nxt_str_t *name);
static void nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *routes);
-static nxt_http_pass_t *nxt_http_route_pass(nxt_task_t *task,
- nxt_http_request_t *r, nxt_http_pass_t *start);
-static nxt_http_pass_t *nxt_http_route_match(nxt_http_request_t *r,
+static nxt_http_action_t *nxt_http_route_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *start);
+static nxt_http_action_t *nxt_http_route_match(nxt_http_request_t *r,
nxt_http_route_match_t *match);
static nxt_int_t nxt_http_route_table(nxt_http_request_t *r,
nxt_http_route_table_t *table);
@@ -367,16 +376,13 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
uint32_t n;
nxt_mp_t *mp;
nxt_int_t ret;
- nxt_str_t pass, *string;
- nxt_conf_value_t *match_conf, *pass_conf;
+ nxt_conf_value_t *match_conf;
nxt_http_route_test_t *test;
nxt_http_route_rule_t *rule;
nxt_http_route_table_t *table;
nxt_http_route_match_t *match;
nxt_http_route_match_conf_t mtcf;
- static nxt_str_t pass_path = nxt_string("/action/pass");
- static nxt_str_t share_path = nxt_string("/action/share");
static nxt_str_t match_path = nxt_string("/match");
match_conf = nxt_conf_get_path(cv, &match_path);
@@ -391,25 +397,12 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
return NULL;
}
- match->pass.u.route = NULL;
- match->pass.handler = NULL;
+ match->action.u.route = NULL;
+ match->action.handler = NULL;
match->items = n;
- pass_conf = nxt_conf_get_path(cv, &pass_path);
-
- if (pass_conf == NULL) {
- pass_conf = nxt_conf_get_path(cv, &share_path);
- if (nxt_slow_path(pass_conf == NULL)) {
- return NULL;
- }
-
- match->pass.handler = nxt_http_static_handler;
- }
-
- nxt_conf_get_string(pass_conf, &pass);
-
- string = nxt_str_dup(mp, &match->pass.name, &pass);
- if (nxt_slow_path(string == NULL)) {
+ ret = nxt_http_route_action_create(tmcf, cv, match);
+ if (nxt_slow_path(ret != NXT_OK)) {
return NULL;
}
@@ -516,6 +509,78 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
+static nxt_conf_map_t nxt_http_route_action_conf[] = {
+ {
+ nxt_string("pass"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_http_route_action_conf_t, pass)
+ },
+ {
+ nxt_string("share"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_http_route_action_conf_t, share)
+ },
+ {
+ nxt_string("proxy"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_http_route_action_conf_t, proxy)
+ },
+};
+
+
+static nxt_int_t
+nxt_http_route_action_create(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *cv,
+ nxt_http_route_match_t *match)
+{
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_str_t name, *string;
+ nxt_conf_value_t *conf, *action_conf;
+ nxt_http_route_action_conf_t accf;
+
+ static nxt_str_t action_path = nxt_string("/action");
+
+ action_conf = nxt_conf_get_path(cv, &action_path);
+ if (action_conf == NULL) {
+ return NXT_ERROR;
+ }
+
+ nxt_memzero(&accf, sizeof(accf));
+
+ ret = nxt_conf_map_object(tmcf->mem_pool,
+ action_conf, nxt_http_route_action_conf,
+ nxt_nitems(nxt_http_route_action_conf), &accf);
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ conf = accf.pass;
+
+ if (accf.share != NULL) {
+ conf = accf.share;
+ match->action.handler = nxt_http_static_handler;
+
+ } else if (accf.proxy != NULL) {
+ conf = accf.proxy;
+ }
+
+ nxt_conf_get_string(conf, &name);
+
+ mp = tmcf->router_conf->mem_pool;
+
+ string = nxt_str_dup(mp, &match->action.name, &name);
+ if (nxt_slow_path(string == NULL)) {
+ return NXT_ERROR;
+ }
+
+ if (accf.proxy != NULL) {
+ return nxt_http_proxy_create(mp, &match->action);
+ }
+
+ return NXT_OK;
+}
+
+
static nxt_http_route_table_t *
nxt_http_route_table_create(nxt_task_t *task, nxt_mp_t *mp,
nxt_conf_value_t *table_cv, nxt_http_route_object_t object,
@@ -877,17 +942,17 @@ static void
nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_http_route_t *route)
{
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
nxt_http_route_match_t **match, **end;
match = &route->match[0];
end = match + route->items;
while (match < end) {
- pass = &(*match)->pass;
+ action = &(*match)->action;
- if (pass->handler == NULL) {
- nxt_http_pass_resolve(task, tmcf, &(*match)->pass);
+ if (action->handler == NULL) {
+ nxt_http_action_resolve(task, tmcf, &(*match)->action);
}
match++;
@@ -896,21 +961,21 @@ nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
static void
-nxt_http_pass_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
- nxt_http_pass_t *pass)
+nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_http_action_t *action)
{
nxt_str_t name;
- name = pass->name;
+ name = action->name;
if (nxt_str_start(&name, "applications/", 13)) {
name.length -= 13;
name.start += 13;
- pass->u.application = nxt_router_listener_application(tmcf, &name);
- nxt_router_app_use(task, pass->u.application, 1);
+ action->u.application = nxt_router_listener_application(tmcf, &name);
+ nxt_router_app_use(task, action->u.application, 1);
- pass->handler = nxt_http_request_application;
+ action->handler = nxt_http_application_handler;
} else if (nxt_str_start(&name, "routes", 6)) {
@@ -923,9 +988,9 @@ nxt_http_pass_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
name.start += 7;
}
- pass->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name);
+ action->u.route = nxt_http_route_find(tmcf->router_conf->routes, &name);
- pass->handler = nxt_http_route_pass;
+ action->handler = nxt_http_route_handler;
}
}
@@ -950,46 +1015,49 @@ nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name)
}
-nxt_http_pass_t *
-nxt_http_pass_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+nxt_http_action_t *
+nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_str_t *name)
{
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
- pass = nxt_mp_alloc(tmcf->router_conf->mem_pool, sizeof(nxt_http_pass_t));
- if (nxt_slow_path(pass == NULL)) {
+ action = nxt_mp_alloc(tmcf->router_conf->mem_pool,
+ sizeof(nxt_http_action_t));
+ if (nxt_slow_path(action == NULL)) {
return NULL;
}
- pass->name = *name;
+ action->name = *name;
+ action->handler = NULL;
- nxt_http_pass_resolve(task, tmcf, pass);
+ nxt_http_action_resolve(task, tmcf, action);
- return pass;
+ return action;
}
/* COMPATIBILITY: listener application. */
-nxt_http_pass_t *
+nxt_http_action_t *
nxt_http_pass_application(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_str_t *name)
{
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
- pass = nxt_mp_alloc(tmcf->router_conf->mem_pool, sizeof(nxt_http_pass_t));
- if (nxt_slow_path(pass == NULL)) {
+ action = nxt_mp_alloc(tmcf->router_conf->mem_pool,
+ sizeof(nxt_http_action_t));
+ if (nxt_slow_path(action == NULL)) {
return NULL;
}
- pass->name = *name;
+ action->name = *name;
- pass->u.application = nxt_router_listener_application(tmcf, name);
- nxt_router_app_use(task, pass->u.application, 1);
+ action->u.application = nxt_router_listener_application(tmcf, name);
+ nxt_router_app_use(task, action->u.application, 1);
- pass->handler = nxt_http_request_application;
+ action->handler = nxt_http_application_handler;
- return pass;
+ return action;
}
@@ -1020,7 +1088,7 @@ nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *route)
end = match + route->items;
while (match < end) {
- nxt_http_pass_cleanup(task, &(*match)->pass);
+ nxt_http_action_cleanup(task, &(*match)->action);
match++;
}
@@ -1028,20 +1096,20 @@ nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *route)
void
-nxt_http_pass_cleanup(nxt_task_t *task, nxt_http_pass_t *pass)
+nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action)
{
- if (pass->handler == nxt_http_request_application) {
- nxt_router_app_use(task, pass->u.application, -1);
+ if (action->handler == nxt_http_application_handler) {
+ nxt_router_app_use(task, action->u.application, -1);
}
}
-static nxt_http_pass_t *
-nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r,
- nxt_http_pass_t *start)
+static nxt_http_action_t *
+nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_http_action_t *start)
{
- nxt_http_pass_t *pass;
nxt_http_route_t *route;
+ nxt_http_action_t *action;
nxt_http_route_match_t **match, **end;
route = start->u.route;
@@ -1049,9 +1117,9 @@ nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r,
end = match + route->items;
while (match < end) {
- pass = nxt_http_route_match(r, *match);
- if (pass != NULL) {
- return pass;
+ action = nxt_http_route_match(r, *match);
+ if (action != NULL) {
+ return action;
}
match++;
@@ -1063,7 +1131,7 @@ nxt_http_route_pass(nxt_task_t *task, nxt_http_request_t *r,
}
-static nxt_http_pass_t *
+static nxt_http_action_t *
nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match)
{
nxt_int_t ret;
@@ -1081,14 +1149,14 @@ nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match)
}
if (ret <= 0) {
- /* 0 => NULL, -1 => NXT_HTTP_PASS_ERROR. */
- return (nxt_http_pass_t *) (intptr_t) ret;
+ /* 0 => NULL, -1 => NXT_HTTP_ACTION_ERROR. */
+ return (nxt_http_action_t *) (intptr_t) ret;
}
test++;
}
- return &match->pass;
+ return &match->action;
}
diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c
index 48a989cf..44132859 100644
--- a/src/nxt_http_static.c
+++ b/src/nxt_http_static.c
@@ -27,9 +27,9 @@ static void nxt_http_static_mtypes_hash_free(void *data, void *p);
static const nxt_http_request_state_t nxt_http_static_send_state;
-nxt_http_pass_t *
+nxt_http_action_t *
nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
- nxt_http_pass_t *pass)
+ nxt_http_action_t *action)
{
size_t alloc, encode;
u_char *p;
@@ -76,7 +76,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_str_null(&extension);
}
- alloc = pass->name.length + r->path->length + index.length + 1;
+ alloc = action->name.length + r->path->length + index.length + 1;
f->name = nxt_mp_nget(r->mem_pool, alloc);
if (nxt_slow_path(f->name == NULL)) {
@@ -84,7 +84,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
}
p = f->name;
- p = nxt_cpymem(p, pass->name.start, pass->name.length);
+ p = nxt_cpymem(p, action->name.start, action->name.length);
p = nxt_cpymem(p, r->path->start, r->path->length);
p = nxt_cpymem(p, index.start, index.length);
*p = '\0';
@@ -272,7 +272,7 @@ nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r,
body_handler = NULL;
}
- nxt_http_request_header_send(task, r, body_handler);
+ nxt_http_request_header_send(task, r, body_handler, NULL);
r->state = &nxt_http_static_send_state;
return NULL;
diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c
index d58d615c..fb888f5d 100644
--- a/src/nxt_http_websocket.c
+++ b/src/nxt_http_websocket.c
@@ -88,6 +88,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
frame_size -= copy_size;
next = b->next;
+ b->next = NULL;
if (nxt_buf_mem_used_size(&b->mem) == 0) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index 85685fbc..cfe0341f 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -70,8 +70,8 @@ static void nxt_main_port_access_log_handler(nxt_task_t *task,
static nxt_int_t nxt_init_set_isolation(nxt_task_t *task,
nxt_process_init_t *init, nxt_conf_value_t *isolation);
-static nxt_int_t nxt_init_set_ns(nxt_task_t *task,
- nxt_process_init_t *init, nxt_conf_value_t *ns);
+static nxt_int_t nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init,
+ nxt_conf_value_t *ns);
const nxt_sig_event_t nxt_main_process_signals[] = {
nxt_event_signal(SIGHUP, nxt_main_process_signal_handler),
@@ -397,31 +397,19 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
nxt_port_t *port;
nxt_process_t *process;
- process = nxt_runtime_process_get(rt, nxt_pid);
- if (nxt_slow_path(process == NULL)) {
- return NXT_ERROR;
- }
-
- port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
+ port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
+ NXT_PROCESS_MAIN);
if (nxt_slow_path(port == NULL)) {
- nxt_process_use(task, process, -1);
return NXT_ERROR;
}
- nxt_process_port_add(task, process, port);
-
- nxt_process_use(task, process, -1);
+ process = port->process;
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_port_use(task, port, -1);
return ret;
}
- nxt_runtime_port_add(task, port);
-
- nxt_port_use(task, port, -1);
-
/*
* A main process port. A write port is not closed
* since it should be inherited by worker processes.
@@ -465,13 +453,11 @@ nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_init_t *init;
- init = nxt_malloc(sizeof(nxt_process_init_t));
+ init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
- nxt_memzero(init, sizeof(nxt_process_init_t));
-
init->start = nxt_controller_start;
init->name = "controller";
init->user_cred = &rt->user_cred;
@@ -561,13 +547,11 @@ nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_init_t *init;
- init = nxt_malloc(sizeof(nxt_process_init_t));
+ init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
- nxt_memzero(init, sizeof(nxt_process_init_t));
-
init->start = nxt_discovery_start;
init->name = "discovery";
init->user_cred = &rt->user_cred;
@@ -587,13 +571,11 @@ nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_init_t *init;
- init = nxt_malloc(sizeof(nxt_process_init_t));
+ init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
- nxt_memzero(init, sizeof(nxt_process_init_t));
-
init->start = nxt_router_start;
init->name = "router";
init->user_cred = &rt->user_cred;
@@ -627,13 +609,11 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
+ app_conf->group.length + 1;
}
- init = nxt_malloc(size);
+ init = nxt_mp_zalloc(rt->mem_pool, size);
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
- nxt_memzero(init, sizeof(nxt_process_init_t));
-
if (rt->capabilities.setid) {
init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t));
user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t));
@@ -705,7 +685,7 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
fail:
- nxt_free(init);
+ nxt_mp_free(rt->mem_pool, init);
return NXT_ERROR;
}
@@ -726,6 +706,8 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
process = nxt_runtime_process_new(rt);
if (nxt_slow_path(process == NULL)) {
+ nxt_mp_free(rt->mem_pool, init);
+
return NXT_ERROR;
}
@@ -785,8 +767,6 @@ nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
nxt_runtime_process_each(rt, process) {
if (nxt_pid != process->pid) {
- process->init = NULL;
-
nxt_process_port_each(process, port) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
@@ -1005,10 +985,11 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
if (process) {
init = process->init;
+ process->init = NULL;
ptype = nxt_process_type(process);
- if (process->ready && init != NULL) {
+ if (process->ready) {
init->stream = 0;
}
@@ -1057,7 +1038,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
init->restart(task, rt, init);
} else {
- nxt_free(init);
+ nxt_mp_free(rt->mem_pool, init);
}
}
}
@@ -1540,7 +1521,8 @@ nxt_init_set_isolation(nxt_task_t *task, nxt_process_init_t *init,
static nxt_int_t
-nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *namespaces)
+nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init,
+ nxt_conf_value_t *namespaces)
{
uint32_t index;
nxt_str_t name;
@@ -1549,7 +1531,13 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *na
index = 0;
- while ((value = nxt_conf_next_object_member(namespaces, &name, &index)) != NULL) {
+ for ( ;; ) {
+ value = nxt_conf_next_object_member(namespaces, &name, &index);
+
+ if (value == NULL) {
+ break;
+ }
+
flag = 0;
#if (NXT_HAVE_CLONE_NEWUSER)
@@ -1593,11 +1581,9 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *na
return NXT_ERROR;
}
- if (nxt_conf_get_integer(value) == 0) {
- continue; /* process shares everything by default */
+ if (nxt_conf_get_boolean(value)) {
+ init->isolation.clone.flags |= flag;
}
-
- init->isolation.clone.flags |= flag;
}
return NXT_OK;
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 9029353a..8d14a5e7 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -238,7 +238,6 @@ void
nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_t *port;
- nxt_process_t *process;
nxt_runtime_t *rt;
nxt_port_msg_new_port_t *new_port_msg;
@@ -261,22 +260,13 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- process = nxt_runtime_process_get(rt, new_port_msg->pid);
- if (nxt_slow_path(process == NULL)) {
- return;
- }
-
- port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid,
- new_port_msg->type);
+ port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
+ new_port_msg->id,
+ new_port_msg->type);
if (nxt_slow_path(port == NULL)) {
- nxt_process_use(task, process, -1);
return;
}
- nxt_process_port_add(task, process, port);
-
- nxt_process_use(task, process, -1);
-
nxt_fd_nonblocking(task, msg->fd);
port->pair[0] = -1;
@@ -286,10 +276,6 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port->socket.task = task;
- nxt_runtime_port_add(task, port);
-
- nxt_port_use(task, port, -1);
-
nxt_port_write_enable(task, port);
msg->u.new_port = port;
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 4edc423a..9c7da970 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -478,7 +478,8 @@ static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
size_t sent, nxt_bool_t mmap_mode)
{
- size_t size;
+ size_t size;
+ nxt_buf_t *next;
while (b != NULL) {
@@ -528,7 +529,9 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
- b = b->next;
+ next = b->next;
+ b->next = NULL;
+ b = next;
}
return b;
@@ -796,7 +799,7 @@ static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg)
{
- nxt_buf_t *b, *orig_b;
+ nxt_buf_t *b, *orig_b, *next;
nxt_port_recv_msg_t *fmsg;
if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
@@ -915,11 +918,15 @@ fmsg_failed:
*/
if (msg->buf == b) {
/* complete mmap buffers */
- for (; b != NULL; b = b->next) {
+ while (b != NULL) {
nxt_debug(task, "complete buffer %p", b);
nxt_work_queue_add(port->socket.read_work_queue,
b->completion_handler, task, b, b->parent);
+
+ next = b->next;
+ b->next = NULL;
+ b = next;
}
}
@@ -964,7 +971,7 @@ static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
int use_delta;
- nxt_buf_t *b;
+ nxt_buf_t *b, *next;
nxt_port_t *port;
nxt_work_queue_t *wq;
nxt_port_send_msg_t *msg;
@@ -986,7 +993,10 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
- for (b = msg->buf; b != NULL; b = b->next) {
+ for (b = msg->buf; b != NULL; b = next) {
+ next = b->next;
+ b->next = NULL;
+
if (nxt_buf_is_sync(b)) {
continue;
}
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 0cc9ccc4..b246a58c 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -207,7 +207,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
goto fail;
}
-#if (NXT_HAVE_CLONE_NEWUSER)
+#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
if ((init->isolation.clone.flags & CLONE_NEWUSER) == CLONE_NEWUSER) {
ret = nxt_clone_proc_map(task, pid, &init->isolation.clone);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -723,16 +723,35 @@ free:
nxt_int_t
nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
{
- nxt_debug(task, "user cred set: \"%s\" uid:%uL base gid:%uL",
- uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid);
+ nxt_debug(task, "user cred set: \"%s\" uid:%d base gid:%d",
+ uc->user, uc->uid, uc->base_gid);
if (setgid(uc->base_gid) != 0) {
+
+#if (NXT_HAVE_CLONE)
+ if (nxt_errno == EINVAL) {
+ nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the "
+ "application namespace.", uc->base_gid);
+ return NXT_ERROR;
+ }
+#endif
+
nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno);
return NXT_ERROR;
}
if (uc->gids != NULL) {
if (setgroups(uc->ngroups, uc->gids) != 0) {
+
+#if (NXT_HAVE_CLONE)
+ if (nxt_errno == EINVAL) {
+ nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has "
+ "supplementary group ids not valid in the application "
+ "namespace.", uc->user, uc->uid);
+ return NXT_ERROR;
+ }
+#endif
+
nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno);
return NXT_ERROR;
}
@@ -747,6 +766,15 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
}
if (setuid(uc->uid) != 0) {
+
+#if (NXT_HAVE_CLONE)
+ if (nxt_errno == EINVAL) {
+ nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't "
+ "valid in the application namespace.", uc->uid, uc->user);
+ return NXT_ERROR;
+ }
+#endif
+
nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno);
return NXT_ERROR;
}
@@ -756,6 +784,17 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
void
+nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
+{
+ process->use_count += i;
+
+ if (process->use_count == 0) {
+ nxt_runtime_process_release(task->thread->runtime, process);
+ }
+}
+
+
+void
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
{
nxt_assert(port->process == NULL);
diff --git a/src/nxt_process.h b/src/nxt_process.h
index df9ca038..d67573f1 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -114,6 +114,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
nxt_process_type_t nxt_process_type(nxt_process_t *process);
+void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
+
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c
index a6d5f217..5bb2fb2c 100644
--- a/src/nxt_python_wsgi.c
+++ b/src/nxt_python_wsgi.c
@@ -86,6 +86,7 @@ static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args);
+static void nxt_python_print_exception(void);
static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes);
struct nxt_python_run_ctx_s {
@@ -130,58 +131,17 @@ static PyMethodDef nxt_py_input_methods[] = {
static PyTypeObject nxt_py_input_type = {
PyVarObject_HEAD_INIT(NULL, 0)
- "unit._input", /* tp_name */
- (int) sizeof(nxt_py_input_t), /* tp_basicsize */
- 0, /* tp_itemsize */
- (destructor) nxt_py_input_dealloc, /* tp_dealloc */
- 0, /* tp_print */
- 0, /* tp_getattr */
- 0, /* tp_setattr */
- 0, /* tp_compare */
- 0, /* tp_repr */
- 0, /* tp_as_number */
- 0, /* tp_as_sequence */
- 0, /* tp_as_mapping */
- 0, /* tp_hash */
- 0, /* tp_call */
- 0, /* tp_str */
- 0, /* tp_getattro */
- 0, /* tp_setattro */
- 0, /* tp_as_buffer */
- Py_TPFLAGS_DEFAULT, /* tp_flags */
- "unit input object.", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- nxt_py_input_methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- 0, /* tp_init */
- 0, /* tp_alloc */
- 0, /* tp_new */
- 0, /* tp_free */
- 0, /* tp_is_gc */
- 0, /* tp_bases */
- 0, /* tp_mro - method resolution order */
- 0, /* tp_cache */
- 0, /* tp_subclasses */
- 0, /* tp_weaklist */
- 0, /* tp_del */
- 0, /* tp_version_tag */
-#if PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION > 3
- 0, /* tp_finalize */
-#endif
+
+ .tp_name = "unit._input",
+ .tp_basicsize = sizeof(nxt_py_input_t),
+ .tp_dealloc = (destructor) nxt_py_input_dealloc,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit input object.",
+ .tp_methods = nxt_py_input_methods,
};
+static PyObject *nxt_py_stderr_flush;
static PyObject *nxt_py_application;
static PyObject *nxt_py_start_resp_obj;
static PyObject *nxt_py_write_obj;
@@ -193,6 +153,7 @@ static wchar_t *nxt_py_home;
static char *nxt_py_home;
#endif
+static PyThreadState *nxt_python_thread_state;
static nxt_python_run_ctx_t *nxt_python_run_ctx;
@@ -279,6 +240,21 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
module = NULL;
+ obj = PySys_GetObject((char *) "stderr");
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_alert(task, "Python failed to get \"sys.stderr\" object");
+ goto fail;
+ }
+
+ nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush");
+ if (nxt_slow_path(nxt_py_stderr_flush == NULL)) {
+ nxt_alert(task, "Python failed to get \"flush\" attribute of "
+ "\"sys.stderr\" object");
+ goto fail;
+ }
+
+ Py_DECREF(obj);
+
if (c->path.length > 0) {
obj = PyString_FromStringAndSize((char *) c->path.start,
c->path.length);
@@ -349,7 +325,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
module = PyImport_ImportModule(nxt_py_module);
if (nxt_slow_path(module == NULL)) {
nxt_alert(task, "Python failed to import module \"%s\"", nxt_py_module);
- PyErr_Print();
+ nxt_python_print_exception();
goto fail;
}
@@ -363,7 +339,6 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
nxt_alert(task, "\"application\" in module \"%s\" "
"is not a callable object", nxt_py_module);
- PyErr_Print();
goto fail;
}
@@ -382,10 +357,14 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
goto fail;
}
+ nxt_python_thread_state = PyEval_SaveThread();
+
rc = nxt_unit_run(unit_ctx);
nxt_unit_done(unit_ctx);
+ PyEval_RestoreThread(nxt_python_thread_state);
+
nxt_python_atexit();
exit(rc);
@@ -407,22 +386,26 @@ static void
nxt_python_request_handler(nxt_unit_request_info_t *req)
{
int rc;
- PyObject *result, *iterator, *item, *args, *environ;
+ PyObject *environ, *args, *response, *iterator, *item;
+ PyObject *close, *result;
nxt_python_run_ctx_t run_ctx = {-1, 0, NULL, req};
+ PyEval_RestoreThread(nxt_python_thread_state);
+
environ = nxt_python_get_environ(&run_ctx);
if (nxt_slow_path(environ == NULL)) {
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
-
- return;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
args = PyTuple_New(2);
if (nxt_slow_path(args == NULL)) {
+ Py_DECREF(environ);
+
nxt_unit_req_error(req, "Python failed to create arguments tuple");
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
- return;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
PyTuple_SET_ITEM(args, 0, environ);
@@ -432,103 +415,104 @@ nxt_python_request_handler(nxt_unit_request_info_t *req)
nxt_python_run_ctx = &run_ctx;
- result = PyObject_CallObject(nxt_py_application, args);
+ response = PyObject_CallObject(nxt_py_application, args);
Py_DECREF(args);
- if (nxt_slow_path(result == NULL)) {
+ if (nxt_slow_path(response == NULL)) {
nxt_unit_req_error(req, "Python failed to call the application");
- PyErr_Print();
-
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
- nxt_python_run_ctx = NULL;
+ nxt_python_print_exception();
- return;
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
- item = NULL;
- iterator = NULL;
+ /* Shortcut: avoid iterate over response string symbols. */
+ if (PyBytes_Check(response)) {
+ rc = nxt_python_write(&run_ctx, response);
- /* Shortcut: avoid iterate over result string symbols. */
- if (PyBytes_Check(result)) {
+ } else {
+ iterator = PyObject_GetIter(response);
- rc = nxt_python_write(&run_ctx, result);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- goto fail;
- }
+ if (nxt_fast_path(iterator != NULL)) {
+ rc = NXT_UNIT_OK;
- } else {
- iterator = PyObject_GetIter(result);
+ while (run_ctx.bytes_sent < run_ctx.content_length) {
+ item = PyIter_Next(iterator);
- if (nxt_slow_path(iterator == NULL)) {
- nxt_unit_req_error(req, "the application returned "
- "not an iterable object");
+ if (item == NULL) {
+ if (nxt_slow_path(PyErr_Occurred() != NULL)) {
+ nxt_unit_req_error(req, "Python failed to iterate over "
+ "the application response object");
+ nxt_python_print_exception();
- goto fail;
- }
+ rc = NXT_UNIT_ERROR;
+ }
- while (run_ctx.bytes_sent < run_ctx.content_length
- && (item = PyIter_Next(iterator)))
- {
- if (nxt_slow_path(!PyBytes_Check(item))) {
- nxt_unit_req_error(req, "the application returned "
- "not a bytestring object");
+ break;
+ }
- goto fail;
- }
+ if (nxt_fast_path(PyBytes_Check(item))) {
+ rc = nxt_python_write(&run_ctx, item);
- rc = nxt_python_write(&run_ctx, item);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- goto fail;
- }
+ } else {
+ nxt_unit_req_error(req, "the application returned "
+ "not a bytestring object");
+ rc = NXT_UNIT_ERROR;
+ }
- Py_DECREF(item);
- }
+ Py_DECREF(item);
- Py_DECREF(iterator);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ break;
+ }
+ }
- if (PyObject_HasAttrString(result, "close")) {
- PyObject_CallMethod(result, (char *) "close", NULL);
- }
- }
+ Py_DECREF(iterator);
- if (nxt_slow_path(PyErr_Occurred() != NULL)) {
- nxt_unit_req_error(req, "an application error occurred");
- PyErr_Print();
- }
+ } else {
+ nxt_unit_req_error(req,
+ "the application returned not an iterable object");
+ nxt_python_print_exception();
- nxt_unit_request_done(req, NXT_UNIT_OK);
+ rc = NXT_UNIT_ERROR;
+ }
- Py_DECREF(result);
+ close = PyObject_GetAttrString(response, "close");
- nxt_python_run_ctx = NULL;
+ if (close != NULL) {
+ result = PyObject_CallFunction(close, NULL);
+ if (nxt_slow_path(result == NULL)) {
+ nxt_unit_req_error(req, "Python failed to call the close() "
+ "method of the application response");
+ nxt_python_print_exception();
- return;
+ } else {
+ Py_DECREF(result);
+ }
-fail:
+ Py_DECREF(close);
- if (item != NULL) {
- Py_DECREF(item);
+ } else {
+ PyErr_Clear();
+ }
}
- if (iterator != NULL) {
- Py_DECREF(iterator);
- }
+ Py_DECREF(response);
- if (PyObject_HasAttrString(result, "close")) {
- PyObject_CallMethod(result, (char *) "close", NULL);
- }
+done:
- Py_DECREF(result);
- nxt_python_run_ctx = NULL;
+ nxt_python_thread_state = PyEval_SaveThread();
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ nxt_python_run_ctx = NULL;
+ nxt_unit_request_done(req, rc);
}
static void
nxt_python_atexit(void)
{
+ Py_XDECREF(nxt_py_stderr_flush);
Py_XDECREF(nxt_py_application);
Py_XDECREF(nxt_py_start_resp_obj);
Py_XDECREF(nxt_py_write_obj);
@@ -767,7 +751,7 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name,
nxt_unit_req_error(ctx->req,
"Python failed to create value string \"%.*s\"",
(int) size, src);
- PyErr_Print();
+ nxt_python_print_exception();
return NXT_UNIT_ERROR;
}
@@ -802,7 +786,7 @@ nxt_python_add_str(nxt_python_run_ctx_t *ctx, const char *name,
nxt_unit_req_error(ctx->req,
"Python failed to create value string \"%.*s\"",
(int) size, str);
- PyErr_Print();
+ nxt_python_print_exception();
return NXT_UNIT_ERROR;
}
@@ -1137,6 +1121,28 @@ nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args)
}
+static void
+nxt_python_print_exception(void)
+{
+ PyErr_Print();
+
+#if PY_MAJOR_VERSION == 3
+ /* The backtrace may be buffered in sys.stderr file object. */
+ {
+ PyObject *result;
+
+ result = PyObject_CallFunction(nxt_py_stderr_flush, NULL);
+ if (nxt_slow_path(result == NULL)) {
+ PyErr_Clear();
+ return;
+ }
+
+ Py_DECREF(result);
+ }
+#endif
+}
+
+
static int
nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes)
{
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 28781600..b9f5d921 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -537,6 +537,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
for (b = msg_info->buf; b != NULL; b = next) {
next = b->next;
+ b->next = NULL;
b->completion_handler = msg_info->completion_handler;
@@ -1172,8 +1173,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
- if (skcf->pass != NULL) {
- nxt_http_pass_cleanup(task, skcf->pass);
+ if (skcf->action != NULL) {
+ nxt_http_action_cleanup(task, skcf->action);
}
} nxt_queue_loop;
@@ -1458,7 +1459,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
next = 0;
for ( ;; ) {
- application = nxt_conf_next_object_member(applications, &name, &next);
+ application = nxt_conf_next_object_member(applications,
+ &name, &next);
if (application == NULL) {
break;
}
@@ -1676,10 +1678,16 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf->large_header_buffers = 4;
skcf->body_buffer_size = 16 * 1024;
skcf->max_body_size = 8 * 1024 * 1024;
+ skcf->proxy_header_buffer_size = 64 * 1024;
+ skcf->proxy_buffer_size = 4096;
+ skcf->proxy_buffers = 256;
skcf->idle_timeout = 180 * 1000;
skcf->header_read_timeout = 30 * 1000;
skcf->body_read_timeout = 30 * 1000;
skcf->send_timeout = 30 * 1000;
+ skcf->proxy_timeout = 60 * 1000;
+ skcf->proxy_send_timeout = 30 * 1000;
+ skcf->proxy_read_timeout = 30 * 1000;
skcf->websocket_conf.max_frame_size = 1024 * 1024;
skcf->websocket_conf.read_timeout = 60 * 1000;
@@ -1729,12 +1737,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf->router_conf->count++;
if (lscf.pass.length != 0) {
- skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass);
+ skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
/* COMPATIBILITY: listener application. */
} else if (lscf.application.length > 0) {
- skcf->pass = nxt_http_pass_application(task, tmcf,
- &lscf.application);
+ skcf->action = nxt_http_pass_application(task, tmcf,
+ &lscf.application);
}
}
}
@@ -3070,8 +3078,8 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_thread_spin_unlock(lock);
if (skcf != NULL) {
- if (skcf->pass != NULL) {
- nxt_http_pass_cleanup(task, skcf->pass);
+ if (skcf->action != NULL) {
+ nxt_http_action_cleanup(task, skcf->action);
}
#if (NXT_TLS)
@@ -3497,7 +3505,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t ret;
- nxt_buf_t *b;
+ nxt_buf_t *b, *next;
nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
@@ -3580,6 +3588,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
field->hash = f->hash;
field->skip = 0;
+ field->hopbyhop = 0;
field->name_length = f->name_length;
field->value_length = f->value_length;
@@ -3612,17 +3621,20 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
if (nxt_buf_mem_used_size(&b->mem) == 0) {
+ next = b->next;
+ b->next = NULL;
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
- b = b->next;
+ b = next;
}
if (b != NULL) {
nxt_buf_chain_add(&r->out, b);
}
- nxt_http_request_header_send(task, r, nxt_http_request_send_body);
+ nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
if (r->websocket_handshake
&& r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
@@ -5056,6 +5068,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
if (nxt_slow_path(buf == NULL)) {
while (out != NULL) {
buf = out->next;
+ out->next = NULL;
out->completion_handler(task, out, out->parent);
out = buf;
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index ec18ff48..1517c14b 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -16,12 +16,12 @@ typedef struct nxt_http_request_s nxt_http_request_t;
#include <nxt_application.h>
-typedef struct nxt_http_pass_s nxt_http_pass_t;
+typedef struct nxt_http_action_s nxt_http_action_t;
typedef struct nxt_http_routes_s nxt_http_routes_t;
typedef struct nxt_router_access_log_s nxt_router_access_log_t;
-#define NXT_HTTP_PASS_ERROR ((nxt_http_pass_t *) -1)
+#define NXT_HTTP_ACTION_ERROR ((nxt_http_action_t *) -1)
typedef struct {
@@ -154,7 +154,7 @@ typedef struct {
nxt_queue_link_t link;
nxt_router_conf_t *router_conf;
- nxt_http_pass_t *pass;
+ nxt_http_action_t *action;
/*
* A listen socket time can be shorter than socket configuration life
@@ -170,10 +170,17 @@ typedef struct {
size_t large_header_buffers;
size_t body_buffer_size;
size_t max_body_size;
+ size_t proxy_header_buffer_size;
+ size_t proxy_buffer_size;
+ size_t proxy_buffers;
+
nxt_msec_t idle_timeout;
nxt_msec_t header_read_timeout;
nxt_msec_t body_read_timeout;
nxt_msec_t send_timeout;
+ nxt_msec_t proxy_timeout;
+ nxt_msec_t proxy_send_timeout;
+ nxt_msec_t proxy_read_timeout;
nxt_websocket_conf_t websocket_conf;
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index de41ba4d..096aabc4 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -37,10 +37,10 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
static void nxt_runtime_thread_pool_init(void);
static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
void *data);
-static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
+static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
+static void nxt_runtime_process_remove(nxt_runtime_t *rt,
nxt_process_t *process);
-static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
- nxt_pid_t pid);
+static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
nxt_int_t
@@ -459,7 +459,7 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
idle = &engine->idle_connections;
- for (link = nxt_queue_head(idle);
+ for (link = nxt_queue_first(idle);
link != nxt_queue_tail(idle);
link = next)
{
@@ -658,6 +658,8 @@ nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
if (tp == thread_pools[i]) {
nxt_array_remove(rt->thread_pools, &thread_pools[i]);
+ nxt_free(tp);
+
if (n == 1) {
/* The last thread pool. */
rt->continuation(task, 0);
@@ -1296,11 +1298,15 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
}
-static void
-nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
+void
+nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
{
nxt_port_t *port;
+ if (process->registered == 1) {
+ nxt_runtime_process_remove(rt, process);
+ }
+
nxt_assert(process->use_count == 0);
nxt_assert(process->registered == 0);
@@ -1316,6 +1322,10 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
nxt_thread_mutex_destroy(&process->outgoing.mutex);
nxt_thread_mutex_destroy(&process->cp_mutex);
+ if (process->init != NULL) {
+ nxt_mp_free(rt->mem_pool, process->init);
+ }
+
nxt_mp_free(rt->mem_pool, process);
}
@@ -1379,7 +1389,7 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
}
-nxt_process_t *
+static nxt_process_t *
nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
{
nxt_process_t *process;
@@ -1489,13 +1499,13 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
}
-static nxt_process_t *
-nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
+static void
+nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
{
- nxt_process_t *process;
+ nxt_pid_t pid;
nxt_lvlhsh_query_t lhq;
- process = NULL;
+ pid = process->pid;
nxt_runtime_process_lhq_pid(&lhq, &pid);
@@ -1521,40 +1531,49 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
}
nxt_thread_mutex_unlock(&rt->processes_mutex);
-
- return process;
}
-void
-nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
+nxt_process_t *
+nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
{
- nxt_runtime_t *rt;
+ nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
+
+ return nxt_runtime_process_next(rt, lhe);
+}
- process->use_count += i;
- if (process->use_count == 0) {
- rt = task->thread->runtime;
+nxt_port_t *
+nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
+{
+ nxt_port_t *port;
+ nxt_process_t *process;
- if (process->registered == 1) {
- nxt_runtime_process_remove_pid(rt, process->pid);
- }
+ process = nxt_runtime_process_get(rt, pid);
+ if (nxt_slow_path(process == NULL)) {
+ return NULL;
+ }
- nxt_runtime_process_destroy(rt, process);
+ port = nxt_port_new(task, id, pid, type);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_process_use(task, process, -1);
+ return NULL;
}
-}
+ nxt_process_port_add(task, process, port);
-nxt_process_t *
-nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
-{
- nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
+ nxt_process_use(task, process, -1);
- return nxt_runtime_process_next(rt, lhe);
+ nxt_runtime_port_add(task, port);
+
+ nxt_port_use(task, port, -1);
+
+ return port;
}
-void
+static void
nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
{
nxt_int_t res;
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index 0791f8e7..d5b340b6 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -93,22 +93,20 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
-nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
-
void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process);
nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);
-void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
-
nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
nxt_lvlhsh_each_t *lhe);
+void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process);
+
#define nxt_runtime_process_next(rt, lhe) \
nxt_lvlhsh_each(&rt->processes, lhe)
-
-void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
+nxt_port_t *nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type);
void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port);
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
index 16fe4724..94f8e9eb 100644
--- a/src/nxt_sendbuf.c
+++ b/src/nxt_sendbuf.c
@@ -9,6 +9,8 @@
static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
size_t *copied);
+static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task,
+ nxt_work_queue_t *wq, nxt_buf_t *start);
nxt_uint_t
@@ -380,15 +382,11 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
while (b != NULL) {
- nxt_prefetch(b->next);
-
if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
break;
}
- nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
-
- b = b->next;
+ b = nxt_sendbuf_coalesce_completion(task, wq, b);
}
return b;
@@ -399,10 +397,49 @@ void
nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
while (b != NULL) {
- nxt_prefetch(b->next);
+ b = nxt_sendbuf_coalesce_completion(task, wq, b);
+ }
+}
- nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
- b = b->next;
+static nxt_buf_t *
+nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq,
+ nxt_buf_t *start)
+{
+ nxt_buf_t *b, *next, **last, *rest, **last_rest;
+ nxt_work_handler_t handler;
+
+ rest = NULL;
+ last_rest = &rest;
+ last = &start->next;
+ b = start;
+ handler = b->completion_handler;
+
+ for ( ;; ) {
+ next = b->next;
+ if (next == NULL) {
+ break;
+ }
+
+ b->next = NULL;
+ b = next;
+
+ if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
+ *last_rest = b;
+ break;
+ }
+
+ if (handler == b->completion_handler) {
+ *last = b;
+ last = &b->next;
+
+ } else {
+ *last_rest = b;
+ last_rest = &b->next;
+ }
}
+
+ nxt_work_queue_add(wq, handler, task, start, start->parent);
+
+ return rest;
}
diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c
index 99cf54b4..57dfbfa6 100644
--- a/src/nxt_sockaddr.c
+++ b/src/nxt_sockaddr.c
@@ -23,11 +23,11 @@ static nxt_int_t nxt_job_sockaddr_inet_parse(nxt_job_sockaddr_parse_t *jbs);
nxt_sockaddr_t *
nxt_sockaddr_cache_alloc(nxt_event_engine_t *engine, nxt_listen_socket_t *ls)
{
- uint8_t hint;
size_t size;
+ uint8_t hint;
nxt_sockaddr_t *sa;
- hint = (uint8_t) -1;
+ hint = NXT_EVENT_ENGINE_NO_MEM_HINT;
size = offsetof(nxt_sockaddr_t, u) + ls->socklen + ls->address_length;
sa = nxt_event_engine_mem_alloc(engine, &hint, size);
@@ -56,7 +56,11 @@ nxt_sockaddr_cache_alloc(nxt_event_engine_t *engine, nxt_listen_socket_t *ls)
void
nxt_sockaddr_cache_free(nxt_event_engine_t *engine, nxt_conn_t *c)
{
- nxt_event_engine_mem_free(engine, &c->remote->cache_hint, c->remote);
+ nxt_sockaddr_t *sa;
+
+ sa = c->remote;
+
+ nxt_event_engine_mem_free(engine, sa->cache_hint, sa, 0);
}
diff --git a/src/nxt_socket.c b/src/nxt_socket.c
index 95a298d8..2a809184 100644
--- a/src/nxt_socket.c
+++ b/src/nxt_socket.c
@@ -300,6 +300,28 @@ nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s, nxt_uint_t how)
}
+nxt_err_t
+nxt_socket_error(nxt_socket_t s)
+{
+ int ret, err;
+ socklen_t len;
+
+ err = 0;
+ len = sizeof(int);
+ /*
+ * Linux and BSDs return 0 and store a pending error in the err argument;
+ * Solaris returns -1 and sets the errno.
+ */
+ ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (void *) &err, &len);
+
+ if (nxt_slow_path(ret == -1)) {
+ err = nxt_errno;
+ }
+
+ return err;
+}
+
+
nxt_uint_t
nxt_socket_error_level(nxt_err_t err)
{
@@ -315,6 +337,9 @@ nxt_socket_error_level(nxt_err_t err)
case NXT_EHOSTUNREACH:
return NXT_LOG_INFO;
+ case NXT_ECONNREFUSED:
+ return NXT_LOG_ERR;
+
default:
return NXT_LOG_ALERT;
}
diff --git a/src/nxt_socket.h b/src/nxt_socket.h
index 3f00648d..6a450f83 100644
--- a/src/nxt_socket.h
+++ b/src/nxt_socket.h
@@ -106,6 +106,7 @@ NXT_EXPORT nxt_int_t nxt_socket_connect(nxt_task_t *task, nxt_socket_t s,
nxt_sockaddr_t *sa);
NXT_EXPORT void nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s,
nxt_uint_t how);
+nxt_err_t nxt_socket_error(nxt_socket_t s);
nxt_uint_t nxt_socket_error_level(nxt_err_t err);
NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_task_t *task,
diff --git a/src/nxt_string.c b/src/nxt_string.c
index b89e9555..d567883f 100644
--- a/src/nxt_string.c
+++ b/src/nxt_string.c
@@ -188,10 +188,14 @@ nxt_strncasecmp(const u_char *s1, const u_char *s2, size_t length)
nxt_int_t
-nxt_memcasecmp(const u_char *s1, const u_char *s2, size_t length)
+nxt_memcasecmp(const void *p1, const void *p2, size_t length)
{
- u_char c1, c2;
- nxt_int_t n;
+ u_char c1, c2;
+ nxt_int_t n;
+ const u_char *s1, *s2;
+
+ s1 = p1;
+ s2 = p2;
while (length-- != 0) {
c1 = *s1++;
diff --git a/src/nxt_string.h b/src/nxt_string.h
index 8d7b3b73..de498048 100644
--- a/src/nxt_string.h
+++ b/src/nxt_string.h
@@ -87,7 +87,7 @@ NXT_EXPORT u_char *nxt_cpystrn(u_char *dst, const u_char *src, size_t length);
NXT_EXPORT nxt_int_t nxt_strcasecmp(const u_char *s1, const u_char *s2);
NXT_EXPORT nxt_int_t nxt_strncasecmp(const u_char *s1, const u_char *s2,
size_t length);
-NXT_EXPORT nxt_int_t nxt_memcasecmp(const u_char *s1, const u_char *s2,
+NXT_EXPORT nxt_int_t nxt_memcasecmp(const void *p1, const void *p2,
size_t length);
NXT_EXPORT u_char *nxt_memstrn(const u_char *s, const u_char *end,
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 9ccd1fd9..0cf32916 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -1316,8 +1316,12 @@ nxt_unit_response_init(nxt_unit_request_info_t *req,
nxt_unit_req_debug(req, "duplicate response init");
}
+ /*
+ * Each field name and value 0-terminated by libunit,
+ * this is the reason of '+ 2' below.
+ */
buf_size = sizeof(nxt_unit_response_t)
- + max_fields_count * sizeof(nxt_unit_field_t)
+ + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
+ max_fields_size;
if (nxt_slow_path(req->response_buf != NULL)) {
@@ -1391,8 +1395,12 @@ nxt_unit_response_realloc(nxt_unit_request_info_t *req,
return NXT_UNIT_ERROR;
}
+ /*
+ * Each field name and value 0-terminated by libunit,
+ * this is the reason of '+ 2' below.
+ */
buf_size = sizeof(nxt_unit_response_t)
- + max_fields_count * sizeof(nxt_unit_field_t)
+ + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
+ max_fields_size;
nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
@@ -1458,7 +1466,8 @@ nxt_unit_response_realloc(nxt_unit_request_info_t *req,
goto fail;
}
- resp->piggyback_content_length = req->response->piggyback_content_length;
+ resp->piggyback_content_length =
+ req->response->piggyback_content_length;
nxt_unit_sptr_set(&resp->piggyback_content, p);
p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
@@ -1953,7 +1962,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
if (hdr != NULL) {
m.mmap_msg.mmap_id = hdr->id;
- m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
+ m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
+ (u_char *) buf->start);
}
nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
diff --git a/src/nxt_unit_field.h b/src/nxt_unit_field.h
index d19db0f0..b07d3046 100644
--- a/src/nxt_unit_field.h
+++ b/src/nxt_unit_field.h
@@ -21,7 +21,8 @@ enum {
/* Name and Value field aka HTTP header. */
struct nxt_unit_field_s {
uint16_t hash;
- uint8_t skip;
+ uint8_t skip:1;
+ uint8_t hopbyhop:1;
uint8_t name_length;
uint32_t value_length;
diff --git a/src/nxt_websocket.c b/src/nxt_websocket.c
index 9a099760..91002237 100644
--- a/src/nxt_websocket.c
+++ b/src/nxt_websocket.c
@@ -19,7 +19,7 @@ nxt_inline void
nxt_hton16(uint8_t *b, uint16_t v)
{
b[0] = (v >> 8);
- b[1] = (v & 0xFFu);
+ b[1] = (v & 0xFFu);
}
diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c
index ab9f7020..45d7a7aa 100644
--- a/src/ruby/nxt_ruby.c
+++ b/src/ruby/nxt_ruby.c
@@ -85,14 +85,16 @@ static nxt_int_t
nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
{
int state, rc;
- VALUE dummy, res;
+ VALUE res;
nxt_unit_ctx_t *unit_ctx;
nxt_unit_init_t ruby_unit_init;
nxt_ruby_rack_init_t rack_init;
+ static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" };
+
+ RUBY_INIT_STACK
ruby_init();
- Init_stack(&dummy);
- ruby_init_loadpath();
+ ruby_options(2, argv);
ruby_script("NGINX_Unit");
rack_init.task = task;
@@ -707,7 +709,8 @@ nxt_ruby_rack_result_body(VALUE result)
}
} else if (rb_respond_to(body, rb_intern("each"))) {
- rb_iterate(rb_each, body, nxt_ruby_rack_result_body_each, 0);
+ rb_block_call(body, rb_intern("each"), 0, 0,
+ nxt_ruby_rack_result_body_each, 0);
} else {
nxt_unit_req_error(nxt_ruby_run_ctx.req,
diff --git a/src/ruby/nxt_ruby_stream_io.c b/src/ruby/nxt_ruby_stream_io.c
index 3f6cac89..fcfcf5dd 100644
--- a/src/ruby/nxt_ruby_stream_io.c
+++ b/src/ruby/nxt_ruby_stream_io.c
@@ -31,7 +31,8 @@ nxt_ruby_stream_io_input_init(void)
rb_gc_register_address(&stream_io);
rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
- rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1);
+ rb_define_method(stream_io, "initialize",
+ nxt_ruby_stream_io_initialize, -1);
rb_define_method(stream_io, "gets", nxt_ruby_stream_io_gets, 0);
rb_define_method(stream_io, "each", nxt_ruby_stream_io_each, 0);
rb_define_method(stream_io, "read", nxt_ruby_stream_io_read, -2);
@@ -51,7 +52,8 @@ nxt_ruby_stream_io_error_init(void)
rb_gc_register_address(&stream_io);
rb_define_singleton_method(stream_io, "new", nxt_ruby_stream_io_new, 1);
- rb_define_method(stream_io, "initialize", nxt_ruby_stream_io_initialize, -1);
+ rb_define_method(stream_io, "initialize",
+ nxt_ruby_stream_io_initialize, -1);
rb_define_method(stream_io, "puts", nxt_ruby_stream_io_puts, -2);
rb_define_method(stream_io, "write", nxt_ruby_stream_io_write, -2);
rb_define_method(stream_io, "flush", nxt_ruby_stream_io_flush, 0);
diff --git a/src/test/nxt_unit_websocket_chat.c b/src/test/nxt_unit_websocket_chat.c
index ecc9a243..6e274722 100644
--- a/src/test/nxt_unit_websocket_chat.c
+++ b/src/test/nxt_unit_websocket_chat.c
@@ -104,10 +104,10 @@ ws_chat_root(nxt_unit_request_info_t *req)
rc = nxt_unit_response_init(req, 200 /* Status code. */,
2 /* Number of response headers. */,
- nxt_length(CONTENT_TYPE) + 1
- + nxt_length(TEXT_HTML) + 1
- + nxt_length(CONTENT_LENGTH) + 1
- + ws_chat_index_content_length_size + 1
+ nxt_length(CONTENT_TYPE)
+ + nxt_length(TEXT_HTML)
+ + nxt_length(CONTENT_LENGTH)
+ + ws_chat_index_content_length_size
+ ws_chat_index_html_size);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;