summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
commit029942f4eb7196c2cff0d0e26bc6ff274138f7d8 (patch)
treef4686c4d7b9cd574fe94c6f4918479a580fecf75 /src
parent059a8642898a6bd4b47d13a1c1d599cd44af7e1c (diff)
downloadunit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.gz
unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.bz2
I/O operations refactoring.
Diffstat (limited to 'src')
-rw-r--r--src/nxt_aix_send_file.c24
-rw-r--r--src/nxt_application.c37
-rw-r--r--src/nxt_cycle.c147
-rw-r--r--src/nxt_cycle.h6
-rw-r--r--src/nxt_epoll_engine.c18
-rw-r--r--src/nxt_event_conn.c12
-rw-r--r--src/nxt_event_conn.h69
-rw-r--r--src/nxt_event_conn_accept.c29
-rw-r--r--src/nxt_event_conn_connect.c49
-rw-r--r--src/nxt_event_conn_job_sendfile.c10
-rw-r--r--src/nxt_event_conn_proxy.c99
-rw-r--r--src/nxt_event_conn_read.c109
-rw-r--r--src/nxt_event_conn_write.c309
-rw-r--r--src/nxt_event_engine.c20
-rw-r--r--src/nxt_event_engine.h2
-rw-r--r--src/nxt_fd_event.h8
-rw-r--r--src/nxt_file.c48
-rw-r--r--src/nxt_file.h10
-rw-r--r--src/nxt_freebsd_sendfile.c20
-rw-r--r--src/nxt_hpux_sendfile.c19
-rw-r--r--src/nxt_job_resolve.c5
-rw-r--r--src/nxt_kqueue_engine.c13
-rw-r--r--src/nxt_linux_sendfile.c41
-rw-r--r--src/nxt_listen_socket.c38
-rw-r--r--src/nxt_listen_socket.h14
-rw-r--r--src/nxt_macosx_sendfile.c255
-rw-r--r--src/nxt_main.h1
-rw-r--r--src/nxt_master_process.c4
-rw-r--r--src/nxt_openssl.c25
-rw-r--r--src/nxt_port.c2
-rw-r--r--src/nxt_port_socket.c46
-rw-r--r--src/nxt_port_socket.h6
-rw-r--r--src/nxt_sendbuf.c71
-rw-r--r--src/nxt_sendbuf.h17
-rw-r--r--src/nxt_sockaddr.c181
-rw-r--r--src/nxt_sockaddr.h137
-rw-r--r--src/nxt_socket.c108
-rw-r--r--src/nxt_socket.h55
-rw-r--r--src/nxt_socketpair.c18
-rw-r--r--src/nxt_solaris_sendfilev.c11
-rw-r--r--src/nxt_stream_module.c130
-rw-r--r--src/nxt_upstream.h5
-rw-r--r--src/nxt_upstream_round_robin.c38
-rw-r--r--src/nxt_work_queue.h4
-rw-r--r--src/nxt_worker_process.c6
45 files changed, 1141 insertions, 1135 deletions
diff --git a/src/nxt_aix_send_file.c b/src/nxt_aix_send_file.c
index b7cb3b28..6462efd5 100644
--- a/src/nxt_aix_send_file.c
+++ b/src/nxt_aix_send_file.c
@@ -32,7 +32,7 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
sb.size = 0;
sb.limit = limit;
- nhd = nxt_sendbuf_mem_coalesce(&sb);
+ nhd = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
if (nhd == 0 && sb.sync) {
return 0;
@@ -53,7 +53,7 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
sb.iobuf = &tr;
sb.nmax = 1;
- ntr = nxt_sendbuf_mem_coalesce(&sb);
+ ntr = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
nxt_memzero(&sfp, sizeof(struct sf_parms));
@@ -71,17 +71,16 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
sfp.trailer_length = tr.iov_len;
}
- nxt_log_debug(c->socket.log, "send_file(%d) fd:%FD @%O:%O hd:%ui tr:%ui",
- c->socket.fd, fb->file->fd, fb->file_pos, file_size,
- nhd, ntr);
+ nxt_debug(c->socket.task, "send_file(%d) fd:%FD @%O:%O hd:%ui tr:%ui",
+ c->socket.fd, fb->file->fd, fb->file_pos, file_size, nhd, ntr);
n = send_file(&c->socket.fd, &sfp, 0);
err = (n == -1) ? nxt_errno : 0;
sent = sfp.bytes_sent;
- nxt_log_debug(c->socket.log, "send_file(%d): %d sent:%O",
- c->socket.fd, n, sent);
+ nxt_debug(c->socket.task, "send_file(%d): %d sent:%O",
+ c->socket.fd, n, sent);
/*
* -1 an error has occurred, errno contains the error code;
@@ -102,16 +101,15 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "send_file(%d) failed %E \"%FN\" "
- "fd:%FD @%O:%O hd:%ui tr:%ui", c->socket.fd, err,
- fb->file->name, fb->file->fd, fb->file_pos,
- file_size, nhd, ntr);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "send_file(%d) failed %E \"%FN\" fd:%FD @%O:%O hd:%ui tr:%ui",
+ c->socket.fd, err, fb->file->name, fb->file->fd, fb->file_pos,
+ file_size, nhd, ntr);
return NXT_ERROR;
}
- nxt_log_debug(c->socket.log, "sendfile() %E", err);
+ nxt_debug(c->socket.task, "sendfile() %E", err);
return sent;
}
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 64886cf9..e0e5ffc7 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -20,7 +20,6 @@ static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c,
nxt_log_t *log);
static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out);
-static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_completion(nxt_task_t *task, void *obj,
@@ -683,7 +682,7 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t length)
nxt_buf_mem_init(b, start, 4096);
- b->completion_handler = nxt_app_buf_completion;
+ b->completion_handler = NULL;
nxt_app_buf_current_number++;
}
@@ -713,7 +712,7 @@ nxt_app_write_finish(nxt_app_request_t *r)
return NXT_ERROR;
}
- b->completion_handler = nxt_app_buf_completion;
+ b->completion_handler = NULL;
b->parent = (nxt_buf_t *) r;
out = r->output_buf;
@@ -746,20 +745,6 @@ nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out)
static void
-nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data)
-{
- nxt_buf_t *b;
-
- b = obj;
-
- nxt_debug(task, "app buf completion");
-
- b->next = nxt_app_buf_done;
- nxt_app_buf_done = b;
-}
-
-
-static void
nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
@@ -797,14 +782,14 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
c->write = b;
c->write_state = &nxt_app_delivery_write_state;
- nxt_event_conn_write(task, c);
+ nxt_event_conn_write(task->thread->engine, c);
}
static const nxt_event_conn_state_t nxt_app_delivery_write_state
nxt_aligned(64) =
{
- NXT_EVENT_BUF_PROCESS,
+ NXT_EVENT_NO_BUF_PROCESS,
NXT_EVENT_TIMER_AUTORESET,
nxt_app_delivery_ready,
@@ -820,12 +805,26 @@ static const nxt_event_conn_state_t nxt_app_delivery_write_state
static void
nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)
{
+ nxt_buf_t *b, *next;
nxt_event_conn_t *c;
c = obj;
nxt_debug(task, "app delivery ready");
+ for (b = c->write; b != NULL; b = next) {
+
+ if (nxt_buf_is_mem(b)) {
+ if (b->mem.pos != b->mem.free) {
+ break;
+ }
+ }
+
+ next = b->next;
+ b->next = nxt_app_buf_done;
+ nxt_app_buf_done = b;
+ }
+
nxt_work_queue_add(c->write_work_queue,
nxt_app_delivery_completion, task, c, NULL);
}
diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c
index a39bb02a..81ad8ce2 100644
--- a/src/nxt_cycle.c
+++ b/src/nxt_cycle.c
@@ -11,9 +11,9 @@
#include <nxt_master_process.h>
-static nxt_int_t nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr,
+static nxt_int_t nxt_cycle_inherited_listen_sockets(nxt_task_t *task,
nxt_cycle_t *cycle);
-static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr,
+static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_task_t *task,
nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_processes(nxt_cycle_t *cycle);
@@ -39,11 +39,14 @@ static nxt_sockaddr_t *nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr,
static nxt_int_t nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task,
nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_listen_socket(nxt_cycle_t *cycle);
+static nxt_int_t nxt_cycle_app_listen_socket(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_hostname(nxt_thread_t *thr, nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_log_files_init(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_log_files_create(nxt_cycle_t *cycle);
-static nxt_int_t nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle);
-static void nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle);
+static nxt_int_t nxt_cycle_listen_sockets_create(nxt_task_t *task,
+ nxt_cycle_t *cycle);
+static void nxt_cycle_listen_sockets_close(nxt_task_t *task,
+ nxt_cycle_t *cycle);
static void nxt_cycle_pid_file_delete(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_shm_zones_enable(nxt_cycle_t *cycle);
static nxt_int_t nxt_cycle_shm_zone_create(nxt_cycle_shm_zone_t *shm_zone);
@@ -106,7 +109,7 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous,
cycle->listen_sockets = listen_sockets;
if (previous == NULL) {
- ret = nxt_cycle_inherited_listen_sockets(thr, cycle);
+ ret = nxt_cycle_inherited_listen_sockets(task, cycle);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
@@ -166,7 +169,7 @@ fail:
static nxt_int_t
-nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_cycle_inherited_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle)
{
u_char *v, *p;
nxt_int_t type;
@@ -177,11 +180,10 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
v = (u_char *) getenv("NGINX");
if (v == NULL) {
- return nxt_cycle_systemd_listen_sockets(thr, cycle);
+ return nxt_cycle_systemd_listen_sockets(task, cycle);
}
- nxt_log_error(NXT_LOG_NOTICE, thr->log,
- "using inherited listen sockets: %s", v);
+ nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v);
inherited_sockets = nxt_array_create(cycle->mem_pool,
1, sizeof(nxt_listen_socket_t));
@@ -197,9 +199,9 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
s = nxt_int_parse(v, p - v);
if (nxt_slow_path(s < 0)) {
- nxt_log_emerg(thr->log, "invalid socket number "
- "\"%s\" in NGINX environment variable, "
- "ignoring the rest of the variable", v);
+ nxt_log(task, NXT_LOG_CRIT, "invalid socket number "
+ "\"%s\" in NGINX environment variable, "
+ "ignoring the rest of the variable", v);
return NXT_ERROR;
}
@@ -212,12 +214,12 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
ls->socket = s;
- ls->sockaddr = nxt_getsockname(cycle->mem_pool, s);
+ ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s);
if (nxt_slow_path(ls->sockaddr == NULL)) {
return NXT_ERROR;
}
- type = nxt_socket_getsockopt(s, SOL_SOCKET, SO_TYPE);
+ type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
if (nxt_slow_path(type == -1)) {
return NXT_ERROR;
}
@@ -231,7 +233,7 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
static nxt_int_t
-nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
+nxt_cycle_systemd_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle)
{
u_char *nfd, *pid;
nxt_int_t n;
@@ -263,8 +265,7 @@ nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
return NXT_OK;
}
- nxt_log_error(NXT_LOG_NOTICE, thr->log,
- "using %s systemd listen sockets", n);
+ nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n);
inherited_sockets = nxt_array_create(cycle->mem_pool,
n, sizeof(nxt_listen_socket_t));
@@ -282,7 +283,7 @@ nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle)
ls->socket = s;
- ls->sockaddr = nxt_getsockname(cycle->mem_pool, s);
+ ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s);
if (nxt_slow_path(ls->sockaddr == NULL)) {
return NXT_ERROR;
}
@@ -652,7 +653,7 @@ nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
{
const nxt_event_interface_t *interface;
- if (thr->engine->batch == cycle->batch
+ if (thr->engine->batch0 == cycle->batch
&& nxt_strcmp(thr->engine->event.name, cycle->engine) == 0)
{
return NXT_OK;
@@ -877,7 +878,22 @@ nxt_cycle_conf_read_cmd(nxt_thread_t *thr, nxt_cycle_t *cycle)
return NXT_ERROR;
}
- cycle->listen = sa;
+ cycle->app_listen = sa;
+
+ continue;
+ }
+
+ if (nxt_strcmp(p, "--upstream") == 0) {
+ if (*argv == NULL) {
+ nxt_log_emerg(thr->log,
+ "no argument for option \"--upstream\"");
+ return NXT_ERROR;
+ }
+
+ p = *argv++;
+
+ cycle->upstream.length = nxt_strlen(p);
+ cycle->upstream.start = (u_char *) p;
continue;
}
@@ -1039,7 +1055,7 @@ nxt_cycle_sockaddr_unix_parse(nxt_str_t *addr, nxt_mem_pool_t *mp,
#endif
- sa = nxt_sockaddr_alloc(mp, socklen);
+ sa = nxt_sockaddr_alloc(mp, socklen, addr->length);
if (nxt_slow_path(sa == NULL)) {
return NULL;
@@ -1206,8 +1222,8 @@ nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr, nxt_mem_pool_t *mp,
}
}
- sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in));
-
+ sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in),
+ NXT_INET_ADDR_STR_LEN);
if (nxt_slow_path(sa == NULL)) {
return NULL;
}
@@ -1241,6 +1257,10 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle)
return NXT_ERROR;
}
+ if (nxt_cycle_app_listen_socket(cycle) != NXT_OK) {
+ return NXT_ERROR;
+ }
+
if (nxt_cycle_listen_socket(cycle) != NXT_OK) {
return NXT_ERROR;
}
@@ -1249,7 +1269,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle)
return NXT_ERROR;
}
- if (nxt_cycle_listen_sockets_create(cycle) != NXT_OK) {
+ if (nxt_cycle_listen_sockets_create(task, cycle) != NXT_OK) {
return NXT_ERROR;
}
@@ -1257,7 +1277,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle)
return NXT_ERROR;
}
- nxt_cycle_listen_sockets_close(cycle);
+ nxt_cycle_listen_sockets_close(task, cycle);
return NXT_OK;
}
@@ -1267,36 +1287,34 @@ static nxt_int_t
nxt_cycle_listen_socket(nxt_cycle_t *cycle)
{
nxt_sockaddr_t *sa;
-// nxt_work_queue_t *wq;
nxt_listen_socket_t *ls;
- if (cycle->listen == NULL) {
- sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in));
+ if (cycle->stream_listen == NULL) {
+ sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in),
+ NXT_INET_ADDR_STR_LEN);
if (sa == NULL) {
return NXT_ERROR;
}
sa->type = SOCK_STREAM;
sa->u.sockaddr_in.sin_family = AF_INET;
- sa->u.sockaddr_in.sin_port = htons(8080);
+ sa->u.sockaddr_in.sin_port = htons(8000);
- cycle->listen = sa;
+ cycle->stream_listen = sa;
}
- if (nxt_sockaddr_text(cycle->mem_pool, cycle->listen, 1) != NXT_OK) {
- return NXT_ERROR;
- }
+ nxt_sockaddr_text(cycle->stream_listen);
- ls = nxt_cycle_listen_socket_add(cycle, cycle->listen);
+ ls = nxt_cycle_listen_socket_add(cycle, cycle->stream_listen);
if (ls == NULL) {
return NXT_ERROR;
}
ls->read_after_accept = 1;
-#if 0
ls->flags = NXT_NONBLOCK;
+#if 0
/* STUB */
wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t));
if (wq == NULL) {
@@ -1306,6 +1324,7 @@ nxt_cycle_listen_socket(nxt_cycle_t *cycle)
/**/
ls->work_queue = wq;
+#endif
ls->handler = nxt_stream_connection_init;
/*
@@ -1316,7 +1335,40 @@ nxt_cycle_listen_socket(nxt_cycle_t *cycle)
+ sizeof(nxt_event_conn_proxy_t)
+ sizeof(nxt_event_conn_t)
+ 4 * sizeof(nxt_buf_t);
-#endif
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_cycle_app_listen_socket(nxt_cycle_t *cycle)
+{
+ nxt_sockaddr_t *sa;
+// nxt_work_queue_t *wq;
+ nxt_listen_socket_t *ls;
+
+ if (cycle->app_listen == NULL) {
+ sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in),
+ NXT_INET_ADDR_STR_LEN);
+ if (sa == NULL) {
+ return NXT_ERROR;
+ }
+
+ sa->type = SOCK_STREAM;
+ sa->u.sockaddr_in.sin_family = AF_INET;
+ sa->u.sockaddr_in.sin_port = htons(8080);
+
+ cycle->app_listen = sa;
+ }
+
+ nxt_sockaddr_text(cycle->app_listen);
+
+ ls = nxt_cycle_listen_socket_add(cycle, cycle->app_listen);
+ if (ls == NULL) {
+ return NXT_ERROR;
+ }
+
+ ls->read_after_accept = 1;
return NXT_OK;
}
@@ -1335,16 +1387,15 @@ nxt_cycle_listen_socket_add(nxt_cycle_t *cycle, nxt_sockaddr_t *sa)
mp = cycle->mem_pool;
- ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, nxt_socklen(sa));
+ ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
+ sa->length);
if (ls->sockaddr == NULL) {
return NULL;
}
ls->sockaddr->type = sa->type;
- if (nxt_sockaddr_text(mp, ls->sockaddr, 1) != NXT_OK) {
- return NULL;
- }
+ nxt_sockaddr_text(ls->sockaddr);
ls->socket = -1;
ls->backlog = NXT_LISTEN_BACKLOG;
@@ -1483,7 +1534,7 @@ nxt_cycle_log_files_create(nxt_cycle_t *cycle)
static nxt_int_t
-nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle)
+nxt_cycle_listen_sockets_create(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_uint_t c, p, ncurr, nprev;
nxt_listen_socket_t *curr, *prev;
@@ -1510,7 +1561,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle)
if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
- if (nxt_listen_socket_update(&curr[c], &prev[p]) != NXT_OK) {
+ if (nxt_listen_socket_update(task, &curr[c], &prev[p]) != NXT_OK) {
return NXT_ERROR;
}
@@ -1518,7 +1569,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle)
}
}
- if (nxt_listen_socket_create(&curr[c], cycle->test_config) != NXT_OK) {
+ if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
return NXT_ERROR;
}
@@ -1532,7 +1583,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle)
static void
-nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle)
+nxt_cycle_listen_sockets_close(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_uint_t p, c, nprev, ncurr;
nxt_listen_socket_t *curr, *prev;
@@ -1555,7 +1606,7 @@ nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle)
}
}
- nxt_socket_close(prev[p].socket);
+ nxt_socket_close(task, prev[p].socket);
next:
@@ -1576,8 +1627,10 @@ nxt_cycle_listen_sockets_enable(nxt_task_t *task, nxt_cycle_t *cycle)
n = cycle->listen_sockets->nelts;
for (i = 0; i < n; i++) {
- if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) {
- return NXT_ERROR;
+ if (ls[i].flags == NXT_NONBLOCK) {
+ if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) {
+ return NXT_ERROR;
+ }
}
}
diff --git a/src/nxt_cycle.h b/src/nxt_cycle.h
index 4a5b656d..6dbde173 100644
--- a/src/nxt_cycle.h
+++ b/src/nxt_cycle.h
@@ -77,7 +77,10 @@ struct nxt_cycle_s {
const char *group;
const char *pid;
const char *error_log;
- nxt_sockaddr_t *listen;
+
+ nxt_sockaddr_t *app_listen;
+ nxt_sockaddr_t *stream_listen;
+ nxt_str_t upstream;
};
@@ -146,6 +149,7 @@ nxt_int_t nxt_cycle_shm_zone_add(nxt_cycle_t *cycle, nxt_str_t *name,
void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log,
const char *fmt, ...);
+void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_app_start(nxt_cycle_t *cycle);
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index 19e0389b..149f5cb8 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -106,7 +106,7 @@ static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = {
nxt_epoll_edge_event_conn_io_recvbuf,
nxt_event_conn_io_recv,
- nxt_event_conn_io_write,
+ nxt_conn_io_write,
nxt_event_conn_io_write_chunk,
#if (NXT_HAVE_LINUX_SENDFILE)
@@ -697,7 +697,7 @@ nxt_epoll_add_signal(nxt_event_engine_t *engine)
engine->u.epoll.signalfd.fd = fd;
- if (nxt_fd_nonblocking(fd) != NXT_OK) {
+ if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
return NXT_ERROR;
}
@@ -779,7 +779,8 @@ nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
return NXT_ERROR;
}
- if (nxt_fd_nonblocking(engine->u.epoll.eventfd.fd) != NXT_OK) {
+ ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
+ if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
@@ -998,7 +999,7 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
cls->ready--;
cls->socket.read_ready = (cls->ready != 0);
- len = nxt_socklen(c->remote);
+ len = c->remote->socklen;
if (len >= sizeof(struct sockaddr)) {
sa = &c->remote->u.sockaddr;
@@ -1049,7 +1050,7 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
state = c->write_state;
- switch (nxt_socket_connect(c->socket.fd, c->remote) ){
+ switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
case NXT_OK:
c->socket.write_ready = 1;
@@ -1105,8 +1106,7 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
break;
}
- nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler,
- task, c, data);
+ nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
@@ -1126,8 +1126,8 @@ nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data)
nxt_timer_disable(task->thread->engine, &c->write_timer);
}
- nxt_event_conn_io_handle(task->thread, c->write_work_queue,
- c->write_state->ready_handler, task, c, data);
+ nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
+ task, c, data);
return;
}
diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c
index b78a9251..079901f3 100644
--- a/src/nxt_event_conn.c
+++ b/src/nxt_event_conn.c
@@ -21,7 +21,7 @@ nxt_event_conn_io_t nxt_unix_event_conn_io = {
nxt_event_conn_io_recvbuf,
nxt_event_conn_io_recv,
- nxt_event_conn_io_write,
+ nxt_conn_io_write,
nxt_event_conn_io_write_chunk,
#if (NXT_HAVE_LINUX_SENDFILE)
@@ -48,7 +48,7 @@ nxt_event_conn_io_t nxt_unix_event_conn_io = {
nxt_event_conn_t *
-nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log)
+nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task)
{
nxt_thread_t *thr;
nxt_event_conn_t *c;
@@ -63,7 +63,7 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log)
c->socket.fd = -1;
c->socket.log = &c->log;
- c->log = *log;
+ c->log = *task->log;
/* The while loop skips possible uint32_t overflow. */
@@ -185,7 +185,7 @@ nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data)
c->socket.shutdown = 1;
- nxt_socket_shutdown(c->socket.fd, SHUT_RDWR);
+ nxt_socket_shutdown(task, c->socket.fd, SHUT_RDWR);
nxt_work_queue_add(&engine->close_work_queue, nxt_conn_close_handler,
task, c, engine);
@@ -210,7 +210,7 @@ nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data)
events_pending = nxt_fd_event_close(engine, &c->socket);
if (events_pending == 0) {
- nxt_socket_close(c->socket.fd);
+ nxt_socket_close(task, c->socket.fd);
c->socket.fd = -1;
if (timers_pending == 0) {
@@ -242,7 +242,7 @@ nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "event conn close handler fd:%d", c->socket.fd);
if (c->socket.fd != -1) {
- nxt_socket_close(c->socket.fd);
+ nxt_socket_close(task, c->socket.fd);
c->socket.fd = -1;
}
diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h
index 3f907633..73415cfe 100644
--- a/src/nxt_event_conn.h
+++ b/src/nxt_event_conn.h
@@ -175,7 +175,7 @@ typedef struct {
nxt_task_t task;
uint32_t ready;
- uint32_t batch;
+ uint32_t batch0;
/* An accept() interface is cached to minimize memory accesses. */
nxt_work_handler_t accept;
@@ -189,18 +189,6 @@ typedef struct {
#define \
-nxt_event_conn_io_handle(thr, wq, handler, task, c, data) \
- do { \
- if (thr->engine->batch != 0) { \
- nxt_work_queue_add(wq, handler, task, c, data); \
- \
- } else { \
- handler(task, c, data); \
- } \
- } while (0)
-
-
-#define \
nxt_event_conn_timer_init(ev, c, wq) \
do { \
(ev)->work_queue = (wq); \
@@ -222,12 +210,12 @@ nxt_event_write_timer_conn(ev) \
#if (NXT_HAVE_UNIX_DOMAIN)
#define \
-nxt_event_conn_tcp_nodelay_on(c) \
+nxt_event_conn_tcp_nodelay_on(task, c) \
do { \
nxt_int_t ret; \
\
if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \
- ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \
+ ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \
TCP_NODELAY, 1); \
\
(c)->tcp_nodelay = (ret == NXT_OK); \
@@ -238,11 +226,11 @@ nxt_event_conn_tcp_nodelay_on(c) \
#else
#define \
-nxt_event_conn_tcp_nodelay_on(c) \
+nxt_event_conn_tcp_nodelay_on(task, c) \
do { \
nxt_int_t ret; \
\
- ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \
+ ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \
TCP_NODELAY, 1); \
\
(c)->tcp_nodelay = (ret == NXT_OK); \
@@ -252,7 +240,7 @@ nxt_event_conn_tcp_nodelay_on(c) \
NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp,
- nxt_log_t *log);
+ nxt_task_t *task);
void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data);
NXT_EXPORT void nxt_event_conn_close(nxt_event_engine_t *engine,
nxt_event_conn_t *c);
@@ -262,8 +250,7 @@ NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine,
NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c,
nxt_work_queue_t *wq);
-NXT_EXPORT void nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c);
-void nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data);
+void nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data);
void nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c);
void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data);
@@ -277,17 +264,23 @@ NXT_EXPORT void nxt_event_conn_accept(nxt_task_t *task,
void nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls,
const char *accept_syscall, nxt_err_t err);
-NXT_EXPORT void nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c);
+void nxt_conn_wait(nxt_event_conn_t *c);
+
void nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data);
ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b);
ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf,
size_t size, nxt_uint_t flags);
-NXT_EXPORT void nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c);
+void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data);
+ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb);
+ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb,
+ nxt_iobuf_t *iob, nxt_uint_t niob);
+ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf,
+ size_t size);
+
size_t nxt_event_conn_write_limit(nxt_event_conn_t *c);
nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
nxt_event_conn_t *c, size_t sent);
-void nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data);
ssize_t nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b,
size_t limit);
ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob,
@@ -301,29 +294,30 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
nxt_event_conn_t *c);
-#define \
-nxt_event_conn_connect_enqueue(thr, task, c) \
- nxt_work_queue_add(&thr->engine->socket_work_queue, \
- nxt_event_conn_batch_socket, task, c, c->socket.data)
+#define nxt_event_conn_connect(engine, c) \
+ nxt_work_queue_add(&engine->socket_work_queue, nxt_event_conn_sys_socket, \
+ c->socket.task, c, c->socket.data)
-#define \
-nxt_event_conn_read_enqueue(thr, task, c) \
+#define nxt_event_conn_read(e, c) \
do { \
- c->socket.read_work_queue = &thr->engine->read_work_queue; \
+ nxt_event_engine_t *engine = e; \
\
- nxt_work_queue_add(&thr->engine->read_work_queue, \
- c->io->read, task, c, c->socket.data); \
+ c->socket.read_work_queue = &engine->read_work_queue; \
+ \
+ nxt_work_queue_add(&engine->read_work_queue, c->io->read, \
+ c->socket.task, c, c->socket.data); \
} while (0)
-#define \
-nxt_event_conn_write_enqueue(thr, task, c) \
+#define nxt_event_conn_write(e, c) \
do { \
- c->socket.write_work_queue = &thr->engine->write_work_queue; \
+ nxt_event_engine_t *engine = e; \
+ \
+ c->socket.write_work_queue = &engine->write_work_queue; \
\
- nxt_work_queue_add(&thr->engine->write_work_queue, \
- c->io->write, task, c, c->socket.data); \
+ nxt_work_queue_add(&engine->write_work_queue, c->io->write, \
+ c->socket.task, c, c->socket.data); \
} while (0)
@@ -353,6 +347,7 @@ typedef struct {
uint8_t connected; /* 1 bit */
uint8_t delayed; /* 1 bit */
uint8_t retries; /* 8 bits */
+ uint8_t retain; /* 2 bits */
nxt_work_handler_t completion_handler;
} nxt_event_conn_proxy_t;
diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c
index 7b2dce21..606cdf03 100644
--- a/src/nxt_event_conn_accept.c
+++ b/src/nxt_event_conn_accept.c
@@ -44,16 +44,9 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
cls->socket.fd = ls->socket;
engine = task->thread->engine;
- cls->batch = engine->batch;
-
- if (cls->batch != 0) {
- cls->socket.read_work_queue = &engine->accept_work_queue;
-
- } else {
- cls->socket.read_work_queue = &engine->fast_work_queue;
- cls->batch = 1;
- }
+ cls->batch0 = engine->batch0;
+ cls->socket.read_work_queue = &engine->accept_work_queue;
cls->socket.read_handler = nxt_event_conn_listen_handler;
cls->socket.error_handler = nxt_event_conn_listen_event_error;
cls->socket.log = &nxt_main_log;
@@ -102,7 +95,7 @@ nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls)
if (nxt_fast_path(mp != NULL)) {
/* This allocation cannot fail. */
- c = nxt_event_conn_create(mp, cls->socket.log);
+ c = nxt_event_conn_create(mp, cls->socket.task);
cls->socket.data = c;
c->socket.read_work_queue = cls->socket.read_work_queue;
@@ -112,7 +105,7 @@ nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls)
c->listen = ls;
/* This allocation cannot fail. */
- remote = nxt_sockaddr_alloc(mp, ls->socklen);
+ remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length);
c->remote = remote;
sa = ls->sockaddr;
@@ -137,7 +130,7 @@ nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
nxt_event_conn_listen_t *cls;
cls = obj;
- cls->ready = cls->batch;
+ cls->ready = cls->batch0;
cls->accept(task, cls, data);
}
@@ -158,7 +151,7 @@ nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
cls->ready--;
cls->socket.read_ready = (cls->ready != 0);
- len = nxt_socklen(c->remote);
+ len = c->remote->socklen;
if (len >= sizeof(struct sockaddr)) {
sa = &c->remote->u.sockaddr;
@@ -182,8 +175,8 @@ nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
* Linux does not inherit non-blocking mode
* from listen socket for accept()ed socket.
*/
- if (nxt_slow_path(nxt_socket_nonblocking(s) != NXT_OK)) {
- nxt_socket_close(s);
+ if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
+ nxt_socket_close(task, s);
}
#endif
@@ -200,10 +193,10 @@ nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls,
{
nxt_event_conn_t *next;
- /* This allocation cannot fail. */
- (void) nxt_sockaddr_text(c->mem_pool, c->remote, 0);
+ nxt_sockaddr_text(c->remote);
- nxt_debug(task, "client: %*s", c->remote->text_len, c->remote->text);
+ nxt_debug(task, "client: %*s",
+ c->remote->address_length, nxt_sockaddr_address(c->remote));
nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
diff --git a/src/nxt_event_conn_connect.c b/src/nxt_event_conn_connect.c
index ccd84011..7db8f704 100644
--- a/src/nxt_event_conn_connect.c
+++ b/src/nxt_event_conn_connect.c
@@ -8,31 +8,7 @@
void
-nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c)
-{
- void *data;
- nxt_event_engine_t *engine;
-
- data = c->socket.data;
- engine = task->thread->engine;
-
- if (engine->batch != 0) {
- nxt_work_queue_add(&engine->socket_work_queue,
- nxt_event_conn_batch_socket, task, c, data);
- return;
- }
-
- if (nxt_event_conn_socket(task, c) == NXT_OK) {
- c->io->connect(task, c, data);
- return;
- }
-
- c->write_state->error_handler(task, c, data);
-}
-
-
-void
-nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data)
+nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_work_handler_t handler;
@@ -64,7 +40,7 @@ nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
state = c->write_state;
- switch (nxt_socket_connect(c->socket.fd, c->remote)) {
+ switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {
case NXT_OK:
c->socket.write_ready = 1;
@@ -91,8 +67,7 @@ nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
break;
}
- nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, task,
- c, data);
+ nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
@@ -106,7 +81,7 @@ nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c)
family = c->remote->u.sockaddr.sa_family;
- s = nxt_socket_create(family, c->remote->type, 0, NXT_NONBLOCK);
+ s = nxt_socket_create(task, family, c->remote->type, 0, NXT_NONBLOCK);
if (nxt_slow_path(s == -1)) {
return NXT_ERROR;
@@ -130,8 +105,8 @@ nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c)
c->write_timer.task = task;
if (c->local != NULL) {
- if (nxt_slow_path(nxt_socket_bind(s, c->local, 0) != NXT_OK)) {
- nxt_socket_close(s);
+ if (nxt_slow_path(nxt_socket_bind(task, s, c->local, 0) != NXT_OK)) {
+ nxt_socket_close(task, s);
return NXT_ERROR;
}
}
@@ -172,16 +147,15 @@ nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data)
}
if (err == 0) {
- nxt_event_conn_io_handle(task->thread, c->write_work_queue,
- c->write_state->ready_handler, task, c, data);
+ nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
+ task, c, data);
return;
}
c->socket.error = err;
- nxt_log(task, nxt_socket_error_level(err, c->socket.log_error),
- "connect(%d, %*s) failed %E",
- c->socket.fd, c->remote->text_len, c->remote->text, err);
+ nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E",
+ c->socket.fd, c->remote->length, nxt_sockaddr_start(c->remote));
nxt_event_conn_connect_error(task, c, data);
}
@@ -216,6 +190,5 @@ nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data)
break;
}
- nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler,
- task, c, data);
+ nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c
index dd43c16f..bf2219f2 100644
--- a/src/nxt_event_conn_job_sendfile.c
+++ b/src/nxt_event_conn_job_sendfile.c
@@ -219,9 +219,8 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
}
if (sent != 0) {
- nxt_event_conn_io_handle(task->thread, c->write_work_queue,
- c->write_state->ready_handler,
- task, c, c->socket.data);
+ nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
+ task, c, c->socket.data);
/*
* Fall through if first operations were
* successful but the last one failed.
@@ -229,9 +228,8 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
}
if (nxt_slow_path(c->socket.error != 0)) {
- nxt_event_conn_io_handle(task->thread, c->write_work_queue,
- c->write_state->error_handler,
- task, c, c->socket.data);
+ nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
+ task, c, c->socket.data);
}
}
diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c
index 42b9dd0f..d91cff4a 100644
--- a/src/nxt_event_conn_proxy.c
+++ b/src/nxt_event_conn_proxy.c
@@ -53,6 +53,8 @@ static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_event_conn_proxy_complete(nxt_task_t *task,
nxt_event_conn_proxy_t *p);
+static void nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj,
+ void *data);
static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state;
@@ -78,7 +80,7 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client)
return NULL;
}
- peer = nxt_event_conn_create(client->mem_pool, client->socket.log);
+ peer = nxt_event_conn_create(client->mem_pool, client->socket.task);
if (nxt_slow_path(peer == NULL)) {
return NULL;
}
@@ -109,11 +111,8 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client)
void
nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p)
{
- nxt_thread_t *thr;
nxt_event_conn_t *peer;
- thr = nxt_thread();
-
/*
* Peer read event: not connected, disabled.
* Peer write event: not connected, disabled.
@@ -127,7 +126,7 @@ nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p)
peer = p->peer;
peer->write_state = &nxt_event_conn_proxy_peer_connect_state;
- nxt_event_conn_connect_enqueue(thr, task, peer);
+ nxt_event_conn_connect(task->thread->engine, peer);
}
/*
@@ -136,7 +135,7 @@ nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p)
*/
p->client->read_state = &nxt_event_conn_proxy_client_wait_state;
- nxt_event_conn_read(task, p->client);
+ nxt_conn_wait(p->client);
}
@@ -203,12 +202,13 @@ nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
client->read_state = &nxt_event_conn_proxy_client_first_read_state;
}
- nxt_event_conn_read(task, client);
+ nxt_event_conn_read(task->thread->engine, client);
}
static const nxt_event_conn_state_t
- nxt_event_conn_proxy_client_first_read_state nxt_aligned(64) =
+ nxt_event_conn_proxy_client_first_read_state
+ nxt_aligned(64) =
{
NXT_EVENT_BUF_PROCESS,
NXT_EVENT_TIMER_AUTORESET,
@@ -243,7 +243,7 @@ nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state;
- nxt_event_conn_connect(task, p->peer);
+ nxt_event_conn_connect(task->thread->engine, p->peer);
}
@@ -276,15 +276,15 @@ nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
p->connected = 1;
- nxt_event_conn_tcp_nodelay_on(peer);
- nxt_event_conn_tcp_nodelay_on(p->client);
+ nxt_event_conn_tcp_nodelay_on(task, peer);
+ nxt_event_conn_tcp_nodelay_on(task, p->client);
/* Peer read event: waiting with peer_wait_timeout. */
peer->read_state = &nxt_event_conn_proxy_peer_wait_state;
peer->write_state = &nxt_event_conn_proxy_peer_write_state;
- nxt_event_conn_read_enqueue(task->thread, task, peer);
+ nxt_conn_wait(peer);
if (p->client_buffer != NULL) {
client = p->client;
@@ -350,7 +350,7 @@ nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
* Peer read event: waiting with possible peer_wait_timeout.
* Peer write event: blocked.
*/
- nxt_event_conn_read(task, peer);
+ nxt_event_conn_read(task->thread->engine, peer);
}
@@ -469,7 +469,7 @@ nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
}
if (p->connected) {
- nxt_event_conn_write_enqueue(task->thread, task, sink);
+ nxt_event_conn_write(task->thread->engine, sink);
}
}
@@ -547,7 +547,7 @@ nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
sink = (source == p->client) ? p->peer : p->client;
if (sink->socket.error == 0) {
- nxt_event_conn_read(task, source);
+ nxt_event_conn_read(task->thread->engine, source);
}
}
}
@@ -658,7 +658,7 @@ nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
}
if (wb->mem.pos != wb->mem.free) {
- nxt_event_conn_write_enqueue(task->thread, task, sink);
+ nxt_event_conn_write(task->thread->engine, sink);
break;
}
@@ -865,7 +865,7 @@ nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
p->retries--;
- nxt_socket_close(peer->socket.fd);
+ nxt_socket_close(task, peer->socket.fd);
peer->socket.fd = -1;
peer->socket.error = 0;
@@ -903,7 +903,7 @@ nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
* Peer read event: disabled.
* Peer write event: waiting for connection with connect_timeout.
*/
- nxt_event_conn_connect(task, peer);
+ nxt_event_conn_connect(task->thread->engine, peer);
}
@@ -915,8 +915,7 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p,
nxt_debug(source->socket.task,
"event conn proxy shutdown source fd:%d cl:%d err:%d",
- source->socket.fd, source->socket.closed,
- source->socket.error);
+ source->socket.fd, source->socket.closed, source->socket.error);
nxt_debug(sink->socket.task,
"event conn proxy shutdown sink fd:%d cl:%d err:%d",
@@ -927,12 +926,9 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p,
return;
}
- if (sink->socket.error != 0 || sink->socket.closed) {
- nxt_event_conn_close(task->thread->engine, sink);
-
- } else {
+ if (sink->socket.error == 0 && !sink->socket.closed) {
sink->socket.shutdown = 1;
- nxt_socket_shutdown(sink->socket.fd, SHUT_WR);
+ nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
}
if (sink->socket.error != 0
@@ -943,6 +939,8 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p,
return;
}
+ nxt_debug(source->socket.task, "free source buffer");
+
/* Free the direction's buffer. */
b = (source == p->client) ? p->client_buffer : p->peer_buffer;
nxt_mem_free(source->mem_pool, b);
@@ -992,6 +990,22 @@ nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
}
+static const nxt_event_conn_state_t nxt_event_conn_proxy_close_state
+ nxt_aligned(64) =
+{
+ NXT_EVENT_NO_BUF_PROCESS,
+ NXT_EVENT_TIMER_NO_AUTORESET,
+
+ nxt_event_conn_proxy_completion,
+ NULL,
+ NULL,
+
+ NULL,
+ NULL,
+ 0,
+};
+
+
static void
nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p)
{
@@ -1002,20 +1016,41 @@ nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p)
nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d",
p->client->socket.fd, p->peer->socket.fd);
+ if (p->delayed) {
+ p->delayed = 0;
+ nxt_queue_remove(&p->peer->link);
+ }
+
if (p->client->socket.fd != -1) {
+ p->retain = 1;
+ p->client->write_state = &nxt_event_conn_proxy_close_state;
nxt_event_conn_close(engine, p->client);
}
if (p->peer->socket.fd != -1) {
+ p->retain++;
+ p->peer->write_state = &nxt_event_conn_proxy_close_state;
nxt_event_conn_close(engine, p->peer);
-
- } else if (p->delayed) {
- nxt_queue_remove(&p->peer->link);
- nxt_timer_delete(engine, &p->peer->write_timer);
}
+}
- nxt_mem_free(p->client->mem_pool, p->client_buffer);
- nxt_mem_free(p->client->mem_pool, p->peer_buffer);
- p->completion_handler(task, p, NULL);
+static void
+nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_proxy_t *p;
+
+ p = data;
+
+ nxt_debug(p->client->socket.task, "event conn proxy completion %d:%d:%d",
+ p->retain, p->client->socket.fd, p->peer->socket.fd);
+
+ p->retain--;
+
+ if (p->retain == 0) {
+ nxt_mem_free(p->client->mem_pool, p->client_buffer);
+ nxt_mem_free(p->client->mem_pool, p->peer_buffer);
+
+ p->completion_handler(task, p, NULL);
+ }
}
diff --git a/src/nxt_event_conn_read.c b/src/nxt_event_conn_read.c
index 20e4d39b..d898c694 100644
--- a/src/nxt_event_conn_read.c
+++ b/src/nxt_event_conn_read.c
@@ -8,23 +8,29 @@
void
-nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c)
+nxt_conn_wait(nxt_event_conn_t *c)
{
- nxt_work_queue_t *wq;
- nxt_work_handler_t handler;
-
- handler = c->io->read;
+ nxt_event_engine_t *engine;
+ const nxt_event_conn_state_t *state;
- if (task->thread->engine->batch != 0) {
+ nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
+ c->socket.fd, c->socket.read_ready);
- wq = &task->thread->engine->read_work_queue;
- c->socket.read_work_queue = wq;
+ engine = c->socket.task->thread->engine;
+ state = c->read_state;
- nxt_work_queue_add( wq, handler, task, c, c->socket.data);
+ if (c->socket.read_ready) {
+ nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
+ c->socket.task, c, c->socket.data);
return;
}
- handler(task, c, c->socket.data);
+ c->socket.read_handler = state->ready_handler;
+ c->socket.error_handler = state->error_handler;
+
+ nxt_event_conn_timer(engine, c, state, &c->read_timer);
+
+ nxt_fd_event_enable_read(engine, &c->socket);
}
@@ -33,7 +39,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_buf_t *b;
- nxt_bool_t batch;
+ nxt_work_queue_t *wq;
nxt_event_conn_t *c;
nxt_event_engine_t *engine;
nxt_work_handler_t handler;
@@ -46,18 +52,12 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
engine = task->thread->engine;
- batch = (engine->batch != 0);
state = c->read_state;
if (c->socket.read_ready) {
b = c->read;
- if (b == NULL) {
- /* Just test descriptor readiness. */
- goto ready;
- }
-
if (c->peek == 0) {
n = c->io->recvbuf(c, b);
@@ -68,34 +68,33 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
if (n > 0) {
c->nbytes = n;
- if (state->process_buffers) {
- nxt_recvbuf_update(b, n);
-
- } else {
- /*
- * A ready_handler must not be queued, instead buffers
- * must be processed by the ready_handler at once after
- * recv() operation, otherwise two sequentially queued
- * recv() operations will read in the same buffers.
- */
- batch = 0;
+ nxt_recvbuf_update(b, n);
+
+ nxt_fd_event_block_read(engine, &c->socket);
+
+ if (state->autoreset_timer) {
+ nxt_timer_disable(engine, &c->read_timer);
}
- goto ready;
+ wq = c->read_work_queue;
+ handler = state->ready_handler;
+
+ nxt_work_queue_add(wq, handler, task, c, data);
+
+ return;
}
if (n != NXT_AGAIN) {
nxt_fd_event_block_read(engine, &c->socket);
nxt_timer_disable(engine, &c->read_timer);
- if (n == 0) {
- handler = state->close_handler;
- goto done;
- }
+ wq = &engine->fast_work_queue;
- /* n == NXT_ERROR */
- handler = state->error_handler;
- goto done;
+ handler = (n == 0) ? state->close_handler : state->error_handler;
+
+ nxt_work_queue_add(wq, handler, task, c, data);
+
+ return;
}
}
@@ -119,25 +118,6 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
}
return;
-
-ready:
-
- nxt_fd_event_block_read(engine, &c->socket);
-
- if (state->autoreset_timer) {
- nxt_timer_disable(engine, &c->read_timer);
- }
-
- handler = state->ready_handler;
-
-done:
-
- if (batch) {
- nxt_work_queue_add(c->read_work_queue, handler, task, c, data);
-
- } else {
- handler(task, c, data);
- }
}
@@ -198,9 +178,9 @@ nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "readv(%d, %ui) failed %E",
- c->socket.fd, niov, err);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
+
return NXT_ERROR;
}
}
@@ -233,6 +213,7 @@ nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size,
if (n == 0) {
c->socket.closed = 1;
c->socket.read_ready = 0;
+
return n;
}
@@ -241,19 +222,21 @@ nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size,
switch (err) {
case NXT_EAGAIN:
- nxt_log_debug(c->socket.log, "recv() %E", err);
+ nxt_debug(c->socket.task, "recv() %E", err);
c->socket.read_ready = 0;
+
return NXT_AGAIN;
case NXT_EINTR:
- nxt_log_debug(c->socket.log, "recv() %E", err);
+ nxt_debug(c->socket.task, "recv() %E", err);
continue;
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "recv(%d, %p, %uz, %ui) failed %E",
- c->socket.fd, buf, size, flags, err);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "recv(%d, %p, %uz, %ui) failed %E",
+ c->socket.fd, buf, size, flags, err);
+
return NXT_ERROR;
}
}
diff --git a/src/nxt_event_conn_write.c b/src/nxt_event_conn_write.c
index 72d0731b..a0f6f953 100644
--- a/src/nxt_event_conn_write.c
+++ b/src/nxt_event_conn_write.c
@@ -7,32 +7,16 @@
#include <nxt_main.h>
-static void nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate,
- size_t sent, nxt_msec_t now);
-NXT_LIB_UNIT_TEST_STATIC double
- nxt_event_conn_exponential_approximation(double n);
static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj,
void *data);
void
-nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c)
+nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
{
- if (task->thread->engine->batch != 0) {
- nxt_event_conn_write_enqueue(task->thread, task, c);
-
- } else {
- c->io->write(task, c, c->socket.data);
- }
-}
-
-
-void
-nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
-{
- size_t sent, limit;
ssize_t ret;
nxt_buf_t *b;
+ nxt_sendbuf_t sb;
nxt_event_conn_t *c;
nxt_event_engine_t *engine;
@@ -40,40 +24,37 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "event conn write fd:%d", c->socket.fd);
- if (!c->socket.write_ready || c->delayed || c->write == NULL) {
+ if (!c->socket.write_ready || c->write == NULL) {
return;
}
engine = task->thread->engine;
- c->socket.write_handler = nxt_event_conn_io_write;
+ c->socket.write_handler = nxt_conn_io_write;
c->socket.error_handler = c->write_state->error_handler;
- ret = NXT_DECLINED;
- sent = 0;
b = c->write;
- limit = nxt_event_conn_write_limit(c);
-
- while (limit != 0) {
+ sb.socket = c->socket.fd;
+ sb.sent = 0;
+ sb.size = 0;
+ sb.buf = b;
+ sb.limit = 10 * 1024 * 1024;
+ sb.ready = 1;
+ sb.sync = 0;
- ret = c->io->write_chunk(c, b, limit);
+ do {
+ ret = nxt_conn_io_sendbuf(task, &sb);
if (ret < 0) {
/* ret == NXT_AGAIN || ret == NXT_ERROR. */
break;
}
- sent += ret;
- limit -= ret;
+ sb.sent += ret;
+ sb.limit -= ret;
- if (c->write_state->process_buffers) {
- b = nxt_sendbuf_completion(task, c->write_work_queue, b, ret);
- c->write = b;
-
- } else {
- b = nxt_sendbuf_update(b, ret);
- }
+ b = nxt_sendbuf_update(b, ret);
if (b == NULL) {
nxt_fd_event_block_write(engine, &c->socket);
@@ -84,20 +65,20 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
ret = NXT_AGAIN;
break;
}
- }
- nxt_debug(task, "event conn: %i sent:%z", ret, sent);
+ } while (sb.limit != 0);
- if (sent != 0) {
+ nxt_debug(task, "event conn: %i sent:%z", ret, sb.sent);
+
+ if (sb.sent != 0) {
if (c->write_state->autoreset_timer) {
nxt_timer_disable(engine, &c->write_timer);
}
}
- if (ret != NXT_ERROR
- && !nxt_event_conn_write_delayed(engine, c, sent))
- {
- if (limit == 0) {
+ if (ret != NXT_ERROR) {
+
+ if (sb.limit == 0) {
/*
* Postpone writing until next event poll to allow to
* process other recevied events and to get new events.
@@ -117,11 +98,11 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
}
}
- if (ret == 0 || sent != 0) {
+ if (ret == 0 || sb.sent != 0) {
/* "ret == 0" means a sync buffer was processed. */
- c->sent += sent;
- nxt_event_conn_io_handle(task->thread, c->write_work_queue,
- c->write_state->ready_handler, task, c, data);
+ c->sent += sb.sent;
+ nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
+ task, c, data);
/*
* Fall through if first operations were
* successful but the last one failed.
@@ -131,8 +112,8 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
if (nxt_slow_path(ret == NXT_ERROR)) {
nxt_fd_event_block_write(engine, &c->socket);
- nxt_event_conn_io_handle(task->thread, c->write_work_queue,
- c->write_state->error_handler, task, c, data);
+ nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
+ task, c, data);
}
}
@@ -152,8 +133,8 @@ nxt_event_conn_write_limit(nxt_event_conn_t *c)
limit = rate->limit;
correction = limit - (size_t) rate->average;
- nxt_log_debug(c->socket.log, "event conn correction:%z average:%0.3f",
- correction, rate->average);
+ nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
+ correction, rate->average);
limit += correction;
@@ -174,117 +155,10 @@ nxt_bool_t
nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c,
size_t sent)
{
- nxt_msec_t timer;
- nxt_event_write_rate_t *rate;
-
- rate = c->rate;
-
- if (rate != NULL) {
- nxt_event_conn_average_rate_update(rate, sent, engine->timers.now);
-
- if (rate->limit_after == 0) {
- timer = sent * 1000 / rate->limit;
-
- } else if (rate->limit_after >= sent) {
- timer = sent * 1000 / rate->max_limit;
- rate->limit_after -= sent;
-
- } else {
- sent -= rate->limit_after;
- timer = rate->limit_after * 1000 / rate->max_limit
- + sent * 1000 / rate->limit;
- rate->limit_after = 0;
- }
-
- nxt_log_debug(c->socket.log, "event conn timer: %M", timer);
-
- if (timer != 0) {
- c->delayed = 1;
-
- nxt_fd_event_block_write(engine, &c->socket);
-
- c->write_timer.handler = nxt_event_conn_write_timer_handler;
- nxt_timer_add(engine, &c->write_timer, timer);
-
- return 1;
- }
- }
-
return 0;
}
-/* Exponentially weighted moving average rate for a given interval. */
-
-static void
-nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate, size_t sent,
- nxt_msec_t now)
-{
- double weight, delta;
- nxt_msec_t elapsed;
- const nxt_uint_t interval = 10; /* 10s */
-
- elapsed = now - rate->last;
-
- if (elapsed == 0) {
- return;
- }
-
- rate->last = now;
- delta = (double) elapsed / 1000;
-
- weight = nxt_event_conn_exponential_approximation(-delta / interval);
-
- rate->average = (1 - weight) * sent / delta + weight * rate->average;
-
- nxt_thread_log_debug("event conn delta:%0.3f, weight:%0.3f, average:%0.3f",
- delta, weight, rate->average);
-}
-
-
-/*
- * exp() takes tens or hundreds nanoseconds on modern CPU.
- * This is a faster exp() approximation based on IEEE-754 format
- * layout and described in "A Fast, Compact Approximation of
- * the Exponential Function" * by N. N. Schraudolph, 1999.
- */
-
-NXT_LIB_UNIT_TEST_STATIC double
-nxt_event_conn_exponential_approximation(double x)
-{
- union {
- double d;
- int64_t n;
- } exp;
-
- if (x < -100) {
- /*
- * The approximation is correct in -700 to 700 range.
- * The "x" argument is always negative.
- */
- return 0;
- }
-
- /*
- * x * 2^52 / ln(2) + (1023 * 2^52 - 261140389990637.73
- *
- * 52 is the number of mantissa bits;
- * 1023 is the exponent bias;
- * 261140389990637.73 is the adjustment parameter to
- * improve the approximation. The parameter is equal to
- *
- * 2^52 * ln[ 3 / (8 * ln(2)) + 0.5 ] / ln(2)
- *
- * Only significant digits of the double float format
- * are used to present the double float constants.
- */
- exp.n = x * 4503599627370496.0 / 0.69314718055994530
- + (4607182418800017408.0 - 261140389990637.73);
-
- return exp.d;
-}
-
-
static void
nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
{
@@ -359,8 +233,7 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
err = (n == -1) ? nxt_socket_errno : 0;
- nxt_log_debug(c->socket.log, "writev(%d, %ui): %d",
- c->socket.fd, niob, n);
+ nxt_debug(c->socket.task, "writev(%d, %ui): %d", c->socket.fd, niob, n);
if (n > 0) {
return n;
@@ -371,19 +244,18 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
switch (err) {
case NXT_EAGAIN:
- nxt_log_debug(c->socket.log, "writev() %E", err);
+ nxt_debug(c->socket.task, "writev() %E", err);
c->socket.write_ready = 0;
return NXT_AGAIN;
case NXT_EINTR:
- nxt_log_debug(c->socket.log, "writev() %E", err);
+ nxt_debug(c->socket.task, "writev() %E", err);
continue;
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "writev(%d, %ui) failed %E",
- c->socket.fd, niob, err);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "writev(%d, %ui) failed %E", c->socket.fd, niob, err);
return NXT_ERROR;
}
}
@@ -423,11 +295,116 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
default:
c->socket.error = err;
- nxt_log(c->socket.task,
- nxt_socket_error_level(err, c->socket.log_error),
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
"send(%d, %p, %uz) failed %E",
c->socket.fd, buf, size, err);
return NXT_ERROR;
}
}
}
+
+
+ssize_t
+nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
+{
+ nxt_uint_t niov;
+ struct iovec iov[NXT_IOBUF_MAX];
+
+ niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
+
+ if (niov == 0 && sb->sync) {
+ return 0;
+ }
+
+ return nxt_conn_io_writev(task, sb, iov, niov);
+}
+
+
+ssize_t
+nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
+ nxt_uint_t niov)
+{
+ ssize_t n;
+ nxt_err_t err;
+
+ if (niov == 1) {
+ /* Disposal of surplus kernel iovec copy-in operation. */
+ return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
+ }
+
+ for ( ;; ) {
+ n = writev(sb->socket, iov, niov);
+
+ err = (n == -1) ? nxt_socket_errno : 0;
+
+ nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n);
+
+ if (n > 0) {
+ return n;
+ }
+
+ /* n == -1 */
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ sb->ready = 0;
+ nxt_debug(task, "writev() %E", err);
+
+ return NXT_AGAIN;
+
+ case NXT_EINTR:
+ nxt_debug(task, "writev() %E", err);
+ continue;
+
+ default:
+ sb->error = err;
+ nxt_log(task, nxt_socket_error_level(err),
+ "writev(%d, %ui) failed %E", sb->socket, niov, err);
+
+ return NXT_ERROR;
+ }
+ }
+}
+
+
+ssize_t
+nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
+{
+ ssize_t n;
+ nxt_err_t err;
+
+ for ( ;; ) {
+ n = send(sb->socket, buf, size, 0);
+
+ err = (n == -1) ? nxt_socket_errno : 0;
+
+ nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
+
+ if (n > 0) {
+ return n;
+ }
+
+ /* n == -1 */
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ sb->ready = 0;
+ nxt_debug(task, "send() %E", err);
+
+ return NXT_AGAIN;
+
+ case NXT_EINTR:
+ nxt_debug(task, "send() %E", err);
+ continue;
+
+ default:
+ sb->error = err;
+ nxt_log(task, nxt_socket_error_level(err),
+ "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
+
+ return NXT_ERROR;
+ }
+ }
+}
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index fbcf2384..ca79e8ed 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -44,7 +44,7 @@ nxt_event_engine_create(nxt_thread_t *thr,
thr->engine = engine;
thr->fiber = &engine->fibers->fiber;
- engine->batch = batch;
+ engine->batch0 = batch;
if (flags & NXT_ENGINE_FIBERS) {
engine->fibers = nxt_fiber_main_create(engine);
@@ -190,7 +190,7 @@ nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
* and in non-blocking node for reader.
*/
- if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) {
+ if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) {
nxt_free(pipe);
return NXT_ERROR;
}
@@ -220,7 +220,7 @@ nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
if (pipe->event.read_work_queue != NULL) {
nxt_fd_event_close(engine, &pipe->event);
- nxt_pipe_close(pipe->fds);
+ nxt_pipe_close(pipe->event.task, pipe->fds);
}
nxt_free(pipe);
@@ -235,7 +235,7 @@ nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
pipe = obj;
- nxt_pipe_close(pipe->fds);
+ nxt_pipe_close(pipe->event.task, pipe->fds);
nxt_free(pipe);
}
@@ -331,15 +331,17 @@ nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_engine_t *engine;
+ nxt_event_engine_t *engine;
+ nxt_event_engine_pipe_t *pipe;
engine = task->thread->engine;
+ pipe = engine->pipe;
nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
- engine->pipe->fds[0], engine->pipe->fds[1]);
+ pipe->fds[0], pipe->fds[1]);
- nxt_fd_event_close(engine, &engine->pipe->event);
- nxt_pipe_close(engine->pipe->fds);
+ nxt_fd_event_close(engine, &pipe->event);
+ nxt_pipe_close(pipe->event.task, pipe->fds);
}
@@ -373,7 +375,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
nxt_event_engine_t *engine;
engine = thr->engine;
- engine->batch = batch;
+ engine->batch0 = batch;
if (!engine->event.signal_support && interface->signal_support) {
/*
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h
index c34f8cd7..b78b1b71 100644
--- a/src/nxt_event_engine.h
+++ b/src/nxt_event_engine.h
@@ -477,7 +477,7 @@ struct nxt_event_engine_s {
uint8_t shutdown; /* 1 bit */
- uint32_t batch;
+ uint32_t batch0;
uint32_t connections;
uint32_t max_connections;
diff --git a/src/nxt_fd_event.h b/src/nxt_fd_event.h
index ff989394..762fdf25 100644
--- a/src/nxt_fd_event.h
+++ b/src/nxt_fd_event.h
@@ -66,31 +66,29 @@ struct nxt_fd_event_s {
#if (NXT_64BIT)
nxt_fd_event_state_t read:8; /* 3 bits. */
nxt_fd_event_state_t write:8; /* 3 bits. */
- nxt_socket_error_level_t log_error:8; /* 3 bits. */
uint8_t read_ready;
uint8_t write_ready;
uint8_t changing;
uint8_t closed;
- uint8_t shutdown;
uint8_t timedout;
+ uint8_t shutdown:1;
#if (NXT_HAVE_EPOLL)
uint8_t epoll_eof:1;
uint8_t epoll_error:1;
#endif
#if (NXT_HAVE_KQUEUE)
- uint8_t kq_eof;
+ uint8_t kq_eof:1;
#endif
#else /* NXT_32BIT */
nxt_fd_event_state_t read:3;
nxt_fd_event_state_t write:3;
- nxt_socket_error_level_t log_error:3;
uint8_t read_ready:1;
uint8_t write_ready:1;
uint8_t changing:1;
uint8_t closed:1;
- uint8_t shutdown:1;
uint8_t timedout:1;
+ uint8_t shutdown:1;
#if (NXT_HAVE_EPOLL)
uint8_t epoll_eof:1;
uint8_t epoll_error:1;
diff --git a/src/nxt_file.c b/src/nxt_file.c
index b2cab337..ddde8a9e 100644
--- a/src/nxt_file.c
+++ b/src/nxt_file.c
@@ -320,7 +320,7 @@ nxt_file_rename(nxt_file_name_t *old_name, nxt_file_name_t *new_name)
#if (NXT_HAVE_FIONBIO)
nxt_int_t
-nxt_fd_nonblocking(nxt_fd_t fd)
+nxt_fd_nonblocking(nxt_task_t *task, nxt_fd_t fd)
{
int nb;
@@ -330,7 +330,7 @@ nxt_fd_nonblocking(nxt_fd_t fd)
return NXT_OK;
}
- nxt_thread_log_alert("ioctl(%d, FIONBIO) failed %E", fd, nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "ioctl(%d, FIONBIO) failed %E", fd, nxt_errno);
return NXT_ERROR;
@@ -338,7 +338,7 @@ nxt_fd_nonblocking(nxt_fd_t fd)
nxt_int_t
-nxt_fd_blocking(nxt_fd_t fd)
+nxt_fd_blocking(nxt_task_t *task, nxt_fd_t fd)
{
int nb;
@@ -348,7 +348,7 @@ nxt_fd_blocking(nxt_fd_t fd)
return NXT_OK;
}
- nxt_thread_log_alert("ioctl(%d, !FIONBIO) failed %E", fd, nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "ioctl(%d, !FIONBIO) failed %E", fd, nxt_errno);
return NXT_ERROR;
}
@@ -356,22 +356,23 @@ nxt_fd_blocking(nxt_fd_t fd)
#else /* !(NXT_HAVE_FIONBIO) */
nxt_int_t
-nxt_fd_nonblocking(nxt_fd_t fd)
+nxt_fd_nonblocking(nxt_task_t *task, nxt_fd_t fd)
{
int flags;
flags = fcntl(fd, F_GETFL);
if (nxt_slow_path(flags == -1)) {
- nxt_thread_log_alert("fcntl(%d, F_GETFL) failed %E", fd, nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_GETFL) failed %E",
+ fd, nxt_errno);
return NXT_ERROR;
}
flags |= O_NONBLOCK;
if (nxt_slow_path(fcntl(fd, F_SETFL, flags) == -1)) {
- nxt_thread_log_alert("fcntl(%d, F_SETFL, O_NONBLOCK) failed %E",
- fd, nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_SETFL, O_NONBLOCK) failed %E",
+ fd, nxt_errno);
return NXT_ERROR;
}
@@ -380,23 +381,23 @@ nxt_fd_nonblocking(nxt_fd_t fd)
nxt_int_t
-nxt_fd_blocking(nxt_fd_t fd)
+nxt_fd_blocking(nxt_task_t *task, nxt_fd_t fd)
{
int flags;
flags = fcntl(fd, F_GETFL);
if (nxt_slow_path(flags == -1)) {
- nxt_thread_log_alert("fcntl(%d, F_GETFL) failed %E",
- fd, nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_GETFL) failed %E",
+ fd, nxt_errno);
return NXT_ERROR;
}
flags &= O_NONBLOCK;
if (nxt_slow_path(fcntl(fd, F_SETFL, flags) == -1)) {
- nxt_thread_log_alert("fcntl(%d, F_SETFL, !O_NONBLOCK) failed %E",
- fd, nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_SETFL, !O_NONBLOCK) failed %E",
+ fd, nxt_errno);
return NXT_ERROR;
}
@@ -547,24 +548,25 @@ nxt_stderr_start(void)
nxt_int_t
-nxt_pipe_create(nxt_fd_t *pp, nxt_bool_t nbread, nxt_bool_t nbwrite)
+nxt_pipe_create(nxt_task_t *task, nxt_fd_t *pp, nxt_bool_t nbread,
+ nxt_bool_t nbwrite)
{
if (pipe(pp) != 0) {
- nxt_thread_log_alert("pipe() failed %E", nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "pipe() failed %E", nxt_errno);
return NXT_ERROR;
}
- nxt_thread_log_debug("pipe(): %FD:%FD", pp[0], pp[1]);
+ nxt_debug(task, "pipe(): %FD:%FD", pp[0], pp[1]);
if (nbread) {
- if (nxt_fd_nonblocking(pp[0]) != NXT_OK) {
+ if (nxt_fd_nonblocking(task, pp[0]) != NXT_OK) {
return NXT_ERROR;
}
}
if (nbwrite) {
- if (nxt_fd_nonblocking(pp[1]) != NXT_OK) {
+ if (nxt_fd_nonblocking(task, pp[1]) != NXT_OK) {
return NXT_ERROR;
}
}
@@ -574,16 +576,18 @@ nxt_pipe_create(nxt_fd_t *pp, nxt_bool_t nbread, nxt_bool_t nbwrite)
void
-nxt_pipe_close(nxt_fd_t *pp)
+nxt_pipe_close(nxt_task_t *task, nxt_fd_t *pp)
{
- nxt_thread_log_debug("pipe close(%FD:%FD)", pp[0], pp[1]);
+ nxt_debug(task, "pipe close(%FD:%FD)", pp[0], pp[1]);
if (close(pp[0]) != 0) {
- nxt_thread_log_alert("pipe close (%FD) failed %E", pp[0], nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "pipe close (%FD) failed %E",
+ pp[0], nxt_errno);
}
if (close(pp[1]) != 0) {
- nxt_thread_log_alert("pipe close(%FD) failed %E", pp[1], nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "pipe close(%FD) failed %E",
+ pp[1], nxt_errno);
}
}
diff --git a/src/nxt_file.h b/src/nxt_file.h
index 3ef5fbfb..01486e95 100644
--- a/src/nxt_file.h
+++ b/src/nxt_file.h
@@ -161,8 +161,8 @@ NXT_EXPORT nxt_int_t nxt_file_set_access(nxt_file_name_t *name,
NXT_EXPORT nxt_int_t nxt_file_rename(nxt_file_name_t *old_name,
nxt_file_name_t *new_name);
-NXT_EXPORT nxt_int_t nxt_fd_nonblocking(nxt_fd_t fd);
-NXT_EXPORT nxt_int_t nxt_fd_blocking(nxt_fd_t fd);
+NXT_EXPORT nxt_int_t nxt_fd_nonblocking(nxt_task_t *task, nxt_fd_t fd);
+NXT_EXPORT nxt_int_t nxt_fd_blocking(nxt_task_t *task, nxt_fd_t fd);
NXT_EXPORT ssize_t nxt_fd_write(nxt_fd_t fd, u_char *buf, size_t size);
NXT_EXPORT ssize_t nxt_fd_read(nxt_fd_t fd, u_char *buf, size_t size);
NXT_EXPORT void nxt_fd_close(nxt_fd_t fd);
@@ -185,9 +185,9 @@ nxt_write_syslog(priority, message) \
syslog(priority, "%s", message)
-NXT_EXPORT nxt_int_t nxt_pipe_create(nxt_fd_t *pp, nxt_bool_t nbread,
- nxt_bool_t nbwrite);
-NXT_EXPORT void nxt_pipe_close(nxt_fd_t *pp);
+NXT_EXPORT nxt_int_t nxt_pipe_create(nxt_task_t *task, nxt_fd_t *pp,
+ nxt_bool_t nbread, nxt_bool_t nbwrite);
+NXT_EXPORT void nxt_pipe_close(nxt_task_t *task, nxt_fd_t *pp);
NXT_EXPORT size_t nxt_dir_current(char *buf, size_t len);
diff --git a/src/nxt_freebsd_sendfile.c b/src/nxt_freebsd_sendfile.c
index 4a61d60f..a9535c10 100644
--- a/src/nxt_freebsd_sendfile.c
+++ b/src/nxt_freebsd_sendfile.c
@@ -92,7 +92,7 @@ nxt_freebsd_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
hdtr.trl_cnt = ntr;
}
- nxt_log_debug(c->socket.log, "sendfile(%FD, %d, @%O, %uz) hd:%ui tr:%ui",
+ nxt_debug(c->socket.task, "sendfile(%FD, %d, @%O, %uz) hd:%ui tr:%ui",
fb->file->fd, c->socket.fd, fb->file_pos, file_size,
nhd, ntr);
@@ -102,7 +102,7 @@ nxt_freebsd_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
err = (n == -1) ? nxt_errno : 0;
- nxt_log_debug(c->socket.log, "sendfile(): %d sent:%O", n, sent);
+ nxt_debug(c->socket.task, "sendfile(): %d sent:%O", n, sent);
if (n == -1) {
switch (err) {
@@ -116,23 +116,21 @@ nxt_freebsd_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "sendfile(%FD, %d, %O, %uz) failed "
- "%E \"%FN\" hd:%ui tr:%ui", fb->file->fd,
- c->socket.fd, fb->file_pos, file_size,
- err, fb->file->name, nhd, ntr);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "sendfile(%FD, %d, %O, %uz) failed %E \"%FN\" hd:%ui tr:%ui",
+ fb->file->fd, c->socket.fd, fb->file_pos, file_size, err,
+ fb->file->name, nhd, ntr);
return NXT_ERROR;
}
- nxt_log_debug(c->socket.log, "sendfile() %E", err);
+ nxt_debug(c->socket.task, "sendfile() %E", err);
return sent;
} else if (sent == 0) {
- nxt_log_error(NXT_LOG_ERR, c->socket.log,
- "file \"%FN\" was truncated while sendfile()",
- fb->file->name);
+ nxt_log(c->socket.task, NXT_LOG_ERR,
+ "file \"%FN\" was truncated while sendfile()", fb->file->name);
return NXT_ERROR;
}
diff --git a/src/nxt_hpux_sendfile.c b/src/nxt_hpux_sendfile.c
index a105b684..3c42c559 100644
--- a/src/nxt_hpux_sendfile.c
+++ b/src/nxt_hpux_sendfile.c
@@ -48,7 +48,7 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
sb.size = 0;
sb.limit = limit;
- nhd = nxt_sendbuf_mem_coalesce(&sb);
+ nhd = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
if (nhd == 0 && sb.sync) {
return 0;
@@ -69,7 +69,7 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
sb.iobuf = &iov[1];
sb.nmax = 1;
- ntr = nxt_sendbuf_mem_coalesce(&sb);
+ ntr = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
/*
* Disposal of surplus kernel operations
@@ -93,7 +93,7 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
hdtrl = iov;
}
- nxt_log_debug(c->socket.log, "sendfile(%d, %FD, @%O, %uz) hd:%ui tr:%ui",
+ nxt_debug(c->socket.task, "sendfile(%d, %FD, @%O, %uz) hd:%ui tr:%ui",
c->socket.fd, fb->file->fd, fb->file_pos, file_size,
nhd, ntr);
@@ -102,7 +102,7 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
err = (n == -1) ? nxt_errno : 0;
- nxt_log_debug(c->socket.log, "sendfile(): %uz", n);
+ nxt_debug(c->socket.task, "sendfile(): %uz", n);
if (n == -1) {
switch (err) {
@@ -116,16 +116,15 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "sendfile(%d, %FD, @%O, %uz) failed "
- "%E \"%FN\" hd:%ui tr:%ui", c->socket.fd,
- fb->file->fd, fb->file_pos, file_size,
- err, &fb->file->name, nhd, ntr);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "sendfile(%d, %FD, @%O, %uz) failed \"%FN\" hd:%ui tr:%ui",
+ c->socket.fd, fb->file_pos, file_size, &fb->file->name,
+ nhd, ntr);
return NXT_ERROR;
}
- nxt_log_debug(c->socket.log, "sendfile() %E", err);
+ nxt_debug(c->socket.task, "sendfile() %E", err);
return 0;
}
diff --git a/src/nxt_job_resolve.c b/src/nxt_job_resolve.c
index e220b085..dc508ec3 100644
--- a/src/nxt_job_resolve.c
+++ b/src/nxt_job_resolve.c
@@ -81,15 +81,18 @@ nxt_job_resolve(nxt_job_resolve_t *jbr)
switch (r->ai_addr->sa_family) {
#if (NXT_INET6)
case AF_INET6:
+ length = NXT_INET6_ADDR_STR_LEN;
+ break;
#endif
case AF_INET:
+ length = NXT_INET_ADDR_STR_LEN;
break;
default:
continue;
}
- sa = nxt_sockaddr_create(mp, r->ai_addr, r->ai_addrlen);
+ sa = nxt_sockaddr_create(mp, r->ai_addr, r->ai_addrlen, length);
if (nxt_slow_path(sa == NULL)) {
goto fail;
}
diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c
index b4057389..0aeb4faa 100644
--- a/src/nxt_kqueue_engine.c
+++ b/src/nxt_kqueue_engine.c
@@ -119,7 +119,7 @@ static nxt_event_conn_io_t nxt_kqueue_event_conn_io = {
nxt_kqueue_event_conn_io_recvbuf,
nxt_event_conn_io_recv,
- nxt_event_conn_io_write,
+ nxt_conn_io_write,
nxt_event_conn_io_write_chunk,
#if (NXT_HAVE_FREEBSD_SENDFILE)
@@ -524,7 +524,7 @@ nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
if (ev->kq_eof && ev->kq_errno != 0) {
ev->error = ev->kq_errno;
- nxt_log(task, nxt_socket_error_level(ev->kq_errno, ev->log_error),
+ nxt_log(task, nxt_socket_error_level(ev->kq_errno),
"kevent() reported error on descriptor %d %E",
ev->fd, ev->kq_errno);
}
@@ -848,7 +848,7 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
state = c->write_state;
- switch (nxt_socket_connect(c->socket.fd, c->remote) ){
+ switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){
case NXT_OK:
c->socket.write_ready = 1;
@@ -874,8 +874,7 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
break;
}
- nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, task,
- c, data);
+ nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
@@ -909,7 +908,7 @@ nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "kevent fd:%d avail:%D",
cls->socket.fd, cls->socket.kq_available);
- cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available);
+ cls->ready = nxt_min(cls->batch0, (uint32_t) cls->socket.kq_available);
nxt_kqueue_event_conn_io_accept(task, cls, data);
}
@@ -933,7 +932,7 @@ nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
cls->socket.kq_available--;
cls->socket.read_ready = (cls->socket.kq_available != 0);
- len = nxt_socklen(c->remote);
+ len = c->remote->socklen;
if (len >= sizeof(struct sockaddr)) {
sa = &c->remote->u.sockaddr;
diff --git a/src/nxt_linux_sendfile.c b/src/nxt_linux_sendfile.c
index 1dbac677..f5a7dccf 100644
--- a/src/nxt_linux_sendfile.c
+++ b/src/nxt_linux_sendfile.c
@@ -95,8 +95,8 @@ nxt_linux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
size = nxt_sendbuf_file_coalesce(&sb);
- nxt_log_debug(c->socket.log, "sendfile(%d, %FD, @%O, %uz)",
- c->socket.fd, fb->file->fd, fb->file_pos, size);
+ nxt_debug(c->socket.task, "sendfile(%d, %FD, @%O, %uz)",
+ c->socket.fd, fb->file->fd, fb->file_pos, size);
offset = fb->file_pos;
@@ -104,7 +104,7 @@ nxt_linux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
err = (n == -1) ? nxt_errno : 0;
- nxt_log_debug(c->socket.log, "sendfile(): %d", n);
+ nxt_debug(c->socket.task, "sendfile(): %d", n);
if (n == -1) {
switch (err) {
@@ -118,16 +118,15 @@ nxt_linux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log,
- "sendfile(%d, %FD, %O, %uz) failed %E \"%FN\"",
- c->socket.fd, fb->file->fd, fb->file_pos, size,
- err, fb->file->name);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "sendfile(%d, %FD, %O, %uz) failed %E \"%FN\"",
+ c->socket.fd, fb->file->fd, fb->file_pos, size,
+ err, fb->file->name);
return NXT_ERROR;
}
- nxt_log_debug(c->socket.log, "sendfile() %E", err);
+ nxt_debug(c->socket.task, "sendfile() %E", err);
return 0;
}
@@ -150,8 +149,8 @@ nxt_linux_send(nxt_event_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
err = (n == -1) ? nxt_errno : 0;
- nxt_log_debug(c->socket.log, "send(%d, %p, %uz, 0x%uXi): %z",
- c->socket.fd, buf, size, flags, n);
+ nxt_debug(c->socket.task, "send(%d, %p, %uz, 0x%uXi): %z",
+ c->socket.fd, buf, size, flags, n);
if (n == -1) {
switch (err) {
@@ -165,14 +164,14 @@ nxt_linux_send(nxt_event_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "send(%d, %p, %uz, 0x%uXi) failed %E",
- c->socket.fd, buf, size, flags, err);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "send(%d, %p, %uz, 0x%uXi) failed %E",
+ c->socket.fd, buf, size, flags, err);
return NXT_ERROR;
}
- nxt_log_debug(c->socket.log, "send() %E", err);
+ nxt_debug(c->socket.task, "send() %E", err);
return 0;
}
@@ -205,8 +204,8 @@ nxt_linux_sendmsg(nxt_event_conn_t *c, nxt_sendbuf_coalesce_t *sb,
err = (n == -1) ? nxt_errno : 0;
- nxt_log_debug(c->socket.log, "sendmsg(%d, %ui, 0x%uXi): %d",
- c->socket.fd, niov, flags, n);
+ nxt_debug(c->socket.task, "sendmsg(%d, %ui, 0x%uXi): %d",
+ c->socket.fd, niov, flags, n);
if (n == -1) {
switch (err) {
@@ -220,14 +219,14 @@ nxt_linux_sendmsg(nxt_event_conn_t *c, nxt_sendbuf_coalesce_t *sb,
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "sendmsg(%d, %ui, 0x%uXi) failed %E",
- c->socket.fd, niov, flags, err);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "sendmsg(%d, %ui, 0x%uXi) failed %E",
+ c->socket.fd, niov, flags, err);
return NXT_ERROR;
}
- nxt_log_debug(c->socket.log, "sendmsg() %E", err);
+ nxt_debug(c->socket.task, "sendmsg() %E", err);
return 0;
}
diff --git a/src/nxt_listen_socket.c b/src/nxt_listen_socket.c
index 22b5bf8d..08751dfc 100644
--- a/src/nxt_listen_socket.c
+++ b/src/nxt_listen_socket.c
@@ -12,7 +12,8 @@ static u_char *nxt_listen_socket_log_handler(void *ctx, u_char *pos,
nxt_int_t
-nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test)
+nxt_listen_socket_create(nxt_task_t *task, nxt_listen_socket_t *ls,
+ nxt_bool_t bind_test)
{
nxt_log_t log, *old;
nxt_uint_t family;
@@ -31,12 +32,12 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test)
family = sa->u.sockaddr.sa_family;
- s = nxt_socket_create(family, sa->type, 0, ls->flags);
+ s = nxt_socket_create(task, family, sa->type, 0, ls->flags);
if (s == -1) {
goto socket_fail;
}
- if (nxt_socket_setsockopt(s, SOL_SOCKET, SO_REUSEADDR, 1) != NXT_OK) {
+ if (nxt_socket_setsockopt(task, s, SOL_SOCKET, SO_REUSEADDR, 1) != NXT_OK) {
goto fail;
}
@@ -48,7 +49,8 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test)
ipv6only = (ls->ipv6only == 1);
/* Ignore possible error. TODO: why? */
- (void) nxt_socket_setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, ipv6only);
+ (void) nxt_socket_setsockopt(task, s, IPPROTO_IPV6, IPV6_V6ONLY,
+ ipv6only);
}
#endif
@@ -56,7 +58,7 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test)
#if 0
/* Ignore possible error. TODO: why? */
- (void) nxt_socket_setsockopt(s, SOL_SOCKET, SO_SNDBUF, 8192);
+ (void) nxt_socket_setsockopt(task, s, SOL_SOCKET, SO_SNDBUF, 8192);
#endif
@@ -65,12 +67,12 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test)
if (ls->read_after_accept) {
/* Defer accept() maximum at 1 second. */
/* Ignore possible error. TODO: why? */
- (void) nxt_socket_setsockopt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
+ (void) nxt_socket_setsockopt(task, s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
}
#endif
- switch (nxt_socket_bind(s, sa, bind_test)) {
+ switch (nxt_socket_bind(task, s, sa, bind_test)) {
case NXT_OK:
break;
@@ -103,11 +105,11 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test)
#endif
- nxt_log_debug(&log, "listen(%d, %d)", s, ls->backlog);
+ nxt_debug(task, "listen(%d, %d)", s, ls->backlog);
if (listen(s, ls->backlog) != 0) {
- nxt_log_alert(&log, "listen(%d, %d) failed %E",
- s, ls->backlog, nxt_socket_errno);
+ nxt_log(task, NXT_LOG_CRIT, "listen(%d, %d) failed %E",
+ s, ls->backlog, nxt_socket_errno);
goto fail;
}
@@ -118,7 +120,7 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test)
fail:
- nxt_socket_close(s);
+ nxt_socket_close(task, s);
socket_fail:
@@ -129,7 +131,8 @@ socket_fail:
nxt_int_t
-nxt_listen_socket_update(nxt_listen_socket_t *ls, nxt_listen_socket_t *prev)
+nxt_listen_socket_update(nxt_task_t *task, nxt_listen_socket_t *ls,
+ nxt_listen_socket_t *prev)
{
nxt_log_t log, *old;
nxt_thread_t *thr;
@@ -143,11 +146,11 @@ nxt_listen_socket_update(nxt_listen_socket_t *ls, nxt_listen_socket_t *prev)
log.ctx = ls->sockaddr;
thr->log = &log;
- nxt_log_debug(&log, "listen(%d, %d)", ls->socket, ls->backlog);
+ nxt_debug(task, "listen(%d, %d)", ls->socket, ls->backlog);
if (listen(ls->socket, ls->backlog) != 0) {
- nxt_log_alert(&log, "listen(%d, %d) failed %E",
- ls->socket, ls->backlog, nxt_socket_errno);
+ nxt_log(task, NXT_LOG_CRIT, "listen(%d, %d) failed %E",
+ ls->socket, ls->backlog, nxt_socket_errno);
goto fail;
}
@@ -183,6 +186,7 @@ nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls)
case AF_INET6:
ls->socklen = sizeof(struct sockaddr_in6);
+ ls->address_length = NXT_INET6_ADDR_STR_LEN;
size = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_in6)
+ NXT_INET6_ADDR_STR_LEN + (sizeof(":65535") - 1);
@@ -206,6 +210,7 @@ nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls)
*/
ls->socklen = 3;
size = ls->socklen + sizeof("unix:") - 1;
+ ls->address_length = sizeof("unix:") - 1;
break;
@@ -213,6 +218,7 @@ nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls)
default:
ls->socklen = sizeof(struct sockaddr_in);
+ ls->address_length = NXT_INET_ADDR_STR_LEN;
size = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_in)
+ NXT_INET_ADDR_STR_LEN + (sizeof(":65535") - 1);
@@ -248,5 +254,5 @@ nxt_listen_socket_log_handler(void *ctx, u_char *pos, u_char *end)
sa = ctx;
return nxt_sprintf(pos, end, " while creating listening socket on %*s",
- sa->text_len, sa->text);
+ sa->length, sa->start);
}
diff --git a/src/nxt_listen_socket.h b/src/nxt_listen_socket.h
index 2100f17f..1e62ae80 100644
--- a/src/nxt_listen_socket.h
+++ b/src/nxt_listen_socket.h
@@ -28,10 +28,12 @@ typedef struct {
uint8_t ipv6only; /* 2 bits */
#endif
- void *servers;
+ uint8_t socklen;
+ uint8_t address_length;
- socklen_t socklen;
uint32_t mem_pool_size;
+
+ void *servers;
} nxt_listen_socket_t;
@@ -51,10 +53,10 @@ typedef struct {
#endif
-NXT_EXPORT nxt_int_t nxt_listen_socket_create(nxt_listen_socket_t *ls,
- nxt_bool_t bind_test);
-NXT_EXPORT nxt_int_t nxt_listen_socket_update(nxt_listen_socket_t *ls,
- nxt_listen_socket_t *prev);
+NXT_EXPORT nxt_int_t nxt_listen_socket_create(nxt_task_t *task,
+ nxt_listen_socket_t *ls, nxt_bool_t bind_test);
+NXT_EXPORT nxt_int_t nxt_listen_socket_update(nxt_task_t *task,
+ nxt_listen_socket_t *ls, nxt_listen_socket_t *prev);
NXT_EXPORT size_t nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls);
diff --git a/src/nxt_macosx_sendfile.c b/src/nxt_macosx_sendfile.c
index 34a586c3..f636819c 100644
--- a/src/nxt_macosx_sendfile.c
+++ b/src/nxt_macosx_sendfile.c
@@ -112,7 +112,7 @@ nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
err = (n == -1) ? nxt_errno : 0;
- nxt_log_debug(c->socket.log, "sendfile(): %d sent:%O", n, sent);
+ nxt_debug(c->socket.task, "sendfile(): %d sent:%O", n, sent);
if (n == -1) {
switch (err) {
@@ -126,24 +126,22 @@ nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "sendfile(%FD, %d, %O, %O) failed "
- "%E \"%FN\" hd:%ui tr:%ui", fb->file->fd,
- c->socket.fd, fb->file_pos, sent, err,
- fb->file->name, nhd, ntr);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "sendfile(%FD, %d, %O, %O) failed %E \"%FN\" hd:%ui tr:%ui",
+ fb->file->fd, c->socket.fd, fb->file_pos, sent, err,
+ fb->file->name, nhd, ntr);
return NXT_ERROR;
}
- nxt_log_debug(c->socket.log, "sendfile() %E", err);
+ nxt_debug(c->socket.task, "sendfile() %E", err);
return sent;
}
if (sent == 0) {
- nxt_log_error(NXT_LOG_ERR, c->socket.log,
- "file \"%FN\" was truncated while sendfile()",
- fb->file->name);
+ nxt_log(c->socket.task, NXT_LOG_ERR,
+ "file \"%FN\" was truncated while sendfile()", fb->file->name);
return NXT_ERROR;
}
@@ -154,240 +152,3 @@ nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
return sent;
}
-
-
-#if 0
-
-typedef struct {
- nxt_socket_t socket;
- nxt_err_t error;
-
- uint8_t write_ready; /* 1 bit */
- uint8_t log_error;
-} nxt_sendbuf_t;
-
-
-ssize_t nxt_macosx_sendfile(nxt_thread_t *thr, nxt_sendbuf_t *sb, nxt_buf_t *b,
- size_t limit);
-ssize_t nxt_writev(nxt_thread_t *thr, nxt_sendbuf_t *sb, nxt_iobuf_t *iob,
- nxt_uint_t niob);
-ssize_t nxt_send(nxt_thread_t *thr, nxt_sendbuf_t *sb, void *buf, size_t size);
-
-
-ssize_t
-nxt_macosx_sendfile(nxt_thread_t *thr, nxt_sendbuf_t *sb, nxt_buf_t *b,
- size_t limit)
-{
- size_t hd_size, file_size;
- ssize_t n;
- nxt_buf_t *buf;
- nxt_err_t err;
- nxt_off_t sent;
- nxt_uint_t nhd, ntr;
- struct iovec hd[NXT_IOBUF_MAX], tr[NXT_IOBUF_MAX];
- struct sf_hdtr hdtr, *ht;
- nxt_sendbuf_coalesce_t sbc;
-
- sbc.buf = b;
- sbc.iobuf = hd;
- sbc.nmax = NXT_IOBUF_MAX;
- sbc.sync = 0;
- sbc.size = 0;
- sbc.limit = limit;
-
- nhd = nxt_sendbuf_mem_coalesce(&sbc);
-
- if (nhd == 0 && sbc.sync) {
- return 0;
- }
-
- if (sbc.buf == NULL || !nxt_buf_is_file(sbc.buf)) {
- return nxt_writev(thr, sb, hd, nhd);
- }
-
- hd_size = sbc.size;
- buf = sbc.buf;
-
- file_size = nxt_sendbuf_file_coalesce(&sbc);
-
- if (file_size == 0) {
- return nxt_writev(thr, sb, hd, nhd);
- }
-
- sbc.iobuf = tr;
-
- ntr = nxt_sendbuf_mem_coalesce(&sbc);
-
- /*
- * Disposal of surplus kernel operations if there are no headers
- * and trailers. Besides sendfile() returns EINVAL if a sf_hdtr's
- * count is 0, but corresponding pointer is not NULL.
- */
-
- nxt_memzero(&hdtr, sizeof(struct sf_hdtr));
- ht = NULL;
-
- if (nhd != 0) {
- ht = &hdtr;
- hdtr.headers = hd;
- hdtr.hdr_cnt = nhd;
- }
-
- if (ntr != 0) {
- ht = &hdtr;
- hdtr.trailers = tr;
- hdtr.trl_cnt = ntr;
- }
-
- /*
- * MacOSX has the same bug as old FreeBSD (http://bugs.freebsd.org/33771).
- * However this bug has never been fixed and instead of this it has been
- * documented as a feature in MacOSX 10.7 (Lion) sendfile(2):
- *
- * When a header or trailer is specified, the value of len argument
- * indicates the maximum number of bytes in the header and/or file
- * to be sent. It does not control the trailer; if a trailer exists,
- * all of it will be sent.
- */
- sent = hd_size + file_size;
-
- nxt_log_debug(thr->log, "sendfile(%FD, %d, @%O, %O) hd:%ui tr:%ui hs:%uz",
- buf->file->fd, sb->socket, buf->file_pos, sent,
- nhd, ntr, hd_size);
-
- n = nxt_sys_sendfile(buf->file->fd, sb->socket,
- buf->file_pos, &sent, ht, 0);
-
- err = (n == -1) ? nxt_errno : 0;
-
- nxt_log_debug(thr->log, "sendfile(): %d sent:%O", n, sent);
-
- if (n == -1) {
- switch (err) {
-
- case NXT_EAGAIN:
- sb->write_ready = 0;
- break;
-
- case NXT_EINTR:
- break;
-
- default:
- sb->error = err;
- nxt_log_error(nxt_socket_error_level(err, sb->log_error), thr->log,
- "sendfile(%FD, %d, %O, %O) failed %E \"%FN\" "
- "hd:%ui tr:%ui", buf->file->fd, sb->socket,
- buf->file_pos, sent, err, buf->file->name, nhd, ntr);
-
- return NXT_ERROR;
- }
-
- nxt_log_debug(thr->log, "sendfile() %E", err);
-
- return sent;
- }
-
- if (sent == 0) {
- nxt_log_error(NXT_LOG_ERR, thr->log,
- "file \"%FN\" was truncated while sendfile()",
- buf->file->name);
-
- return NXT_ERROR;
- }
-
- if (sent < (nxt_off_t) sbc.size) {
- sb->write_ready = 0;
- }
-
- return sent;
-}
-
-
-ssize_t
-nxt_writev(nxt_thread_t *thr, nxt_sendbuf_t *sb, nxt_iobuf_t *iob,
- nxt_uint_t niob)
-{
- ssize_t n;
- nxt_err_t err;
-
- if (niob == 1) {
- /* Disposal of surplus kernel iovec copy-in operation. */
- return nxt_send(thr, sb, iob->iov_base, iob->iov_len);
- }
-
- for ( ;; ) {
- n = writev(sb->socket, iob, niob);
-
- err = (n == -1) ? nxt_socket_errno : 0;
-
- nxt_log_debug(thr->log, "writev(%d, %ui): %d", sb->socket, niob, n);
-
- if (n > 0) {
- return n;
- }
-
- /* n == -1 */
-
- switch (err) {
-
- case NXT_EAGAIN:
- nxt_log_debug(thr->log, "writev() %E", err);
- sb->write_ready = 0;
- return NXT_AGAIN;
-
- case NXT_EINTR:
- nxt_log_debug(thr->log, "writev() %E", err);
- continue;
-
- default:
- sb->error = err;
- nxt_log_error(nxt_socket_error_level(err, sb->log_error), thr->log,
- "writev(%d, %ui) failed %E", sb->socket, niob, err);
- return NXT_ERROR;
- }
- }
-}
-
-
-ssize_t
-nxt_send(nxt_thread_t *thr, nxt_sendbuf_t *sb, void *buf, size_t size)
-{
- ssize_t n;
- nxt_err_t err;
-
- for ( ;; ) {
- n = send(sb->socket, buf, size, 0);
-
- err = (n == -1) ? nxt_socket_errno : 0;
-
- nxt_log_debug(thr->log, "send(%d, %p, %uz): %z",
- sb->socket, buf, size, n);
-
- if (n > 0) {
- return n;
- }
-
- /* n == -1 */
-
- switch (err) {
-
- case NXT_EAGAIN:
- nxt_log_debug(thr->log, "send() %E", err);
- sb->write_ready = 0;
- return NXT_AGAIN;
-
- case NXT_EINTR:
- nxt_log_debug(thr->log, "send() %E", err);
- continue;
-
- default:
- sb->error = err;
- nxt_log_error(nxt_socket_error_level(err, sb->log_error), thr->log,
- "send(%d, %p, %uz) failed %E",
- sb->socket, buf, size, err);
- return NXT_ERROR;
- }
- }
-}
-
-#endif
diff --git a/src/nxt_main.h b/src/nxt_main.h
index 22f39a59..b692dc92 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -16,6 +16,7 @@
#include <nxt_time.h>
#include <nxt_process.h>
+typedef struct nxt_task_s nxt_task_t;
typedef struct nxt_thread_s nxt_thread_t;
#include <nxt_thread_id.h>
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index 4fd7e164..ceb44e1f 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -81,7 +81,7 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle)
proc->pid = nxt_pid;
proc->engine = 0;
- proc->port = nxt_port_create(0);
+ proc->port = nxt_port_create(task, 0);
if (nxt_slow_path(proc->port == NULL)) {
return NXT_ERROR;
}
@@ -156,7 +156,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
proc->engine = 0;
proc->generation = cycle->process_generation;
- proc->port = nxt_port_create(0);
+ proc->port = nxt_port_create(task, 0);
if (nxt_slow_path(proc->port == NULL)) {
return NXT_ERROR;
}
diff --git a/src/nxt_openssl.c b/src/nxt_openssl.c
index fbf08b24..17e65a56 100644
--- a/src/nxt_openssl.c
+++ b/src/nxt_openssl.c
@@ -59,7 +59,7 @@ static nxt_event_conn_io_t nxt_openssl_event_conn_io = {
NULL,
NULL,
- nxt_event_conn_io_write,
+ nxt_conn_io_write,
nxt_openssl_conn_io_write_chunk,
NULL,
NULL,
@@ -309,9 +309,8 @@ nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf,
fail:
- nxt_event_conn_io_handle(task->thread, c->read_work_queue,
- c->read_state->error_handler,
- task, c, c->socket.data);
+ nxt_work_queue_add(c->read_work_queue, c->read_state->error_handler,
+ task, c, c->socket.data);
}
@@ -367,9 +366,8 @@ nxt_openssl_conn_handshake(nxt_task_t *task, void *obj, void *data)
nxt_openssl_conn_error(c, err, "SSL_do_handshake(%d) failed",
c->socket.fd);
- nxt_event_conn_io_handle(task->thread, c->read_work_queue,
- c->read_state->error_handler,
- task, c, data);
+ nxt_work_queue_add(c->read_work_queue, c->read_state->error_handler,
+ task, c, data);
} else if (ssltls->ssl_error == SSL_ERROR_WANT_READ && ssltls->times < 2) {
ssltls->times++;
@@ -429,8 +427,7 @@ nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data)
}
}
- nxt_event_conn_io_handle(task->thread, c->read_work_queue, handler,
- task, c, data);
+ nxt_work_queue_add(c->read_work_queue, handler, task, c, data);
}
@@ -475,7 +472,7 @@ nxt_openssl_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
}
n = nxt_openssl_conn_test_error(c->socket.task, c, ret, err,
- nxt_event_conn_io_write);
+ nxt_conn_io_write);
if (n == NXT_ERROR) {
nxt_openssl_conn_error(c, err, "SSL_write(%d, %p, %uz) failed",
@@ -584,8 +581,7 @@ nxt_openssl_conn_io_shutdown(nxt_task_t *task, void *obj, void *data)
done:
- nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler,
- task, c, data);
+ nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}
@@ -656,8 +652,7 @@ nxt_openssl_conn_test_error(nxt_task_t *task, nxt_event_conn_t *c, int ret,
handler = c->write_state->close_handler;
}
- nxt_event_conn_io_handle(task->thread, wq, handler,
- task, c, c->socket.data);
+ nxt_work_queue_add(wq, handler, task, c, c->socket.data);
return 0;
@@ -707,7 +702,7 @@ nxt_openssl_log_error_level(nxt_event_conn_t *c, nxt_err_t err)
switch (ERR_GET_REASON(ERR_peek_error())) {
case 0:
- return nxt_socket_error_level(err, c->socket.log_error);
+ return nxt_socket_error_level(err);
case SSL_R_BAD_CHANGE_CIPHER_SPEC: /* 103 */
case SSL_R_BLOCK_CIPHER_PAD_IS_WRONG: /* 129 */
diff --git a/src/nxt_port.c b/src/nxt_port.c
index c00fcb61..7da2bcc4 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -153,7 +153,7 @@ nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- port = nxt_port_alloc();
+ port = nxt_port_alloc(task);
if (nxt_slow_path(port == NULL)) {
return;
}
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 5b0c2d43..a3bc7f26 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -17,7 +17,7 @@ static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
nxt_port_t *
-nxt_port_alloc(void)
+nxt_port_alloc(nxt_task_t *task)
{
nxt_port_t *port;
nxt_mem_pool_t *mp;
@@ -29,6 +29,8 @@ nxt_port_alloc(void)
port = nxt_mem_zalloc(mp, sizeof(nxt_port_t));
port->mem_pool = mp;
+ port->socket.task = task;
+
port->pair[0] = -1;
port->pair[1] = -1;
@@ -42,31 +44,31 @@ nxt_port_alloc(void)
nxt_port_t *
-nxt_port_create(size_t max_size)
+nxt_port_create(nxt_task_t *task, size_t max_size)
{
nxt_int_t sndbuf, rcvbuf, size;
nxt_port_t *port;
nxt_socket_t snd, rcv;
- port = nxt_port_alloc();
+ port = nxt_port_alloc(task);
if (nxt_slow_path(port == NULL)) {
return NULL;
}
- if (nxt_slow_path(nxt_socketpair_create(port->pair) != NXT_OK)) {
+ if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
goto socketpair_fail;
}
snd = port->pair[1];
- sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
+ sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
if (nxt_slow_path(sndbuf < 0)) {
goto getsockopt_fail;
}
rcv = port->pair[0];
- rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
+ rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
if (nxt_slow_path(rcvbuf < 0)) {
goto getsockopt_fail;
}
@@ -83,9 +85,10 @@ nxt_port_create(size_t max_size)
* on send direction and 4K buffer size on receive direction;
* Solaris uses 16K on send direction and 5K on receive direction.
*/
- (void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size);
+ (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
+ max_size);
- sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
+ sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
if (nxt_slow_path(sndbuf < 0)) {
goto getsockopt_fail;
}
@@ -93,9 +96,10 @@ nxt_port_create(size_t max_size)
size = sndbuf * 4;
if (rcvbuf < size) {
- (void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size);
+ (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
+ size);
- rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
+ rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
if (nxt_slow_path(rcvbuf < 0)) {
goto getsockopt_fail;
}
@@ -109,8 +113,8 @@ nxt_port_create(size_t max_size)
getsockopt_fail:
- nxt_socket_close(port->pair[0]);
- nxt_socket_close(port->pair[1]);
+ nxt_socket_close(task, port->pair[0]);
+ nxt_socket_close(task, port->pair[1]);
socketpair_fail:
@@ -123,7 +127,7 @@ socketpair_fail:
void
nxt_port_destroy(nxt_port_t *port)
{
- nxt_socket_close(port->socket.fd);
+ nxt_socket_close(port->socket.task, port->socket.fd);
nxt_mem_pool_destroy(port->mem_pool);
}
@@ -135,12 +139,6 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
port->socket.log = &nxt_main_log;
port->socket.write_ready = 1;
- port->task.thread = task->thread;
- port->task.log = port->socket.log;
- port->task.ident = nxt_task_next_ident();
-
- port->socket.task = &port->task;
-
port->socket.write_work_queue = &task->thread->engine->fast_work_queue;
port->socket.write_handler = nxt_port_write_handler;
port->socket.error_handler = nxt_port_error_handler;
@@ -150,7 +148,7 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
void
nxt_port_write_close(nxt_port_t *port)
{
- nxt_socket_close(port->pair[1]);
+ nxt_socket_close(port->socket.task, port->pair[1]);
port->pair[1] = -1;
}
@@ -301,12 +299,6 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
port->socket.fd = port->pair[0];
port->socket.log = &nxt_main_log;
- port->task.thread = task->thread;
- port->task.log = port->socket.log;
- port->task.ident = nxt_task_next_ident();
-
- port->socket.task = &port->task;
-
port->socket.read_work_queue = &task->thread->engine->fast_work_queue;
port->socket.read_handler = nxt_port_read_handler;
port->socket.error_handler = nxt_port_error_handler;
@@ -318,7 +310,7 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
void
nxt_port_read_close(nxt_port_t *port)
{
- nxt_socket_close(port->pair[0]);
+ nxt_socket_close(port->socket.task, port->pair[0]);
port->pair[0] = -1;
}
diff --git a/src/nxt_port_socket.h b/src/nxt_port_socket.h
index e3202a28..4740368d 100644
--- a/src/nxt_port_socket.h
+++ b/src/nxt_port_socket.h
@@ -33,8 +33,6 @@ typedef struct {
/* Must be the first field. */
nxt_fd_event_t socket;
- nxt_task_t task;
-
nxt_queue_t messages; /* of nxt_port_send_msg_t */
/* Maximum size of message part. */
@@ -61,8 +59,8 @@ struct nxt_port_recv_msg_s {
};
-NXT_EXPORT nxt_port_t *nxt_port_alloc(void);
-NXT_EXPORT nxt_port_t *nxt_port_create(size_t bufsize);
+NXT_EXPORT nxt_port_t *nxt_port_alloc(nxt_task_t *task);
+NXT_EXPORT nxt_port_t *nxt_port_create(nxt_task_t *task, size_t bufsize);
NXT_EXPORT void nxt_port_destroy(nxt_port_t *port);
NXT_EXPORT void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
NXT_EXPORT void nxt_port_write_close(nxt_port_t *port);
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
index 309d3147..21a25e68 100644
--- a/src/nxt_sendbuf.c
+++ b/src/nxt_sendbuf.c
@@ -12,6 +12,77 @@ static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
nxt_uint_t
+nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
+ struct iovec *iov, nxt_uint_t niov_max)
+{
+ u_char *last;
+ size_t size, total;
+ nxt_buf_t *b;
+ nxt_uint_t n;
+
+ total = sb->size;
+ last = NULL;
+ n = (nxt_uint_t) -1;
+
+ for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
+
+ nxt_prefetch(b->next);
+
+ if (nxt_buf_is_file(b)) {
+ break;
+ }
+
+ if (nxt_buf_is_mem(b)) {
+
+ size = b->mem.free - b->mem.pos;
+
+ if (size != 0) {
+
+ if (total + size > sb->limit) {
+ size = sb->limit - total;
+
+ if (size == 0) {
+ break;
+ }
+ }
+
+ if (b->mem.pos != last) {
+
+ if (++n >= niov_max) {
+ goto done;
+ }
+
+ iov[n].iov_base = b->mem.pos;
+ iov[n].iov_len = size;
+
+ } else {
+ iov[n].iov_len += size;
+ }
+
+ nxt_debug(task, "sendbuf: %ui, %p, %uz",
+ n, iov[n].iov_base, iov[n].iov_len);
+
+ total += size;
+ last = b->mem.pos + size;
+ }
+
+ } else {
+ sb->sync = 1;
+ sb->last |= nxt_buf_is_last(b);
+ }
+ }
+
+ n++;
+
+done:
+
+ sb->buf = b;
+
+ return n;
+}
+
+
+nxt_uint_t
nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
{
u_char *last;
diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h
index ca314e7b..71b18f91 100644
--- a/src/nxt_sendbuf.h
+++ b/src/nxt_sendbuf.h
@@ -36,6 +36,21 @@
typedef struct {
+ nxt_buf_t *buf;
+ nxt_socket_t socket;
+ nxt_err_t error;
+ nxt_off_t sent;
+ size_t size;
+ size_t limit;
+
+ uint8_t ready; /* 1 bit */
+ uint8_t once; /* 1 bit */
+ uint8_t sync; /* 1 bit */
+ uint8_t last; /* 1 bit */
+} nxt_sendbuf_t;
+
+
+typedef struct {
nxt_buf_t *buf;
nxt_iobuf_t *iobuf;
@@ -88,6 +103,8 @@ ssize_t nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b,
size_t limit);
+nxt_uint_t nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
+ struct iovec *iov, nxt_uint_t niov_max);
nxt_uint_t nxt_sendbuf_mem_coalesce(nxt_task_t *task,
nxt_sendbuf_coalesce_t *sb);
size_t nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb);
diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c
index 2bb85730..742cb0ba 100644
--- a/src/nxt_sockaddr.c
+++ b/src/nxt_sockaddr.c
@@ -17,10 +17,13 @@ static nxt_int_t nxt_job_sockaddr_inet_parse(nxt_job_sockaddr_parse_t *jbs);
nxt_sockaddr_t *
-nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t length)
+nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t socklen, size_t address_length)
{
+ size_t size;
nxt_sockaddr_t *sa;
+ size = offsetof(nxt_sockaddr_t, u) + socklen + address_length;
+
/*
* The current struct sockaddr's define 32-bit fields at maximum
* and may define 64-bit AF_INET6 fields in the future. Alignment
@@ -28,10 +31,13 @@ nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t length)
* If 128-bit alignment will be required then nxt_mem_malloc() and
* nxt_memzero() should be used instead.
*/
- sa = nxt_mem_zalloc(mp, offsetof(nxt_sockaddr_t, u) + length);
+
+ sa = nxt_mem_zalloc(mp, size);
if (nxt_fast_path(sa != NULL)) {
- nxt_socklen_set(sa, length);
+ sa->socklen = socklen;
+ sa->length = address_length;
+ sa->sockaddr_size = size;
}
return sa;
@@ -40,7 +46,7 @@ nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t length)
nxt_sockaddr_t *
nxt_sockaddr_create(nxt_mem_pool_t *mp, struct sockaddr *sockaddr,
- socklen_t length)
+ socklen_t length, size_t address_length)
{
size_t size, copy;
nxt_sockaddr_t *sa;
@@ -95,19 +101,11 @@ nxt_sockaddr_create(nxt_mem_pool_t *mp, struct sockaddr *sockaddr,
#endif /* NXT_HAVE_UNIX_DOMAIN */
- sa = nxt_sockaddr_alloc(mp, size);
+ sa = nxt_sockaddr_alloc(mp, size, address_length);
if (nxt_fast_path(sa != NULL)) {
-
nxt_memcpy(&sa->u.sockaddr, sockaddr, copy);
-#if (NXT_SOCKADDR_SA_LEN)
-
- /* Update shortcut sockaddr length overwritten by nxt_memcpy(). */
- nxt_socklen_set(sa, size);
-
-#endif
-
#if (NXT_HAVE_UNIX_DOMAIN && NXT_OPENBSD)
if (length == 0) {
@@ -127,7 +125,7 @@ nxt_sockaddr_copy(nxt_mem_pool_t *mp, nxt_sockaddr_t *src)
size_t length;
nxt_sockaddr_t *dst;
- length = offsetof(nxt_sockaddr_t, u) + nxt_socklen(src);
+ length = offsetof(nxt_sockaddr_t, u) + src->socklen;
dst = nxt_mem_alloc(mp, length);
@@ -140,9 +138,10 @@ nxt_sockaddr_copy(nxt_mem_pool_t *mp, nxt_sockaddr_t *src)
nxt_sockaddr_t *
-nxt_getsockname(nxt_mem_pool_t *mp, nxt_socket_t s)
+nxt_getsockname(nxt_task_t *task, nxt_mem_pool_t *mp, nxt_socket_t s)
{
int ret;
+ size_t length;
socklen_t socklen;
nxt_sockaddr_buf_t sockaddr;
@@ -151,44 +150,148 @@ nxt_getsockname(nxt_mem_pool_t *mp, nxt_socket_t s)
ret = getsockname(s, &sockaddr.buf, &socklen);
if (nxt_fast_path(ret == 0)) {
- return nxt_sockaddr_create(mp, &sockaddr.buf, socklen);
+
+ switch (sockaddr.buf.sa_family) {
+#if (NXT_INET6)
+ case AF_INET6:
+ length = NXT_INET6_ADDR_STR_LEN;
+ break;
+#endif
+
+#if (NXT_HAVE_UNIX_DOMAIN)
+ case AF_UNIX:
+ length = sizeof("unix:") - 1 + socklen;
+#endif
+ break;
+
+ case AF_INET:
+ length = NXT_INET_ADDR_STR_LEN;
+ break;
+
+ default:
+ length = 0;
+ break;
+ }
+
+ return nxt_sockaddr_create(mp, &sockaddr.buf, socklen, length);
}
- nxt_thread_log_error(NXT_LOG_ERR, "getsockname(%d) failed %E",
- s, nxt_errno);
+ nxt_log(task, NXT_LOG_ERR, "getsockname(%d) failed %E", s, nxt_errno);
return NULL;
}
-nxt_int_t
-nxt_sockaddr_text(nxt_mem_pool_t *mp, nxt_sockaddr_t *sa, nxt_bool_t port)
+void
+nxt_sockaddr_text(nxt_sockaddr_t *sa)
{
- size_t length;
- u_char *p;
- u_char buf[NXT_SOCKADDR_STR_LEN + NXT_SOCKPORT_STR_LEN];
+ size_t offset;
+ u_char *p, *start, *end, *octet;
+ uint32_t port;
- length = NXT_SOCKADDR_STR_LEN + NXT_SOCKPORT_STR_LEN;
+ end = (u_char *) sa + sa->sockaddr_size;
- length = nxt_sockaddr_ntop(sa, buf, buf + length, port);
+ switch (sa->u.sockaddr.sa_family) {
- p = nxt_mem_alloc(mp, length);
+ case AF_INET:
+ offset = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_in);
- if (nxt_fast_path(p != NULL)) {
+ sa->start = offset;
+ sa->address_start = offset;
- sa->text = p;
- sa->text_len = length;
- nxt_memcpy(p, buf, length);
+ start = (u_char *) sa + offset;
+ octet = (u_char *) &sa->u.sockaddr_in.sin_addr;
- return NXT_OK;
+ p = nxt_sprintf(start, end, "%ud.%ud.%ud.%ud",
+ octet[0], octet[1], octet[2], octet[3]);
+
+ sa->address_length = p - start;
+ sa->port_start = sa->address_length + 1;
+
+ port = sa->u.sockaddr_in.sin_port;
+
+ break;
+
+#if (NXT_INET6)
+
+ case AF_INET6:
+ offset = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_in6);
+
+ sa->start = offset;
+ sa->address_start = offset;
+
+ start = (u_char *) sa + offset;
+ p = start;
+
+ *p++ = '[';
+
+ p = nxt_inet6_ntop(sa->u.sockaddr_in6.sin6_addr.s6_addr, p, end);
+
+ sa->address_length = p - (start + 1);
+ sa->port_start = sa->address_length + 2;
+
+ *p++ = ']';
+
+ port = sa->u.sockaddr_in6.sin6_port;
+
+ break;
+
+#endif
+
+#if (NXT_HAVE_UNIX_DOMAIN)
+
+ case AF_UNIX:
+
+ offset = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_un);
+
+ sa->start = offset;
+ sa->address_start = offset;
+
+ start = (u_char *) sa + offset;
+ p = start;
+
+#if (NXT_LINUX)
+
+ p = (u_char *) sa->u.sockaddr_un.sun_path;
+
+ if (p[0] == '\0') {
+ int length;
+
+ /* Linux abstract socket address has no trailing zero. */
+ length = sa->socklen - offsetof(struct sockaddr_un, sun_path);
+
+ p = nxt_sprintf(start, end, "unix:@%*s", length - 1, p + 1);
+
+ } else {
+ p = nxt_sprintf(start, end, "unix:%s", p);
+ }
+
+#else /* !(NXT_LINUX) */
+
+ p = nxt_sprintf(start, end, "unix:%s", p);
+
+#endif
+
+ sa->address_length = p - start;
+ sa->port_start = sa->address_length;
+ sa->length = p - start;
+
+ return;
+
+#endif /* NXT_HAVE_UNIX_DOMAIN */
+
+ default:
+ return;
}
- return NXT_ERROR;
+ p = nxt_sprintf(p, end, ":%d", ntohs(port));
+
+ sa->length = p - start;
}
uint32_t
-nxt_sockaddr_port(nxt_sockaddr_t *sa)
+nxt_sockaddr_port_number(nxt_sockaddr_t *sa)
{
uint32_t port;
@@ -221,7 +324,7 @@ nxt_sockaddr_port(nxt_sockaddr_t *sa)
nxt_bool_t
nxt_sockaddr_cmp(nxt_sockaddr_t *sa1, nxt_sockaddr_t *sa2)
{
- if (nxt_socklen(sa1) != nxt_socklen(sa2)) {
+ if (sa1->socklen != sa2->socklen) {
return 0;
}
@@ -264,7 +367,7 @@ nxt_sockaddr_cmp(nxt_sockaddr_t *sa1, nxt_sockaddr_t *sa2)
{
size_t length;
- length = nxt_socklen(sa1) - offsetof(struct sockaddr_un, sun_path);
+ length = sa1->socklen - offsetof(struct sockaddr_un, sun_path);
if (nxt_memcmp(&sa1->u.sockaddr_un.sun_path,
&sa2->u.sockaddr_un.sun_path, length)
@@ -347,8 +450,7 @@ nxt_sockaddr_ntop(nxt_sockaddr_t *sa, u_char *buf, u_char *end, nxt_bool_t port)
/* Linux abstract socket address has no trailing zero. */
- length = nxt_socklen(sa)
- - offsetof(struct sockaddr_un, sun_path) - 1;
+ length = sa->socklen - offsetof(struct sockaddr_un, sun_path) - 1;
p = nxt_sprintf(buf, end, "unix:\\0%*s", length, p + 1);
} else {
@@ -556,7 +658,7 @@ nxt_job_sockaddr_unix_parse(nxt_job_sockaddr_parse_t *jbs)
jbs->resolve.sockaddrs = nxt_mem_alloc(mp, sizeof(void *));
if (nxt_fast_path(jbs->resolve.sockaddrs != NULL)) {
- sa = nxt_sockaddr_alloc(mp, socklen);
+ sa = nxt_sockaddr_alloc(mp, socklen, jbs->addr.length);
if (nxt_fast_path(sa != NULL)) {
jbs->resolve.count = 1;
@@ -763,7 +865,8 @@ nxt_job_sockaddr_inet_parse(nxt_job_sockaddr_parse_t *jbs)
return NXT_ERROR;
}
- sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in));
+ sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in),
+ NXT_INET_ADDR_STR_LEN);
if (nxt_fast_path(sa != NULL)) {
jbs->resolve.count = 1;
diff --git a/src/nxt_sockaddr.h b/src/nxt_sockaddr.h
index 382e8fcb..c5f8ec71 100644
--- a/src/nxt_sockaddr.h
+++ b/src/nxt_sockaddr.h
@@ -14,25 +14,38 @@
* nxt_sockaddr_alloc(pool, sizeof(struct sockaddr_in))
*/
+/*
+ * A textual sockaddr representation is stored after struct sockaddr union
+ * and allocated as a whole.
+ */
+
struct nxt_sockaddr_s {
+ /* Socket type: SOCKS_STREAM, SOCK_DGRAM, etc. */
+ uint8_t type;
+ /* Size of struct sockaddr. */
+ uint8_t socklen;
/*
- * A sockaddr textual representation is optional and may be in two forms:
- * with port or without port. If a nxt_sockaddr_t is intended to listen(),
- * bind() or connect() then the textual representation must be present and
- * must include the port. nxt_event_conn_accept() creates a textual
- * representation without the port.
+ * Textual sockaddr representation, e.g.: "127.0.0.1:8000",
+ * "[::1]:8000", and "unix:/path/to/socket".
*/
- u_char *text;
-
+ uint8_t start;
+ uint8_t length;
/*
- * text_len, socket type and socklen are stored
- * together on 64-bit platforms without sockaddr.sa_len.
+ * Textual address representation, e.g: "127.0.0.1", "::1",
+ * and "unix:/path/to/socket".
*/
- uint16_t text_len;
- uint16_t type;
-#if !(NXT_SOCKADDR_SA_LEN)
- socklen_t _socklen;
-#endif
+ uint8_t address_start;
+ uint8_t address_length;
+ /*
+ * Textual port representation, e.g. "8000".
+ * Port length is length - port_start.
+ */
+ uint8_t port_start;
+ /*
+ * Size of the whole structure: struct sockaddr union and maximal textual
+ * representation, used to place sockaddr into appropriate free list.
+ */
+ uint8_t sockaddr_size;
union {
struct sockaddr sockaddr;
@@ -56,51 +69,26 @@ typedef struct {
} nxt_job_sockaddr_parse_t;
-NXT_EXPORT nxt_sockaddr_t *nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t len)
+NXT_EXPORT nxt_sockaddr_t *nxt_sockaddr_alloc(nxt_mem_pool_t *mp,
+ socklen_t socklen, size_t address_length)
NXT_MALLOC_LIKE;
NXT_EXPORT nxt_sockaddr_t *nxt_sockaddr_create(nxt_mem_pool_t *mp,
- struct sockaddr *sockaddr, socklen_t len)
+ struct sockaddr *sockaddr, socklen_t socklen, size_t address_length)
NXT_MALLOC_LIKE;
NXT_EXPORT nxt_sockaddr_t *nxt_sockaddr_copy(nxt_mem_pool_t *mp,
nxt_sockaddr_t *src)
NXT_MALLOC_LIKE;
-NXT_EXPORT nxt_sockaddr_t *nxt_getsockname(nxt_mem_pool_t *mp, nxt_socket_t s)
+NXT_EXPORT nxt_sockaddr_t *nxt_getsockname(nxt_task_t *task,
+ nxt_mem_pool_t *mp, nxt_socket_t s)
NXT_MALLOC_LIKE;
-NXT_EXPORT nxt_int_t nxt_sockaddr_text(nxt_mem_pool_t *mp, nxt_sockaddr_t *sa,
- nxt_bool_t port);
-
-
-#if (NXT_SOCKADDR_SA_LEN)
+NXT_EXPORT void nxt_sockaddr_text(nxt_sockaddr_t *sa);
-#define \
-nxt_socklen_set(sa, len) \
- (sa)->u.sockaddr.sa_len = (socklen_t) (len)
-
-#define \
-nxt_socklen(sa) \
- ((sa)->u.sockaddr.sa_len)
-
-#else
-
-#define \
-nxt_socklen_set(sa, len) \
- (sa)->_socklen = (socklen_t) (len)
-
-
-#define \
-nxt_socklen(sa) \
- ((sa)->_socklen)
-
-#endif
-
-
-NXT_EXPORT uint32_t nxt_sockaddr_port(nxt_sockaddr_t *sa);
+NXT_EXPORT uint32_t nxt_sockaddr_port_number(nxt_sockaddr_t *sa);
NXT_EXPORT nxt_bool_t nxt_sockaddr_cmp(nxt_sockaddr_t *sa1,
nxt_sockaddr_t *sa2);
NXT_EXPORT size_t nxt_sockaddr_ntop(nxt_sockaddr_t *sa, u_char *buf,
- u_char *end,
- nxt_bool_t port);
+ u_char *end, nxt_bool_t port);
NXT_EXPORT void nxt_job_sockaddr_parse(nxt_job_sockaddr_parse_t *jbs);
NXT_EXPORT in_addr_t nxt_inet_addr(u_char *buf, size_t len);
#if (NXT_INET6)
@@ -108,60 +96,17 @@ NXT_EXPORT nxt_int_t nxt_inet6_addr(struct in6_addr *in6_addr, u_char *buf,
size_t len);
#endif
-#if (NXT_HAVE_UNIX_DOMAIN)
-#define nxt_unix_addr_path_len(sa) \
- (nxt_socklen(sa) - offsetof(struct sockaddr_un, sun_path))
-#endif
-
-#define NXT_INET_ADDR_STR_LEN (sizeof("255.255.255.255") - 1)
+#define NXT_INET_ADDR_STR_LEN (sizeof("255.255.255.255:65535") - 1)
#define NXT_INET6_ADDR_STR_LEN \
- (sizeof("ffff:ffff:ffff:ffff:ffff:ffff:255.255.255.255") - 1)
-
-#define NXT_UNIX_ADDR_STR_LEN \
- ((sizeof("unix:") - 1) \
- + (sizeof(struct sockaddr_un) - offsetof(struct sockaddr_un, sun_path)))
-
-
-#if (NXT_HAVE_UNIX_DOMAIN)
-#define NXT_SOCKADDR_STR_LEN NXT_UNIX_ADDR_STR_LEN
+ (sizeof("[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535") - 1)
-#elif (NXT_INET6)
-#define NXT_SOCKADDR_STR_LEN NXT_INET6_ADDR_STR_LEN
-
-#else
-#define NXT_SOCKADDR_STR_LEN NXT_INET_ADDR_STR_LEN
-#endif
-
-
-#if (NXT_INET6)
-#define NXT_SOCKPORT_STR_LEN (sizeof("[]:65535") - 1)
-
-#else
-#define NXT_SOCKPORT_STR_LEN (sizeof(":65535") - 1)
-#endif
-
-
-nxt_inline size_t
-nxt_sockaddr_text_len(nxt_sockaddr_t *sa)
-{
- switch (sa->u.sockaddr.sa_family) {
-
-#if (NXT_INET6)
- case AF_INET6:
- return NXT_INET6_ADDR_STR_LEN;
-#endif
-
-#if (NXT_HAVE_UNIX_DOMAIN)
- case AF_UNIX:
- return NXT_UNIX_ADDR_STR_LEN;
-#endif
- default:
- return NXT_INET_ADDR_STR_LEN;
- }
-}
+#define nxt_sockaddr_start(sa) ((u_char *) (sa) + (sa)->start)
+#define nxt_sockaddr_address(sa) ((u_char *) (sa) + (sa)->address_start)
+#define nxt_sockaddr_port(sa) ((u_char *) (sa) + (sa)->port_start)
+#define nxt_sockaddr_length(sa) ((sa)->length - (sa)->port_start)
#endif /* _NXT_SOCKADDR_H_INCLUDED_ */
diff --git a/src/nxt_socket.c b/src/nxt_socket.c
index 19ba21c0..2663b855 100644
--- a/src/nxt_socket.c
+++ b/src/nxt_socket.c
@@ -12,8 +12,8 @@ static const char *nxt_socket_sockopt_name(nxt_uint_t level,
nxt_socket_t
-nxt_socket_create(nxt_uint_t domain, nxt_uint_t type, nxt_uint_t protocol,
- nxt_uint_t flags)
+nxt_socket_create(nxt_task_t *task, nxt_uint_t domain, nxt_uint_t type,
+ nxt_uint_t protocol, nxt_uint_t flags)
{
nxt_socket_t s;
@@ -28,18 +28,18 @@ nxt_socket_create(nxt_uint_t domain, nxt_uint_t type, nxt_uint_t protocol,
s = socket(domain, type, protocol);
if (nxt_slow_path(s == -1)) {
- nxt_thread_log_alert("socket(%ui, 0x%uXi, %ui) failed %E",
- domain, type, protocol, nxt_socket_errno);
+ nxt_log(task, NXT_LOG_CRIT, "socket(%ui, 0x%uXi, %ui) failed %E",
+ domain, type, protocol, nxt_socket_errno);
return s;
}
- nxt_thread_log_debug("socket(): %d", s);
+ nxt_debug(task, "socket(): %d", s);
#if !(NXT_HAVE_SOCK_NONBLOCK)
if (flags & NXT_NONBLOCK) {
- if (nxt_slow_path(nxt_socket_nonblocking(s) != NXT_OK)) {
- nxt_socket_close(s);
+ if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
+ nxt_socket_close(task, s);
return -1;
}
}
@@ -51,19 +51,21 @@ nxt_socket_create(nxt_uint_t domain, nxt_uint_t type, nxt_uint_t protocol,
void
-nxt_socket_close(nxt_socket_t s)
+nxt_socket_close(nxt_task_t *task, nxt_socket_t s)
{
if (nxt_fast_path(close(s) == 0)) {
- nxt_thread_log_debug("socket close(%d)", s);
+ nxt_debug(task, "socket close(%d)", s);
} else {
- nxt_thread_log_alert("socket close(%d) failed %E", s, nxt_socket_errno);
+ nxt_log(task, NXT_LOG_CRIT, "socket close(%d) failed %E",
+ s, nxt_socket_errno);
}
}
nxt_int_t
-nxt_socket_getsockopt(nxt_socket_t s, nxt_uint_t level, nxt_uint_t sockopt)
+nxt_socket_getsockopt(nxt_task_t *task, nxt_socket_t s, nxt_uint_t level,
+ nxt_uint_t sockopt)
{
int val;
socklen_t len;
@@ -71,38 +73,36 @@ nxt_socket_getsockopt(nxt_socket_t s, nxt_uint_t level, nxt_uint_t sockopt)
len = sizeof(val);
if (nxt_fast_path(getsockopt(s, level, sockopt, &val, &len) == 0)) {
- nxt_thread_log_debug("getsockopt(%d, %ui, %s): %d",
- s, level,
- nxt_socket_sockopt_name(level, sockopt), val);
+ nxt_debug(task, "getsockopt(%d, %ui, %s): %d",
+ s, level, nxt_socket_sockopt_name(level, sockopt), val);
return val;
}
- nxt_thread_log_error(NXT_LOG_CRIT, "getsockopt(%d, %ui, %s) failed %E",
- s, level, nxt_socket_sockopt_name(level, sockopt),
- val, nxt_socket_errno);
+ nxt_log(task, NXT_LOG_CRIT, "getsockopt(%d, %ui, %s) failed %E",
+ s, level, nxt_socket_sockopt_name(level, sockopt),
+ val, nxt_socket_errno);
return -1;
}
nxt_int_t
-nxt_socket_setsockopt(nxt_socket_t s, nxt_uint_t level, nxt_uint_t sockopt,
- int val)
+nxt_socket_setsockopt(nxt_task_t *task, nxt_socket_t s, nxt_uint_t level,
+ nxt_uint_t sockopt, int val)
{
socklen_t len;
len = sizeof(val);
if (nxt_fast_path(setsockopt(s, level, sockopt, &val, len) == 0)) {
- nxt_thread_log_debug("setsockopt(%d, %ui, %s): %d",
- s, level,
- nxt_socket_sockopt_name(level, sockopt), val);
+ nxt_debug(task, "setsockopt(%d, %ui, %s): %d",
+ s, level, nxt_socket_sockopt_name(level, sockopt), val);
return NXT_OK;
}
- nxt_thread_log_error(NXT_LOG_CRIT, "setsockopt(%d, %ui, %s, %d) failed %E",
- s, level, nxt_socket_sockopt_name(level, sockopt),
- val, nxt_socket_errno);
+ nxt_log(task, NXT_LOG_CRIT, "setsockopt(%d, %ui, %s, %d) failed %E",
+ s, level, nxt_socket_sockopt_name(level, sockopt),
+ val, nxt_socket_errno);
return NXT_ERROR;
}
@@ -123,7 +123,7 @@ nxt_socket_sockopt_name(nxt_uint_t level, nxt_uint_t sockopt)
return "SO_RCVBUF";
case SO_REUSEADDR:
- return "SO_TYPE";
+ return "SO_REUSEADDR";
case SO_TYPE:
return "SO_TYPE";
@@ -164,13 +164,14 @@ nxt_socket_sockopt_name(nxt_uint_t level, nxt_uint_t sockopt)
nxt_int_t
-nxt_socket_bind(nxt_socket_t s, nxt_sockaddr_t *sa, nxt_bool_t test)
+nxt_socket_bind(nxt_task_t *task, nxt_socket_t s, nxt_sockaddr_t *sa,
+ nxt_bool_t test)
{
nxt_err_t err;
- nxt_thread_log_debug("bind(%d, %*s)", s, sa->text_len, sa->text);
+ nxt_debug(task, "bind(%d, %*s)", s, sa->length, nxt_sockaddr_start(sa));
- if (nxt_fast_path(bind(s, &sa->u.sockaddr, nxt_socklen(sa)) == 0)) {
+ if (nxt_fast_path(bind(s, &sa->u.sockaddr, sa->socklen) == 0)) {
return NXT_OK;
}
@@ -180,23 +181,23 @@ nxt_socket_bind(nxt_socket_t s, nxt_sockaddr_t *sa, nxt_bool_t test)
return NXT_DECLINED;
}
- nxt_thread_log_error(NXT_LOG_CRIT, "bind(%d, %*s) failed %E",
- s, sa->text_len, sa->text, err);
+ nxt_log(task, NXT_LOG_CRIT, "bind(%d, %*s) failed %E",
+ s, sa->length, nxt_sockaddr_start(sa), err);
return NXT_ERROR;
}
nxt_int_t
-nxt_socket_connect(nxt_socket_t s, nxt_sockaddr_t *sa)
+nxt_socket_connect(nxt_task_t *task, nxt_socket_t s, nxt_sockaddr_t *sa)
{
nxt_err_t err;
nxt_int_t ret;
nxt_uint_t level;
- nxt_thread_log_debug("connect(%d, %*s)", s, sa->text_len, sa->text);
+ nxt_debug(task, "connect(%d, %*s)", s, sa->length, nxt_sockaddr_start(sa));
- if (connect(s, &sa->u.sockaddr, nxt_socklen(sa)) == 0) {
+ if (connect(s, &sa->u.sockaddr, sa->socklen) == 0) {
return NXT_OK;
}
@@ -205,7 +206,8 @@ nxt_socket_connect(nxt_socket_t s, nxt_sockaddr_t *sa)
switch (err) {
case NXT_EINPROGRESS:
- nxt_thread_log_debug("connect(%d) in progress", s);
+ nxt_debug(task, "connect(%d, %*s) in progress",
+ s, sa->length, nxt_sockaddr_start(sa));
return NXT_AGAIN;
case NXT_ECONNREFUSED:
@@ -234,21 +236,21 @@ nxt_socket_connect(nxt_socket_t s, nxt_sockaddr_t *sa)
ret = NXT_ERROR;
}
- nxt_thread_log_error(level, "connect(%d, %*s) failed %E",
- s, sa->text_len, sa->text, err);
+ nxt_log(task, level, "connect(%d, %*s) failed %E",
+ s, sa->length, nxt_sockaddr_start(sa), err);
return ret;
}
void
-nxt_socket_shutdown(nxt_socket_t s, nxt_uint_t how)
+nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s, nxt_uint_t how)
{
nxt_err_t err;
nxt_uint_t level;
if (nxt_fast_path(shutdown(s, how) == 0)) {
- nxt_thread_log_debug("shutdown(%d, %ui)", s, how);
+ nxt_debug(task, "shutdown(%d, %ui)", s, how);
return;
}
@@ -272,46 +274,26 @@ nxt_socket_shutdown(nxt_socket_t s, nxt_uint_t how)
level = NXT_LOG_CRIT;
}
- nxt_thread_log_error(level, "shutdown(%d, %ui) failed %E", s, how, err);
+ nxt_log(task, level, "shutdown(%d, %ui) failed %E", s, how, err);
}
nxt_uint_t
-nxt_socket_error_level(nxt_err_t err, nxt_socket_error_level_t level)
+nxt_socket_error_level(nxt_err_t err)
{
switch (err) {
- case NXT_ECONNRESET:
-
- if ((level & NXT_SOCKET_ECONNRESET_IGNORE) != 0) {
- return NXT_LOG_DEBUG;
- }
-
- return NXT_LOG_ERR;
-
- case NXT_EINVAL:
-
- if ((level & NXT_SOCKET_EINVAL_IGNORE) != 0) {
- return NXT_LOG_DEBUG;
- }
-
- return NXT_LOG_ALERT;
-
case NXT_EPIPE:
+ case NXT_ECONNRESET:
case NXT_ENOTCONN:
case NXT_ETIMEDOUT:
case NXT_ENETDOWN:
case NXT_ENETUNREACH:
case NXT_EHOSTDOWN:
case NXT_EHOSTUNREACH:
-
- if ((level & NXT_SOCKET_ERROR_IGNORE) != 0) {
- return NXT_LOG_INFO;
- }
-
return NXT_LOG_ERR;
default:
- return NXT_LOG_ALERT;
+ return NXT_LOG_CRIT;
}
}
diff --git a/src/nxt_socket.h b/src/nxt_socket.h
index 5ebe5c95..ec9d058c 100644
--- a/src/nxt_socket.h
+++ b/src/nxt_socket.h
@@ -4,8 +4,8 @@
* Copyright (C) NGINX, Inc.
*/
-#ifndef _NXT_UNIX_SOCKET_H_INCLUDED_
-#define _NXT_UNIX_SOCKET_H_INCLUDED_
+#ifndef _NXT_SOCKET_H_INCLUDED_
+#define _NXT_SOCKET_H_INCLUDED_
typedef int nxt_socket_t;
@@ -91,29 +91,24 @@ typedef union {
#define NXT_MAXHOSTNAMELEN MAXHOSTNAMELEN
-typedef enum {
- NXT_SOCKET_ERROR_IGNORE = 0x1,
- NXT_SOCKET_ECONNRESET_IGNORE = 0x2,
- NXT_SOCKET_EINVAL_IGNORE = 0x4,
-} nxt_socket_error_level_t;
-
-
-NXT_EXPORT nxt_socket_t nxt_socket_create(nxt_uint_t family, nxt_uint_t type,
- nxt_uint_t protocol, nxt_uint_t flags);
-NXT_EXPORT void nxt_socket_close(nxt_socket_t s);
-NXT_EXPORT nxt_int_t nxt_socket_getsockopt(nxt_socket_t s, nxt_uint_t level,
- nxt_uint_t sockopt);
-NXT_EXPORT nxt_int_t nxt_socket_setsockopt(nxt_socket_t s, nxt_uint_t level,
- nxt_uint_t sockopt, int val);
-NXT_EXPORT nxt_int_t nxt_socket_bind(nxt_socket_t s, nxt_sockaddr_t *sa,
- nxt_bool_t test);
-NXT_EXPORT nxt_int_t nxt_socket_connect(nxt_socket_t s, nxt_sockaddr_t *sa);
-NXT_EXPORT void nxt_socket_shutdown(nxt_socket_t s, nxt_uint_t how);
-nxt_uint_t nxt_socket_error_level(nxt_err_t err,
- nxt_socket_error_level_t level);
-
-NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_socket_t *pair);
-NXT_EXPORT void nxt_socketpair_close(nxt_socket_t *pair);
+NXT_EXPORT nxt_socket_t nxt_socket_create(nxt_task_t *task, nxt_uint_t family,
+ nxt_uint_t type, nxt_uint_t protocol, nxt_uint_t flags);
+NXT_EXPORT void nxt_socket_close(nxt_task_t *task, nxt_socket_t s);
+NXT_EXPORT nxt_int_t nxt_socket_getsockopt(nxt_task_t *task, nxt_socket_t s,
+ nxt_uint_t level, nxt_uint_t sockopt);
+NXT_EXPORT nxt_int_t nxt_socket_setsockopt(nxt_task_t *task, nxt_socket_t s,
+ nxt_uint_t level, nxt_uint_t sockopt, int val);
+NXT_EXPORT nxt_int_t nxt_socket_bind(nxt_task_t *task, nxt_socket_t s,
+ nxt_sockaddr_t *sa, nxt_bool_t test);
+NXT_EXPORT nxt_int_t nxt_socket_connect(nxt_task_t *task, nxt_socket_t s,
+ nxt_sockaddr_t *sa);
+NXT_EXPORT void nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s,
+ nxt_uint_t how);
+nxt_uint_t nxt_socket_error_level(nxt_err_t err);
+
+NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_task_t *task,
+ nxt_socket_t *pair);
+NXT_EXPORT void nxt_socketpair_close(nxt_task_t *task, nxt_socket_t *pair);
NXT_EXPORT ssize_t nxt_socketpair_send(nxt_fd_event_t *ev, nxt_fd_t fd,
nxt_iobuf_t *iob, nxt_uint_t niob);
NXT_EXPORT ssize_t nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd,
@@ -121,12 +116,12 @@ NXT_EXPORT ssize_t nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd,
#define \
-nxt_socket_nonblocking(fd) \
- nxt_fd_nonblocking(fd)
+nxt_socket_nonblocking(task, fd) \
+ nxt_fd_nonblocking(task, fd)
#define \
-nxt_socket_blocking(fd) \
- nxt_fd_blocking(fd)
+nxt_socket_blocking(task, fd) \
+ nxt_fd_blocking(task, fd)
-#endif /* _NXT_UNIX_SOCKET_H_INCLUDED_ */
+#endif /* _NXT_SOCKET_H_INCLUDED_ */
diff --git a/src/nxt_socketpair.c b/src/nxt_socketpair.c
index d0449020..e1c303dd 100644
--- a/src/nxt_socketpair.c
+++ b/src/nxt_socketpair.c
@@ -27,16 +27,16 @@ static ssize_t nxt_recvmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob,
nxt_int_t
-nxt_socketpair_create(nxt_socket_t *pair)
+nxt_socketpair_create(nxt_task_t *task, nxt_socket_t *pair)
{
if (nxt_slow_path(socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, pair) != 0)) {
- nxt_thread_log_alert("socketpair() failed %E", nxt_errno);
+ nxt_log(task, NXT_LOG_CRIT, "socketpair() failed %E", nxt_errno);
return NXT_ERROR;
}
- nxt_thread_log_debug("socketpair(): %d:%d", pair[0], pair[1]);
+ nxt_debug(task, "socketpair(): %d:%d", pair[0], pair[1]);
- if (nxt_slow_path(nxt_socket_nonblocking(pair[0]) != NXT_OK)) {
+ if (nxt_slow_path(nxt_socket_nonblocking(task, pair[0]) != NXT_OK)) {
goto fail;
}
@@ -44,7 +44,7 @@ nxt_socketpair_create(nxt_socket_t *pair)
goto fail;
}
- if (nxt_slow_path(nxt_socket_nonblocking(pair[1]) != NXT_OK)) {
+ if (nxt_slow_path(nxt_socket_nonblocking(task, pair[1]) != NXT_OK)) {
goto fail;
}
@@ -56,17 +56,17 @@ nxt_socketpair_create(nxt_socket_t *pair)
fail:
- nxt_socketpair_close(pair);
+ nxt_socketpair_close(task, pair);
return NXT_ERROR;
}
void
-nxt_socketpair_close(nxt_socket_t *pair)
+nxt_socketpair_close(nxt_task_t *task, nxt_socket_t *pair)
{
- nxt_socket_close(pair[0]);
- nxt_socket_close(pair[1]);
+ nxt_socket_close(task, pair[0]);
+ nxt_socket_close(task, pair[1]);
}
diff --git a/src/nxt_solaris_sendfilev.c b/src/nxt_solaris_sendfilev.c
index fdd803dc..1b46d099 100644
--- a/src/nxt_solaris_sendfilev.c
+++ b/src/nxt_solaris_sendfilev.c
@@ -42,7 +42,7 @@ nxt_solaris_event_conn_io_sendfilev(nxt_event_conn_t *c, nxt_buf_t *b,
size = nxt_solaris_buf_coalesce(b, sfv, &nsfv, &sync, limit);
- nxt_log_debug(c->socket.log, "sendfilev(%d, %D)", c->socket.fd, nsfv);
+ nxt_debug(c->socket.task, "sendfilev(%d, %D)", c->socket.fd, nsfv);
if (nsfv == 0 && sync) {
return 0;
@@ -53,7 +53,7 @@ nxt_solaris_event_conn_io_sendfilev(nxt_event_conn_t *c, nxt_buf_t *b,
err = (n == -1) ? nxt_errno : 0;
- nxt_log_debug(c->socket.log, "sendfilev(): %d sent:%uz", n, sent);
+ nxt_debug(c->socket.task, "sendfilev(): %d sent:%uz", n, sent);
if (n == -1) {
switch (err) {
@@ -67,14 +67,13 @@ nxt_solaris_event_conn_io_sendfilev(nxt_event_conn_t *c, nxt_buf_t *b,
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "sendfilev(%d, %D) failed %E",
- c->socket.fd, nsfv, err);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "sendfilev(%d, %D) failed %E", c->socket.fd, nsfv, err);
return NXT_ERROR;
}
- nxt_log_debug(c->socket.log, "sendfilev() %E", err);
+ nxt_debug(c->socket.task, "sendfilev() %E", err);
return sent;
}
diff --git a/src/nxt_stream_module.c b/src/nxt_stream_module.c
new file mode 100644
index 00000000..89201893
--- /dev/null
+++ b/src/nxt_stream_module.c
@@ -0,0 +1,130 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_cycle.h>
+
+
+static void nxt_stream_connection_peer(nxt_task_t *task,
+ nxt_upstream_peer_t *up);
+static void nxt_stream_connection_close(nxt_task_t *task, void *obj,
+ void *data);
+
+
+void
+nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_cycle_t *cycle;
+ nxt_event_conn_t *c;
+ nxt_upstream_peer_t *up;
+
+ c = obj;
+
+ nxt_debug(task, "stream connection init");
+
+ up = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_upstream_peer_t));
+ if (nxt_slow_path(up == NULL)) {
+ goto fail;
+ }
+
+ up->data = c;
+
+ cycle = nxt_thread_cycle();
+
+ if (cycle->upstream.length != 0) {
+ up->addr = cycle->upstream;
+
+ } else {
+ nxt_str_set(&up->addr, "127.0.0.1:8080");
+ }
+
+ up->ready_handler = nxt_stream_connection_peer;
+ up->mem_pool = c->mem_pool;
+
+ nxt_upstream_round_robin_peer(task, up);
+ return;
+
+fail:
+
+ /* TODO: close connection */
+ return;
+}
+
+
+static void
+nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
+{
+ nxt_event_conn_t *c;
+ nxt_event_conn_proxy_t *p;
+
+ c = up->data;
+
+ up->sockaddr->type = SOCK_STREAM;
+
+ nxt_log_debug(c->socket.log, "stream connection peer %*s",
+ up->sockaddr->length, nxt_sockaddr_start(up->sockaddr));
+
+ p = nxt_event_conn_proxy_create(c);
+ if (nxt_slow_path(p == NULL)) {
+ goto fail;
+ }
+
+ p->client->socket.data = p;
+ p->peer->socket.data = p;
+
+ p->client_buffer_size = 1024;
+ p->peer_buffer_size = 4096;
+ //p->client_wait_timeout = 9000;
+ p->connect_timeout = 7000;
+ p->reconnect_timeout = 500;
+ //p->peer_wait_timeout = 5000;
+ p->client_write_timeout = 3000;
+ p->peer_write_timeout = 3000;
+ p->completion_handler = nxt_stream_connection_close;
+ //p->retries = 10;
+ p->peer->remote = up->sockaddr;
+
+ if (0) {
+ nxt_event_engine_t *engine;
+ nxt_event_write_rate_t *rate;
+
+ rate = nxt_mem_alloc(c->mem_pool, sizeof(nxt_event_write_rate_t));
+
+ if (nxt_slow_path(rate == NULL)) {
+ goto fail;
+ }
+
+ c->rate = rate;
+
+ rate->limit = 1024;
+ rate->limit_after = 0;
+ rate->average = rate->limit;
+
+ engine = nxt_thread_event_engine();
+ rate->last = engine->timers.now;
+ }
+
+ nxt_event_conn_proxy(task, p);
+ return;
+
+fail:
+
+ /* TODO: close connection */
+ return;
+}
+
+
+static void
+nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_proxy_t *p;
+
+ p = obj;
+
+ nxt_log_debug(p->client->socket.log, "stream connection close");
+
+ nxt_mem_pool_destroy(p->client->mem_pool);
+}
diff --git a/src/nxt_upstream.h b/src/nxt_upstream.h
index 2935ccc8..a7d2a224 100644
--- a/src/nxt_upstream.h
+++ b/src/nxt_upstream.h
@@ -25,7 +25,7 @@ struct nxt_upstream_peer_s {
nxt_str_t addr;
nxt_mem_pool_t *mem_pool;
- void (*ready_handler)(nxt_upstream_peer_t *up);
+ void (*ready_handler)(nxt_task_t *task, nxt_upstream_peer_t *up);
void (*protocol_handler)(nxt_upstream_source_t *us);
};
@@ -39,7 +39,8 @@ typedef struct {
/* STUB */
-NXT_EXPORT void nxt_upstream_round_robin_peer(nxt_upstream_peer_t *up);
+NXT_EXPORT void nxt_upstream_round_robin_peer(nxt_task_t *task,
+ nxt_upstream_peer_t *up);
/**/
diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c
index f8035762..7717a2de 100644
--- a/src/nxt_upstream_round_robin.c
+++ b/src/nxt_upstream_round_robin.c
@@ -24,28 +24,30 @@ typedef struct {
} nxt_upstream_round_robin_t;
-static void nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj,
+static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj,
void *data);
-static void nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj,
+static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj,
void *data);
-static void nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up);
+static void nxt_upstream_round_robin_get_peer(nxt_task_t *task,
+ nxt_upstream_peer_t *up);
void
-nxt_upstream_round_robin_peer(nxt_upstream_peer_t *up)
+nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
{
nxt_job_sockaddr_parse_t *jbs;
if (up->upstream != NULL) {
- nxt_upstream_round_robin_get_peer(up);
+ nxt_upstream_round_robin_get_peer(task, up);
}
jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t));
if (nxt_slow_path(jbs == NULL)) {
- up->ready_handler(up);
+ up->ready_handler(task, up);
return;
}
+ jbs->resolve.job.task = task;
jbs->resolve.job.data = up;
jbs->resolve.port = up->port;
jbs->resolve.log_level = NXT_LOG_ERR;
@@ -58,7 +60,7 @@ nxt_upstream_round_robin_peer(nxt_upstream_peer_t *up)
static void
-nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, void *data)
+nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data)
{
nxt_uint_t i;
nxt_sockaddr_t *sa;
@@ -94,10 +96,10 @@ nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, void *data)
/* STUB */
sa->type = SOCK_STREAM;
- /* TODO: test ret */
- (void) nxt_sockaddr_text(up->mem_pool, sa, 1);
+ nxt_sockaddr_text(sa);
- nxt_log_debug(thr->log, "upstream peer: %*s", sa->text_len, sa->text);
+ nxt_debug(task, "upstream peer: %*s",
+ sa->length, nxt_sockaddr_start(sa));
/* TODO: memcpy to shared memory pool. */
peer[i].sockaddr = sa;
@@ -109,7 +111,7 @@ nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, void *data)
up->sockaddr = peer[0].sockaddr;
nxt_job_destroy(jbs);
- up->ready_handler(up);
+ up->ready_handler(task, up);
//nxt_upstream_round_robin_get_peer(up);
return;
@@ -118,12 +120,12 @@ fail:
nxt_job_destroy(jbs);
- up->ready_handler(up);
+ up->ready_handler(task, up);
}
static void
-nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj, void *data)
+nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data)
{
nxt_upstream_peer_t *up;
nxt_job_sockaddr_parse_t *jbs;
@@ -131,24 +133,22 @@ nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj, void *data)
jbs = obj;
up = jbs->resolve.job.data;
- up->ready_handler(up);
+ up->ready_handler(task, up);
}
static void
-nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up)
+nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
{
int32_t effective_weights;
nxt_uint_t i;
nxt_msec_t now;
- nxt_event_engine_t *engine;
nxt_upstream_round_robin_t *urr;
nxt_upstream_round_robin_peer_t *peer, *best;
urr = up->upstream;
- engine = nxt_thread_event_engine();
- now = engine->timers.now;
+ now = task->thread->engine->timers.now;
nxt_thread_spin_lock(&urr->lock);
@@ -196,5 +196,5 @@ nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up)
nxt_thread_spin_unlock(&urr->lock);
- up->ready_handler(up);
+ up->ready_handler(task, up);
}
diff --git a/src/nxt_work_queue.h b/src/nxt_work_queue.h
index 4d2422cc..8dc9c1dd 100644
--- a/src/nxt_work_queue.h
+++ b/src/nxt_work_queue.h
@@ -10,14 +10,14 @@
typedef struct nxt_work_s nxt_work_t;
-typedef struct {
+struct nxt_task_s {
nxt_thread_t *thread;
nxt_log_t *log;
uint32_t ident;
nxt_work_t *next_work;
/* TODO: exception_handler, prev/next task, subtasks. */
-} nxt_task_t;
+};
#define nxt_task_next_ident() \
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index 0f5f6eb6..01304f65 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -86,11 +86,9 @@ nxt_worker_process_start(void *data)
goto fail;
}
-#if 0
- if (nxt_cycle_listen_sockets_enable(thr, cycle) != NXT_OK) {
+ if (nxt_cycle_listen_sockets_enable(&thr->engine->task, cycle) != NXT_OK) {
goto fail;
}
-#endif
proc = cycle->processes->elts;
@@ -159,7 +157,7 @@ nxt_worker_process_quit(nxt_task_t *task)
n = cycle->listen_sockets->nelts;
while (n != 0) {
- nxt_socket_close(ls->socket);
+ nxt_socket_close(task, ls->socket);
ls->socket = -1;
ls++;