diff options
Diffstat (limited to '')
49 files changed, 1143 insertions, 1214 deletions
diff --git a/auto/sources b/auto/sources index 049ef1ef..3a5cab9a 100644 --- a/auto/sources +++ b/auto/sources @@ -126,6 +126,7 @@ NXT_LIB_SRCS=" \ src/nxt_job_resolve.c \ src/nxt_sockaddr.c \ src/nxt_listen_socket.c \ + src/nxt_upstream_round_robin.c \ " NXT_LIB_SRC0=" \ @@ -190,7 +191,6 @@ NXT_LIB_UNIT_TEST_SRCS=" \ test/nxt_rbtree_unit_test.c \ test/nxt_term_parse_unit_test.c \ test/nxt_msec_diff_unit_test.c \ - test/nxt_exp_approximation.c \ test/nxt_mem_cache_pool_unit_test.c \ test/nxt_mem_zone_unit_test.c \ test/nxt_lvlhsh_unit_test.c \ @@ -330,6 +330,7 @@ NXT_SRCS=" \ src/nxt_cycle.c \ src/nxt_port.c \ src/nxt_application.c \ + src/nxt_stream_module.c \ src/nxt_master_process.c \ src/nxt_worker_process.c \ " diff --git a/src/nxt_aix_send_file.c b/src/nxt_aix_send_file.c index b7cb3b28..6462efd5 100644 --- a/src/nxt_aix_send_file.c +++ b/src/nxt_aix_send_file.c @@ -32,7 +32,7 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) sb.size = 0; sb.limit = limit; - nhd = nxt_sendbuf_mem_coalesce(&sb); + nhd = nxt_sendbuf_mem_coalesce(c->socket.task, &sb); if (nhd == 0 && sb.sync) { return 0; @@ -53,7 +53,7 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) sb.iobuf = &tr; sb.nmax = 1; - ntr = nxt_sendbuf_mem_coalesce(&sb); + ntr = nxt_sendbuf_mem_coalesce(c->socket.task, &sb); nxt_memzero(&sfp, sizeof(struct sf_parms)); @@ -71,17 +71,16 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) sfp.trailer_length = tr.iov_len; } - nxt_log_debug(c->socket.log, "send_file(%d) fd:%FD @%O:%O hd:%ui tr:%ui", - c->socket.fd, fb->file->fd, fb->file_pos, file_size, - nhd, ntr); + nxt_debug(c->socket.task, "send_file(%d) fd:%FD @%O:%O hd:%ui tr:%ui", + c->socket.fd, fb->file->fd, fb->file_pos, file_size, nhd, ntr); n = send_file(&c->socket.fd, &sfp, 0); err = (n == -1) ? nxt_errno : 0; sent = sfp.bytes_sent; - nxt_log_debug(c->socket.log, "send_file(%d): %d sent:%O", - c->socket.fd, n, sent); + nxt_debug(c->socket.task, "send_file(%d): %d sent:%O", + c->socket.fd, n, sent); /* * -1 an error has occurred, errno contains the error code; @@ -102,16 +101,15 @@ nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "send_file(%d) failed %E \"%FN\" " - "fd:%FD @%O:%O hd:%ui tr:%ui", c->socket.fd, err, - fb->file->name, fb->file->fd, fb->file_pos, - file_size, nhd, ntr); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "send_file(%d) failed %E \"%FN\" fd:%FD @%O:%O hd:%ui tr:%ui", + c->socket.fd, err, fb->file->name, fb->file->fd, fb->file_pos, + file_size, nhd, ntr); return NXT_ERROR; } - nxt_log_debug(c->socket.log, "sendfile() %E", err); + nxt_debug(c->socket.task, "sendfile() %E", err); return sent; } diff --git a/src/nxt_application.c b/src/nxt_application.c index 64886cf9..e0e5ffc7 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -20,7 +20,6 @@ static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log); static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r); static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out); -static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_completion(nxt_task_t *task, void *obj, @@ -683,7 +682,7 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t length) nxt_buf_mem_init(b, start, 4096); - b->completion_handler = nxt_app_buf_completion; + b->completion_handler = NULL; nxt_app_buf_current_number++; } @@ -713,7 +712,7 @@ nxt_app_write_finish(nxt_app_request_t *r) return NXT_ERROR; } - b->completion_handler = nxt_app_buf_completion; + b->completion_handler = NULL; b->parent = (nxt_buf_t *) r; out = r->output_buf; @@ -746,20 +745,6 @@ nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out) static void -nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - - b = obj; - - nxt_debug(task, "app buf completion"); - - b->next = nxt_app_buf_done; - nxt_app_buf_done = b; -} - - -static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; @@ -797,14 +782,14 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) c->write = b; c->write_state = &nxt_app_delivery_write_state; - nxt_event_conn_write(task, c); + nxt_event_conn_write(task->thread->engine, c); } static const nxt_event_conn_state_t nxt_app_delivery_write_state nxt_aligned(64) = { - NXT_EVENT_BUF_PROCESS, + NXT_EVENT_NO_BUF_PROCESS, NXT_EVENT_TIMER_AUTORESET, nxt_app_delivery_ready, @@ -820,12 +805,26 @@ static const nxt_event_conn_state_t nxt_app_delivery_write_state static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data) { + nxt_buf_t *b, *next; nxt_event_conn_t *c; c = obj; nxt_debug(task, "app delivery ready"); + for (b = c->write; b != NULL; b = next) { + + if (nxt_buf_is_mem(b)) { + if (b->mem.pos != b->mem.free) { + break; + } + } + + next = b->next; + b->next = nxt_app_buf_done; + nxt_app_buf_done = b; + } + nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion, task, c, NULL); } diff --git a/src/nxt_cycle.c b/src/nxt_cycle.c index a39bb02a..81ad8ce2 100644 --- a/src/nxt_cycle.c +++ b/src/nxt_cycle.c @@ -11,9 +11,9 @@ #include <nxt_master_process.h> -static nxt_int_t nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, +static nxt_int_t nxt_cycle_inherited_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, +static nxt_int_t nxt_cycle_systemd_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_event_engines(nxt_thread_t *thr, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_processes(nxt_cycle_t *cycle); @@ -39,11 +39,14 @@ static nxt_sockaddr_t *nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr, static nxt_int_t nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_listen_socket(nxt_cycle_t *cycle); +static nxt_int_t nxt_cycle_app_listen_socket(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_hostname(nxt_thread_t *thr, nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_log_files_init(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_log_files_create(nxt_cycle_t *cycle); -static nxt_int_t nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle); -static void nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle); +static nxt_int_t nxt_cycle_listen_sockets_create(nxt_task_t *task, + nxt_cycle_t *cycle); +static void nxt_cycle_listen_sockets_close(nxt_task_t *task, + nxt_cycle_t *cycle); static void nxt_cycle_pid_file_delete(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_shm_zones_enable(nxt_cycle_t *cycle); static nxt_int_t nxt_cycle_shm_zone_create(nxt_cycle_shm_zone_t *shm_zone); @@ -106,7 +109,7 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous, cycle->listen_sockets = listen_sockets; if (previous == NULL) { - ret = nxt_cycle_inherited_listen_sockets(thr, cycle); + ret = nxt_cycle_inherited_listen_sockets(task, cycle); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -166,7 +169,7 @@ fail: static nxt_int_t -nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_cycle_inherited_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle) { u_char *v, *p; nxt_int_t type; @@ -177,11 +180,10 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) v = (u_char *) getenv("NGINX"); if (v == NULL) { - return nxt_cycle_systemd_listen_sockets(thr, cycle); + return nxt_cycle_systemd_listen_sockets(task, cycle); } - nxt_log_error(NXT_LOG_NOTICE, thr->log, - "using inherited listen sockets: %s", v); + nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v); inherited_sockets = nxt_array_create(cycle->mem_pool, 1, sizeof(nxt_listen_socket_t)); @@ -197,9 +199,9 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) s = nxt_int_parse(v, p - v); if (nxt_slow_path(s < 0)) { - nxt_log_emerg(thr->log, "invalid socket number " - "\"%s\" in NGINX environment variable, " - "ignoring the rest of the variable", v); + nxt_log(task, NXT_LOG_CRIT, "invalid socket number " + "\"%s\" in NGINX environment variable, " + "ignoring the rest of the variable", v); return NXT_ERROR; } @@ -212,12 +214,12 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) ls->socket = s; - ls->sockaddr = nxt_getsockname(cycle->mem_pool, s); + ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s); if (nxt_slow_path(ls->sockaddr == NULL)) { return NXT_ERROR; } - type = nxt_socket_getsockopt(s, SOL_SOCKET, SO_TYPE); + type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE); if (nxt_slow_path(type == -1)) { return NXT_ERROR; } @@ -231,7 +233,7 @@ nxt_cycle_inherited_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) static nxt_int_t -nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) +nxt_cycle_systemd_listen_sockets(nxt_task_t *task, nxt_cycle_t *cycle) { u_char *nfd, *pid; nxt_int_t n; @@ -263,8 +265,7 @@ nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) return NXT_OK; } - nxt_log_error(NXT_LOG_NOTICE, thr->log, - "using %s systemd listen sockets", n); + nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n); inherited_sockets = nxt_array_create(cycle->mem_pool, n, sizeof(nxt_listen_socket_t)); @@ -282,7 +283,7 @@ nxt_cycle_systemd_listen_sockets(nxt_thread_t *thr, nxt_cycle_t *cycle) ls->socket = s; - ls->sockaddr = nxt_getsockname(cycle->mem_pool, s); + ls->sockaddr = nxt_getsockname(task, cycle->mem_pool, s); if (nxt_slow_path(ls->sockaddr == NULL)) { return NXT_ERROR; } @@ -652,7 +653,7 @@ nxt_cycle_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, { const nxt_event_interface_t *interface; - if (thr->engine->batch == cycle->batch + if (thr->engine->batch0 == cycle->batch && nxt_strcmp(thr->engine->event.name, cycle->engine) == 0) { return NXT_OK; @@ -877,7 +878,22 @@ nxt_cycle_conf_read_cmd(nxt_thread_t *thr, nxt_cycle_t *cycle) return NXT_ERROR; } - cycle->listen = sa; + cycle->app_listen = sa; + + continue; + } + + if (nxt_strcmp(p, "--upstream") == 0) { + if (*argv == NULL) { + nxt_log_emerg(thr->log, + "no argument for option \"--upstream\""); + return NXT_ERROR; + } + + p = *argv++; + + cycle->upstream.length = nxt_strlen(p); + cycle->upstream.start = (u_char *) p; continue; } @@ -1039,7 +1055,7 @@ nxt_cycle_sockaddr_unix_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, #endif - sa = nxt_sockaddr_alloc(mp, socklen); + sa = nxt_sockaddr_alloc(mp, socklen, addr->length); if (nxt_slow_path(sa == NULL)) { return NULL; @@ -1206,8 +1222,8 @@ nxt_cycle_sockaddr_inet_parse(nxt_str_t *addr, nxt_mem_pool_t *mp, } } - sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in)); - + sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); if (nxt_slow_path(sa == NULL)) { return NULL; } @@ -1241,6 +1257,10 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle) return NXT_ERROR; } + if (nxt_cycle_app_listen_socket(cycle) != NXT_OK) { + return NXT_ERROR; + } + if (nxt_cycle_listen_socket(cycle) != NXT_OK) { return NXT_ERROR; } @@ -1249,7 +1269,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle) return NXT_ERROR; } - if (nxt_cycle_listen_sockets_create(cycle) != NXT_OK) { + if (nxt_cycle_listen_sockets_create(task, cycle) != NXT_OK) { return NXT_ERROR; } @@ -1257,7 +1277,7 @@ nxt_cycle_conf_apply(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *cycle) return NXT_ERROR; } - nxt_cycle_listen_sockets_close(cycle); + nxt_cycle_listen_sockets_close(task, cycle); return NXT_OK; } @@ -1267,36 +1287,34 @@ static nxt_int_t nxt_cycle_listen_socket(nxt_cycle_t *cycle) { nxt_sockaddr_t *sa; -// nxt_work_queue_t *wq; nxt_listen_socket_t *ls; - if (cycle->listen == NULL) { - sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in)); + if (cycle->stream_listen == NULL) { + sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); if (sa == NULL) { return NXT_ERROR; } sa->type = SOCK_STREAM; sa->u.sockaddr_in.sin_family = AF_INET; - sa->u.sockaddr_in.sin_port = htons(8080); + sa->u.sockaddr_in.sin_port = htons(8000); - cycle->listen = sa; + cycle->stream_listen = sa; } - if (nxt_sockaddr_text(cycle->mem_pool, cycle->listen, 1) != NXT_OK) { - return NXT_ERROR; - } + nxt_sockaddr_text(cycle->stream_listen); - ls = nxt_cycle_listen_socket_add(cycle, cycle->listen); + ls = nxt_cycle_listen_socket_add(cycle, cycle->stream_listen); if (ls == NULL) { return NXT_ERROR; } ls->read_after_accept = 1; -#if 0 ls->flags = NXT_NONBLOCK; +#if 0 /* STUB */ wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t)); if (wq == NULL) { @@ -1306,6 +1324,7 @@ nxt_cycle_listen_socket(nxt_cycle_t *cycle) /**/ ls->work_queue = wq; +#endif ls->handler = nxt_stream_connection_init; /* @@ -1316,7 +1335,40 @@ nxt_cycle_listen_socket(nxt_cycle_t *cycle) + sizeof(nxt_event_conn_proxy_t) + sizeof(nxt_event_conn_t) + 4 * sizeof(nxt_buf_t); -#endif + + return NXT_OK; +} + + +static nxt_int_t +nxt_cycle_app_listen_socket(nxt_cycle_t *cycle) +{ + nxt_sockaddr_t *sa; +// nxt_work_queue_t *wq; + nxt_listen_socket_t *ls; + + if (cycle->app_listen == NULL) { + sa = nxt_sockaddr_alloc(cycle->mem_pool, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); + if (sa == NULL) { + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + sa->u.sockaddr_in.sin_family = AF_INET; + sa->u.sockaddr_in.sin_port = htons(8080); + + cycle->app_listen = sa; + } + + nxt_sockaddr_text(cycle->app_listen); + + ls = nxt_cycle_listen_socket_add(cycle, cycle->app_listen); + if (ls == NULL) { + return NXT_ERROR; + } + + ls->read_after_accept = 1; return NXT_OK; } @@ -1335,16 +1387,15 @@ nxt_cycle_listen_socket_add(nxt_cycle_t *cycle, nxt_sockaddr_t *sa) mp = cycle->mem_pool; - ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, nxt_socklen(sa)); + ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen, + sa->length); if (ls->sockaddr == NULL) { return NULL; } ls->sockaddr->type = sa->type; - if (nxt_sockaddr_text(mp, ls->sockaddr, 1) != NXT_OK) { - return NULL; - } + nxt_sockaddr_text(ls->sockaddr); ls->socket = -1; ls->backlog = NXT_LISTEN_BACKLOG; @@ -1483,7 +1534,7 @@ nxt_cycle_log_files_create(nxt_cycle_t *cycle) static nxt_int_t -nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle) +nxt_cycle_listen_sockets_create(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_uint_t c, p, ncurr, nprev; nxt_listen_socket_t *curr, *prev; @@ -1510,7 +1561,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle) if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) { - if (nxt_listen_socket_update(&curr[c], &prev[p]) != NXT_OK) { + if (nxt_listen_socket_update(task, &curr[c], &prev[p]) != NXT_OK) { return NXT_ERROR; } @@ -1518,7 +1569,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle) } } - if (nxt_listen_socket_create(&curr[c], cycle->test_config) != NXT_OK) { + if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) { return NXT_ERROR; } @@ -1532,7 +1583,7 @@ nxt_cycle_listen_sockets_create(nxt_cycle_t *cycle) static void -nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle) +nxt_cycle_listen_sockets_close(nxt_task_t *task, nxt_cycle_t *cycle) { nxt_uint_t p, c, nprev, ncurr; nxt_listen_socket_t *curr, *prev; @@ -1555,7 +1606,7 @@ nxt_cycle_listen_sockets_close(nxt_cycle_t *cycle) } } - nxt_socket_close(prev[p].socket); + nxt_socket_close(task, prev[p].socket); next: @@ -1576,8 +1627,10 @@ nxt_cycle_listen_sockets_enable(nxt_task_t *task, nxt_cycle_t *cycle) n = cycle->listen_sockets->nelts; for (i = 0; i < n; i++) { - if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) { - return NXT_ERROR; + if (ls[i].flags == NXT_NONBLOCK) { + if (nxt_event_conn_listen(task, &ls[i]) != NXT_OK) { + return NXT_ERROR; + } } } diff --git a/src/nxt_cycle.h b/src/nxt_cycle.h index 4a5b656d..6dbde173 100644 --- a/src/nxt_cycle.h +++ b/src/nxt_cycle.h @@ -77,7 +77,10 @@ struct nxt_cycle_s { const char *group; const char *pid; const char *error_log; - nxt_sockaddr_t *listen; + + nxt_sockaddr_t *app_listen; + nxt_sockaddr_t *stream_listen; + nxt_str_t upstream; }; @@ -146,6 +149,7 @@ nxt_int_t nxt_cycle_shm_zone_add(nxt_cycle_t *cycle, nxt_str_t *name, void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log, const char *fmt, ...); +void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data); nxt_int_t nxt_app_start(nxt_cycle_t *cycle); diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c index 19e0389b..149f5cb8 100644 --- a/src/nxt_epoll_engine.c +++ b/src/nxt_epoll_engine.c @@ -106,7 +106,7 @@ static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { nxt_epoll_edge_event_conn_io_recvbuf, nxt_event_conn_io_recv, - nxt_event_conn_io_write, + nxt_conn_io_write, nxt_event_conn_io_write_chunk, #if (NXT_HAVE_LINUX_SENDFILE) @@ -697,7 +697,7 @@ nxt_epoll_add_signal(nxt_event_engine_t *engine) engine->u.epoll.signalfd.fd = fd; - if (nxt_fd_nonblocking(fd) != NXT_OK) { + if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) { return NXT_ERROR; } @@ -779,7 +779,8 @@ nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) return NXT_ERROR; } - if (nxt_fd_nonblocking(engine->u.epoll.eventfd.fd) != NXT_OK) { + ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd); + if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -998,7 +999,7 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) cls->ready--; cls->socket.read_ready = (cls->ready != 0); - len = nxt_socklen(c->remote); + len = c->remote->socklen; if (len >= sizeof(struct sockaddr)) { sa = &c->remote->u.sockaddr; @@ -1049,7 +1050,7 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) state = c->write_state; - switch (nxt_socket_connect(c->socket.fd, c->remote) ){ + switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ case NXT_OK: c->socket.write_ready = 1; @@ -1105,8 +1106,7 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) break; } - nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, - task, c, data); + nxt_work_queue_add(c->write_work_queue, handler, task, c, data); } @@ -1126,8 +1126,8 @@ nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data) nxt_timer_disable(task->thread->engine, &c->write_timer); } - nxt_event_conn_io_handle(task->thread, c->write_work_queue, - c->write_state->ready_handler, task, c, data); + nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, + task, c, data); return; } diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c index b78a9251..079901f3 100644 --- a/src/nxt_event_conn.c +++ b/src/nxt_event_conn.c @@ -21,7 +21,7 @@ nxt_event_conn_io_t nxt_unix_event_conn_io = { nxt_event_conn_io_recvbuf, nxt_event_conn_io_recv, - nxt_event_conn_io_write, + nxt_conn_io_write, nxt_event_conn_io_write_chunk, #if (NXT_HAVE_LINUX_SENDFILE) @@ -48,7 +48,7 @@ nxt_event_conn_io_t nxt_unix_event_conn_io = { nxt_event_conn_t * -nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log) +nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task) { nxt_thread_t *thr; nxt_event_conn_t *c; @@ -63,7 +63,7 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log) c->socket.fd = -1; c->socket.log = &c->log; - c->log = *log; + c->log = *task->log; /* The while loop skips possible uint32_t overflow. */ @@ -185,7 +185,7 @@ nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data) c->socket.shutdown = 1; - nxt_socket_shutdown(c->socket.fd, SHUT_RDWR); + nxt_socket_shutdown(task, c->socket.fd, SHUT_RDWR); nxt_work_queue_add(&engine->close_work_queue, nxt_conn_close_handler, task, c, engine); @@ -210,7 +210,7 @@ nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data) events_pending = nxt_fd_event_close(engine, &c->socket); if (events_pending == 0) { - nxt_socket_close(c->socket.fd); + nxt_socket_close(task, c->socket.fd); c->socket.fd = -1; if (timers_pending == 0) { @@ -242,7 +242,7 @@ nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "event conn close handler fd:%d", c->socket.fd); if (c->socket.fd != -1) { - nxt_socket_close(c->socket.fd); + nxt_socket_close(task, c->socket.fd); c->socket.fd = -1; } diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h index 3f907633..73415cfe 100644 --- a/src/nxt_event_conn.h +++ b/src/nxt_event_conn.h @@ -175,7 +175,7 @@ typedef struct { nxt_task_t task; uint32_t ready; - uint32_t batch; + uint32_t batch0; /* An accept() interface is cached to minimize memory accesses. */ nxt_work_handler_t accept; @@ -189,18 +189,6 @@ typedef struct { #define \ -nxt_event_conn_io_handle(thr, wq, handler, task, c, data) \ - do { \ - if (thr->engine->batch != 0) { \ - nxt_work_queue_add(wq, handler, task, c, data); \ - \ - } else { \ - handler(task, c, data); \ - } \ - } while (0) - - -#define \ nxt_event_conn_timer_init(ev, c, wq) \ do { \ (ev)->work_queue = (wq); \ @@ -222,12 +210,12 @@ nxt_event_write_timer_conn(ev) \ #if (NXT_HAVE_UNIX_DOMAIN) #define \ -nxt_event_conn_tcp_nodelay_on(c) \ +nxt_event_conn_tcp_nodelay_on(task, c) \ do { \ nxt_int_t ret; \ \ if ((c)->remote->u.sockaddr.sa_family != AF_UNIX) { \ - ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \ + ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ TCP_NODELAY, 1); \ \ (c)->tcp_nodelay = (ret == NXT_OK); \ @@ -238,11 +226,11 @@ nxt_event_conn_tcp_nodelay_on(c) \ #else #define \ -nxt_event_conn_tcp_nodelay_on(c) \ +nxt_event_conn_tcp_nodelay_on(task, c) \ do { \ nxt_int_t ret; \ \ - ret = nxt_socket_setsockopt((c)->socket.fd, IPPROTO_TCP, \ + ret = nxt_socket_setsockopt(task, (c)->socket.fd, IPPROTO_TCP, \ TCP_NODELAY, 1); \ \ (c)->tcp_nodelay = (ret == NXT_OK); \ @@ -252,7 +240,7 @@ nxt_event_conn_tcp_nodelay_on(c) \ NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp, - nxt_log_t *log); + nxt_task_t *task); void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); NXT_EXPORT void nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c); @@ -262,8 +250,7 @@ NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine, NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c, nxt_work_queue_t *wq); -NXT_EXPORT void nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c); -void nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data); +void nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data); void nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data); nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c); void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data); @@ -277,17 +264,23 @@ NXT_EXPORT void nxt_event_conn_accept(nxt_task_t *task, void nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls, const char *accept_syscall, nxt_err_t err); -NXT_EXPORT void nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c); +void nxt_conn_wait(nxt_event_conn_t *c); + void nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data); ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b); ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size, nxt_uint_t flags); -NXT_EXPORT void nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c); +void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data); +ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb); +ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, + nxt_iobuf_t *iob, nxt_uint_t niob); +ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, + size_t size); + size_t nxt_event_conn_write_limit(nxt_event_conn_t *c); nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c, size_t sent); -void nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data); ssize_t nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit); ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, @@ -301,29 +294,30 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c); -#define \ -nxt_event_conn_connect_enqueue(thr, task, c) \ - nxt_work_queue_add(&thr->engine->socket_work_queue, \ - nxt_event_conn_batch_socket, task, c, c->socket.data) +#define nxt_event_conn_connect(engine, c) \ + nxt_work_queue_add(&engine->socket_work_queue, nxt_event_conn_sys_socket, \ + c->socket.task, c, c->socket.data) -#define \ -nxt_event_conn_read_enqueue(thr, task, c) \ +#define nxt_event_conn_read(e, c) \ do { \ - c->socket.read_work_queue = &thr->engine->read_work_queue; \ + nxt_event_engine_t *engine = e; \ \ - nxt_work_queue_add(&thr->engine->read_work_queue, \ - c->io->read, task, c, c->socket.data); \ + c->socket.read_work_queue = &engine->read_work_queue; \ + \ + nxt_work_queue_add(&engine->read_work_queue, c->io->read, \ + c->socket.task, c, c->socket.data); \ } while (0) -#define \ -nxt_event_conn_write_enqueue(thr, task, c) \ +#define nxt_event_conn_write(e, c) \ do { \ - c->socket.write_work_queue = &thr->engine->write_work_queue; \ + nxt_event_engine_t *engine = e; \ + \ + c->socket.write_work_queue = &engine->write_work_queue; \ \ - nxt_work_queue_add(&thr->engine->write_work_queue, \ - c->io->write, task, c, c->socket.data); \ + nxt_work_queue_add(&engine->write_work_queue, c->io->write, \ + c->socket.task, c, c->socket.data); \ } while (0) @@ -353,6 +347,7 @@ typedef struct { uint8_t connected; /* 1 bit */ uint8_t delayed; /* 1 bit */ uint8_t retries; /* 8 bits */ + uint8_t retain; /* 2 bits */ nxt_work_handler_t completion_handler; } nxt_event_conn_proxy_t; diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c index 7b2dce21..606cdf03 100644 --- a/src/nxt_event_conn_accept.c +++ b/src/nxt_event_conn_accept.c @@ -44,16 +44,9 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls) cls->socket.fd = ls->socket; engine = task->thread->engine; - cls->batch = engine->batch; - - if (cls->batch != 0) { - cls->socket.read_work_queue = &engine->accept_work_queue; - - } else { - cls->socket.read_work_queue = &engine->fast_work_queue; - cls->batch = 1; - } + cls->batch0 = engine->batch0; + cls->socket.read_work_queue = &engine->accept_work_queue; cls->socket.read_handler = nxt_event_conn_listen_handler; cls->socket.error_handler = nxt_event_conn_listen_event_error; cls->socket.log = &nxt_main_log; @@ -102,7 +95,7 @@ nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls) if (nxt_fast_path(mp != NULL)) { /* This allocation cannot fail. */ - c = nxt_event_conn_create(mp, cls->socket.log); + c = nxt_event_conn_create(mp, cls->socket.task); cls->socket.data = c; c->socket.read_work_queue = cls->socket.read_work_queue; @@ -112,7 +105,7 @@ nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls) c->listen = ls; /* This allocation cannot fail. */ - remote = nxt_sockaddr_alloc(mp, ls->socklen); + remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length); c->remote = remote; sa = ls->sockaddr; @@ -137,7 +130,7 @@ nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data) nxt_event_conn_listen_t *cls; cls = obj; - cls->ready = cls->batch; + cls->ready = cls->batch0; cls->accept(task, cls, data); } @@ -158,7 +151,7 @@ nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) cls->ready--; cls->socket.read_ready = (cls->ready != 0); - len = nxt_socklen(c->remote); + len = c->remote->socklen; if (len >= sizeof(struct sockaddr)) { sa = &c->remote->u.sockaddr; @@ -182,8 +175,8 @@ nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) * Linux does not inherit non-blocking mode * from listen socket for accept()ed socket. */ - if (nxt_slow_path(nxt_socket_nonblocking(s) != NXT_OK)) { - nxt_socket_close(s); + if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { + nxt_socket_close(task, s); } #endif @@ -200,10 +193,10 @@ nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls, { nxt_event_conn_t *next; - /* This allocation cannot fail. */ - (void) nxt_sockaddr_text(c->mem_pool, c->remote, 0); + nxt_sockaddr_text(c->remote); - nxt_debug(task, "client: %*s", c->remote->text_len, c->remote->text); + nxt_debug(task, "client: %*s", + c->remote->address_length, nxt_sockaddr_address(c->remote)); nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); diff --git a/src/nxt_event_conn_connect.c b/src/nxt_event_conn_connect.c index ccd84011..7db8f704 100644 --- a/src/nxt_event_conn_connect.c +++ b/src/nxt_event_conn_connect.c @@ -8,31 +8,7 @@ void -nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c) -{ - void *data; - nxt_event_engine_t *engine; - - data = c->socket.data; - engine = task->thread->engine; - - if (engine->batch != 0) { - nxt_work_queue_add(&engine->socket_work_queue, - nxt_event_conn_batch_socket, task, c, data); - return; - } - - if (nxt_event_conn_socket(task, c) == NXT_OK) { - c->io->connect(task, c, data); - return; - } - - c->write_state->error_handler(task, c, data); -} - - -void -nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data) +nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_work_handler_t handler; @@ -64,7 +40,7 @@ nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) state = c->write_state; - switch (nxt_socket_connect(c->socket.fd, c->remote)) { + switch (nxt_socket_connect(task, c->socket.fd, c->remote)) { case NXT_OK: c->socket.write_ready = 1; @@ -91,8 +67,7 @@ nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) break; } - nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, task, - c, data); + nxt_work_queue_add(c->write_work_queue, handler, task, c, data); } @@ -106,7 +81,7 @@ nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c) family = c->remote->u.sockaddr.sa_family; - s = nxt_socket_create(family, c->remote->type, 0, NXT_NONBLOCK); + s = nxt_socket_create(task, family, c->remote->type, 0, NXT_NONBLOCK); if (nxt_slow_path(s == -1)) { return NXT_ERROR; @@ -130,8 +105,8 @@ nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c) c->write_timer.task = task; if (c->local != NULL) { - if (nxt_slow_path(nxt_socket_bind(s, c->local, 0) != NXT_OK)) { - nxt_socket_close(s); + if (nxt_slow_path(nxt_socket_bind(task, s, c->local, 0) != NXT_OK)) { + nxt_socket_close(task, s); return NXT_ERROR; } } @@ -172,16 +147,15 @@ nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data) } if (err == 0) { - nxt_event_conn_io_handle(task->thread, c->write_work_queue, - c->write_state->ready_handler, task, c, data); + nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, + task, c, data); return; } c->socket.error = err; - nxt_log(task, nxt_socket_error_level(err, c->socket.log_error), - "connect(%d, %*s) failed %E", - c->socket.fd, c->remote->text_len, c->remote->text, err); + nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E", + c->socket.fd, c->remote->length, nxt_sockaddr_start(c->remote)); nxt_event_conn_connect_error(task, c, data); } @@ -216,6 +190,5 @@ nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data) break; } - nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, - task, c, data); + nxt_work_queue_add(c->write_work_queue, handler, task, c, data); } diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c index dd43c16f..bf2219f2 100644 --- a/src/nxt_event_conn_job_sendfile.c +++ b/src/nxt_event_conn_job_sendfile.c @@ -219,9 +219,8 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) } if (sent != 0) { - nxt_event_conn_io_handle(task->thread, c->write_work_queue, - c->write_state->ready_handler, - task, c, c->socket.data); + nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, + task, c, c->socket.data); /* * Fall through if first operations were * successful but the last one failed. @@ -229,9 +228,8 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) } if (nxt_slow_path(c->socket.error != 0)) { - nxt_event_conn_io_handle(task->thread, c->write_work_queue, - c->write_state->error_handler, - task, c, c->socket.data); + nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler, + task, c, c->socket.data); } } diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c index 42b9dd0f..d91cff4a 100644 --- a/src/nxt_event_conn_proxy.c +++ b/src/nxt_event_conn_proxy.c @@ -53,6 +53,8 @@ static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p); +static void nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, + void *data); static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state; @@ -78,7 +80,7 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client) return NULL; } - peer = nxt_event_conn_create(client->mem_pool, client->socket.log); + peer = nxt_event_conn_create(client->mem_pool, client->socket.task); if (nxt_slow_path(peer == NULL)) { return NULL; } @@ -109,11 +111,8 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client) void nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) { - nxt_thread_t *thr; nxt_event_conn_t *peer; - thr = nxt_thread(); - /* * Peer read event: not connected, disabled. * Peer write event: not connected, disabled. @@ -127,7 +126,7 @@ nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) peer = p->peer; peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - nxt_event_conn_connect_enqueue(thr, task, peer); + nxt_event_conn_connect(task->thread->engine, peer); } /* @@ -136,7 +135,7 @@ nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) */ p->client->read_state = &nxt_event_conn_proxy_client_wait_state; - nxt_event_conn_read(task, p->client); + nxt_conn_wait(p->client); } @@ -203,12 +202,13 @@ nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, client->read_state = &nxt_event_conn_proxy_client_first_read_state; } - nxt_event_conn_read(task, client); + nxt_event_conn_read(task->thread->engine, client); } static const nxt_event_conn_state_t - nxt_event_conn_proxy_client_first_read_state nxt_aligned(64) = + nxt_event_conn_proxy_client_first_read_state + nxt_aligned(64) = { NXT_EVENT_BUF_PROCESS, NXT_EVENT_TIMER_AUTORESET, @@ -243,7 +243,7 @@ nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - nxt_event_conn_connect(task, p->peer); + nxt_event_conn_connect(task->thread->engine, p->peer); } @@ -276,15 +276,15 @@ nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) p->connected = 1; - nxt_event_conn_tcp_nodelay_on(peer); - nxt_event_conn_tcp_nodelay_on(p->client); + nxt_event_conn_tcp_nodelay_on(task, peer); + nxt_event_conn_tcp_nodelay_on(task, p->client); /* Peer read event: waiting with peer_wait_timeout. */ peer->read_state = &nxt_event_conn_proxy_peer_wait_state; peer->write_state = &nxt_event_conn_proxy_peer_write_state; - nxt_event_conn_read_enqueue(task->thread, task, peer); + nxt_conn_wait(peer); if (p->client_buffer != NULL) { client = p->client; @@ -350,7 +350,7 @@ nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) * Peer read event: waiting with possible peer_wait_timeout. * Peer write event: blocked. */ - nxt_event_conn_read(task, peer); + nxt_event_conn_read(task->thread->engine, peer); } @@ -469,7 +469,7 @@ nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, } if (p->connected) { - nxt_event_conn_write_enqueue(task->thread, task, sink); + nxt_event_conn_write(task->thread->engine, sink); } } @@ -547,7 +547,7 @@ nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data) sink = (source == p->client) ? p->peer : p->client; if (sink->socket.error == 0) { - nxt_event_conn_read(task, source); + nxt_event_conn_read(task->thread->engine, source); } } } @@ -658,7 +658,7 @@ nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, } if (wb->mem.pos != wb->mem.free) { - nxt_event_conn_write_enqueue(task->thread, task, sink); + nxt_event_conn_write(task->thread->engine, sink); break; } @@ -865,7 +865,7 @@ nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) p->retries--; - nxt_socket_close(peer->socket.fd); + nxt_socket_close(task, peer->socket.fd); peer->socket.fd = -1; peer->socket.error = 0; @@ -903,7 +903,7 @@ nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) * Peer read event: disabled. * Peer write event: waiting for connection with connect_timeout. */ - nxt_event_conn_connect(task, peer); + nxt_event_conn_connect(task->thread->engine, peer); } @@ -915,8 +915,7 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_debug(source->socket.task, "event conn proxy shutdown source fd:%d cl:%d err:%d", - source->socket.fd, source->socket.closed, - source->socket.error); + source->socket.fd, source->socket.closed, source->socket.error); nxt_debug(sink->socket.task, "event conn proxy shutdown sink fd:%d cl:%d err:%d", @@ -927,12 +926,9 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, return; } - if (sink->socket.error != 0 || sink->socket.closed) { - nxt_event_conn_close(task->thread->engine, sink); - - } else { + if (sink->socket.error == 0 && !sink->socket.closed) { sink->socket.shutdown = 1; - nxt_socket_shutdown(sink->socket.fd, SHUT_WR); + nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); } if (sink->socket.error != 0 @@ -943,6 +939,8 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, return; } + nxt_debug(source->socket.task, "free source buffer"); + /* Free the direction's buffer. */ b = (source == p->client) ? p->client_buffer : p->peer_buffer; nxt_mem_free(source->mem_pool, b); @@ -992,6 +990,22 @@ nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) } +static const nxt_event_conn_state_t nxt_event_conn_proxy_close_state + nxt_aligned(64) = +{ + NXT_EVENT_NO_BUF_PROCESS, + NXT_EVENT_TIMER_NO_AUTORESET, + + nxt_event_conn_proxy_completion, + NULL, + NULL, + + NULL, + NULL, + 0, +}; + + static void nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p) { @@ -1002,20 +1016,41 @@ nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p) nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d", p->client->socket.fd, p->peer->socket.fd); + if (p->delayed) { + p->delayed = 0; + nxt_queue_remove(&p->peer->link); + } + if (p->client->socket.fd != -1) { + p->retain = 1; + p->client->write_state = &nxt_event_conn_proxy_close_state; nxt_event_conn_close(engine, p->client); } if (p->peer->socket.fd != -1) { + p->retain++; + p->peer->write_state = &nxt_event_conn_proxy_close_state; nxt_event_conn_close(engine, p->peer); - - } else if (p->delayed) { - nxt_queue_remove(&p->peer->link); - nxt_timer_delete(engine, &p->peer->write_timer); } +} - nxt_mem_free(p->client->mem_pool, p->client_buffer); - nxt_mem_free(p->client->mem_pool, p->peer_buffer); - p->completion_handler(task, p, NULL); +static void +nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_proxy_t *p; + + p = data; + + nxt_debug(p->client->socket.task, "event conn proxy completion %d:%d:%d", + p->retain, p->client->socket.fd, p->peer->socket.fd); + + p->retain--; + + if (p->retain == 0) { + nxt_mem_free(p->client->mem_pool, p->client_buffer); + nxt_mem_free(p->client->mem_pool, p->peer_buffer); + + p->completion_handler(task, p, NULL); + } } diff --git a/src/nxt_event_conn_read.c b/src/nxt_event_conn_read.c index 20e4d39b..d898c694 100644 --- a/src/nxt_event_conn_read.c +++ b/src/nxt_event_conn_read.c @@ -8,23 +8,29 @@ void -nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c) +nxt_conn_wait(nxt_event_conn_t *c) { - nxt_work_queue_t *wq; - nxt_work_handler_t handler; - - handler = c->io->read; + nxt_event_engine_t *engine; + const nxt_event_conn_state_t *state; - if (task->thread->engine->batch != 0) { + nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", + c->socket.fd, c->socket.read_ready); - wq = &task->thread->engine->read_work_queue; - c->socket.read_work_queue = wq; + engine = c->socket.task->thread->engine; + state = c->read_state; - nxt_work_queue_add( wq, handler, task, c, c->socket.data); + if (c->socket.read_ready) { + nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, + c->socket.task, c, c->socket.data); return; } - handler(task, c, c->socket.data); + c->socket.read_handler = state->ready_handler; + c->socket.error_handler = state->error_handler; + + nxt_event_conn_timer(engine, c, state, &c->read_timer); + + nxt_fd_event_enable_read(engine, &c->socket); } @@ -33,7 +39,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) { ssize_t n; nxt_buf_t *b; - nxt_bool_t batch; + nxt_work_queue_t *wq; nxt_event_conn_t *c; nxt_event_engine_t *engine; nxt_work_handler_t handler; @@ -46,18 +52,12 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) engine = task->thread->engine; - batch = (engine->batch != 0); state = c->read_state; if (c->socket.read_ready) { b = c->read; - if (b == NULL) { - /* Just test descriptor readiness. */ - goto ready; - } - if (c->peek == 0) { n = c->io->recvbuf(c, b); @@ -68,34 +68,33 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) if (n > 0) { c->nbytes = n; - if (state->process_buffers) { - nxt_recvbuf_update(b, n); - - } else { - /* - * A ready_handler must not be queued, instead buffers - * must be processed by the ready_handler at once after - * recv() operation, otherwise two sequentially queued - * recv() operations will read in the same buffers. - */ - batch = 0; + nxt_recvbuf_update(b, n); + + nxt_fd_event_block_read(engine, &c->socket); + + if (state->autoreset_timer) { + nxt_timer_disable(engine, &c->read_timer); } - goto ready; + wq = c->read_work_queue; + handler = state->ready_handler; + + nxt_work_queue_add(wq, handler, task, c, data); + + return; } if (n != NXT_AGAIN) { nxt_fd_event_block_read(engine, &c->socket); nxt_timer_disable(engine, &c->read_timer); - if (n == 0) { - handler = state->close_handler; - goto done; - } + wq = &engine->fast_work_queue; - /* n == NXT_ERROR */ - handler = state->error_handler; - goto done; + handler = (n == 0) ? state->close_handler : state->error_handler; + + nxt_work_queue_add(wq, handler, task, c, data); + + return; } } @@ -119,25 +118,6 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) } return; - -ready: - - nxt_fd_event_block_read(engine, &c->socket); - - if (state->autoreset_timer) { - nxt_timer_disable(engine, &c->read_timer); - } - - handler = state->ready_handler; - -done: - - if (batch) { - nxt_work_queue_add(c->read_work_queue, handler, task, c, data); - - } else { - handler(task, c, data); - } } @@ -198,9 +178,9 @@ nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "readv(%d, %ui) failed %E", - c->socket.fd, niov, err); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "readv(%d, %ui) failed %E", c->socket.fd, niov, err); + return NXT_ERROR; } } @@ -233,6 +213,7 @@ nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size, if (n == 0) { c->socket.closed = 1; c->socket.read_ready = 0; + return n; } @@ -241,19 +222,21 @@ nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size, switch (err) { case NXT_EAGAIN: - nxt_log_debug(c->socket.log, "recv() %E", err); + nxt_debug(c->socket.task, "recv() %E", err); c->socket.read_ready = 0; + return NXT_AGAIN; case NXT_EINTR: - nxt_log_debug(c->socket.log, "recv() %E", err); + nxt_debug(c->socket.task, "recv() %E", err); continue; default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "recv(%d, %p, %uz, %ui) failed %E", - c->socket.fd, buf, size, flags, err); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "recv(%d, %p, %uz, %ui) failed %E", + c->socket.fd, buf, size, flags, err); + return NXT_ERROR; } } diff --git a/src/nxt_event_conn_write.c b/src/nxt_event_conn_write.c index 72d0731b..a0f6f953 100644 --- a/src/nxt_event_conn_write.c +++ b/src/nxt_event_conn_write.c @@ -7,32 +7,16 @@ #include <nxt_main.h> -static void nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate, - size_t sent, nxt_msec_t now); -NXT_LIB_UNIT_TEST_STATIC double - nxt_event_conn_exponential_approximation(double n); static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data); void -nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c) +nxt_conn_io_write(nxt_task_t *task, void *obj, void *data) { - if (task->thread->engine->batch != 0) { - nxt_event_conn_write_enqueue(task->thread, task, c); - - } else { - c->io->write(task, c, c->socket.data); - } -} - - -void -nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) -{ - size_t sent, limit; ssize_t ret; nxt_buf_t *b; + nxt_sendbuf_t sb; nxt_event_conn_t *c; nxt_event_engine_t *engine; @@ -40,40 +24,37 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "event conn write fd:%d", c->socket.fd); - if (!c->socket.write_ready || c->delayed || c->write == NULL) { + if (!c->socket.write_ready || c->write == NULL) { return; } engine = task->thread->engine; - c->socket.write_handler = nxt_event_conn_io_write; + c->socket.write_handler = nxt_conn_io_write; c->socket.error_handler = c->write_state->error_handler; - ret = NXT_DECLINED; - sent = 0; b = c->write; - limit = nxt_event_conn_write_limit(c); - - while (limit != 0) { + sb.socket = c->socket.fd; + sb.sent = 0; + sb.size = 0; + sb.buf = b; + sb.limit = 10 * 1024 * 1024; + sb.ready = 1; + sb.sync = 0; - ret = c->io->write_chunk(c, b, limit); + do { + ret = nxt_conn_io_sendbuf(task, &sb); if (ret < 0) { /* ret == NXT_AGAIN || ret == NXT_ERROR. */ break; } - sent += ret; - limit -= ret; + sb.sent += ret; + sb.limit -= ret; - if (c->write_state->process_buffers) { - b = nxt_sendbuf_completion(task, c->write_work_queue, b, ret); - c->write = b; - - } else { - b = nxt_sendbuf_update(b, ret); - } + b = nxt_sendbuf_update(b, ret); if (b == NULL) { nxt_fd_event_block_write(engine, &c->socket); @@ -84,20 +65,20 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) ret = NXT_AGAIN; break; } - } - nxt_debug(task, "event conn: %i sent:%z", ret, sent); + } while (sb.limit != 0); - if (sent != 0) { + nxt_debug(task, "event conn: %i sent:%z", ret, sb.sent); + + if (sb.sent != 0) { if (c->write_state->autoreset_timer) { nxt_timer_disable(engine, &c->write_timer); } } - if (ret != NXT_ERROR - && !nxt_event_conn_write_delayed(engine, c, sent)) - { - if (limit == 0) { + if (ret != NXT_ERROR) { + + if (sb.limit == 0) { /* * Postpone writing until next event poll to allow to * process other recevied events and to get new events. @@ -117,11 +98,11 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) } } - if (ret == 0 || sent != 0) { + if (ret == 0 || sb.sent != 0) { /* "ret == 0" means a sync buffer was processed. */ - c->sent += sent; - nxt_event_conn_io_handle(task->thread, c->write_work_queue, - c->write_state->ready_handler, task, c, data); + c->sent += sb.sent; + nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, + task, c, data); /* * Fall through if first operations were * successful but the last one failed. @@ -131,8 +112,8 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) if (nxt_slow_path(ret == NXT_ERROR)) { nxt_fd_event_block_write(engine, &c->socket); - nxt_event_conn_io_handle(task->thread, c->write_work_queue, - c->write_state->error_handler, task, c, data); + nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler, + task, c, data); } } @@ -152,8 +133,8 @@ nxt_event_conn_write_limit(nxt_event_conn_t *c) limit = rate->limit; correction = limit - (size_t) rate->average; - nxt_log_debug(c->socket.log, "event conn correction:%z average:%0.3f", - correction, rate->average); + nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f", + correction, rate->average); limit += correction; @@ -174,117 +155,10 @@ nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c, size_t sent) { - nxt_msec_t timer; - nxt_event_write_rate_t *rate; - - rate = c->rate; - - if (rate != NULL) { - nxt_event_conn_average_rate_update(rate, sent, engine->timers.now); - - if (rate->limit_after == 0) { - timer = sent * 1000 / rate->limit; - - } else if (rate->limit_after >= sent) { - timer = sent * 1000 / rate->max_limit; - rate->limit_after -= sent; - - } else { - sent -= rate->limit_after; - timer = rate->limit_after * 1000 / rate->max_limit - + sent * 1000 / rate->limit; - rate->limit_after = 0; - } - - nxt_log_debug(c->socket.log, "event conn timer: %M", timer); - - if (timer != 0) { - c->delayed = 1; - - nxt_fd_event_block_write(engine, &c->socket); - - c->write_timer.handler = nxt_event_conn_write_timer_handler; - nxt_timer_add(engine, &c->write_timer, timer); - - return 1; - } - } - return 0; } -/* Exponentially weighted moving average rate for a given interval. */ - -static void -nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate, size_t sent, - nxt_msec_t now) -{ - double weight, delta; - nxt_msec_t elapsed; - const nxt_uint_t interval = 10; /* 10s */ - - elapsed = now - rate->last; - - if (elapsed == 0) { - return; - } - - rate->last = now; - delta = (double) elapsed / 1000; - - weight = nxt_event_conn_exponential_approximation(-delta / interval); - - rate->average = (1 - weight) * sent / delta + weight * rate->average; - - nxt_thread_log_debug("event conn delta:%0.3f, weight:%0.3f, average:%0.3f", - delta, weight, rate->average); -} - - -/* - * exp() takes tens or hundreds nanoseconds on modern CPU. - * This is a faster exp() approximation based on IEEE-754 format - * layout and described in "A Fast, Compact Approximation of - * the Exponential Function" * by N. N. Schraudolph, 1999. - */ - -NXT_LIB_UNIT_TEST_STATIC double -nxt_event_conn_exponential_approximation(double x) -{ - union { - double d; - int64_t n; - } exp; - - if (x < -100) { - /* - * The approximation is correct in -700 to 700 range. - * The "x" argument is always negative. - */ - return 0; - } - - /* - * x * 2^52 / ln(2) + (1023 * 2^52 - 261140389990637.73 - * - * 52 is the number of mantissa bits; - * 1023 is the exponent bias; - * 261140389990637.73 is the adjustment parameter to - * improve the approximation. The parameter is equal to - * - * 2^52 * ln[ 3 / (8 * ln(2)) + 0.5 ] / ln(2) - * - * Only significant digits of the double float format - * are used to present the double float constants. - */ - exp.n = x * 4503599627370496.0 / 0.69314718055994530 - + (4607182418800017408.0 - 261140389990637.73); - - return exp.d; -} - - static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data) { @@ -359,8 +233,7 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob) err = (n == -1) ? nxt_socket_errno : 0; - nxt_log_debug(c->socket.log, "writev(%d, %ui): %d", - c->socket.fd, niob, n); + nxt_debug(c->socket.task, "writev(%d, %ui): %d", c->socket.fd, niob, n); if (n > 0) { return n; @@ -371,19 +244,18 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob) switch (err) { case NXT_EAGAIN: - nxt_log_debug(c->socket.log, "writev() %E", err); + nxt_debug(c->socket.task, "writev() %E", err); c->socket.write_ready = 0; return NXT_AGAIN; case NXT_EINTR: - nxt_log_debug(c->socket.log, "writev() %E", err); + nxt_debug(c->socket.task, "writev() %E", err); continue; default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "writev(%d, %ui) failed %E", - c->socket.fd, niob, err); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "writev(%d, %ui) failed %E", c->socket.fd, niob, err); return NXT_ERROR; } } @@ -423,11 +295,116 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) default: c->socket.error = err; - nxt_log(c->socket.task, - nxt_socket_error_level(err, c->socket.log_error), + nxt_log(c->socket.task, nxt_socket_error_level(err), "send(%d, %p, %uz) failed %E", c->socket.fd, buf, size, err); return NXT_ERROR; } } } + + +ssize_t +nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb) +{ + nxt_uint_t niov; + struct iovec iov[NXT_IOBUF_MAX]; + + niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX); + + if (niov == 0 && sb->sync) { + return 0; + } + + return nxt_conn_io_writev(task, sb, iov, niov); +} + + +ssize_t +nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov, + nxt_uint_t niov) +{ + ssize_t n; + nxt_err_t err; + + if (niov == 1) { + /* Disposal of surplus kernel iovec copy-in operation. */ + return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len); + } + + for ( ;; ) { + n = writev(sb->socket, iov, niov); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + sb->ready = 0; + nxt_debug(task, "writev() %E", err); + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(task, "writev() %E", err); + continue; + + default: + sb->error = err; + nxt_log(task, nxt_socket_error_level(err), + "writev(%d, %ui) failed %E", sb->socket, niov, err); + + return NXT_ERROR; + } + } +} + + +ssize_t +nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size) +{ + ssize_t n; + nxt_err_t err; + + for ( ;; ) { + n = send(sb->socket, buf, size, 0); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + sb->ready = 0; + nxt_debug(task, "send() %E", err); + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(task, "send() %E", err); + continue; + + default: + sb->error = err; + nxt_log(task, nxt_socket_error_level(err), + "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err); + + return NXT_ERROR; + } + } +} diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index fbcf2384..ca79e8ed 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -44,7 +44,7 @@ nxt_event_engine_create(nxt_thread_t *thr, thr->engine = engine; thr->fiber = &engine->fibers->fiber; - engine->batch = batch; + engine->batch0 = batch; if (flags & NXT_ENGINE_FIBERS) { engine->fibers = nxt_fiber_main_create(engine); @@ -190,7 +190,7 @@ nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) * and in non-blocking node for reader. */ - if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) { + if (nxt_pipe_create(&engine->task, pipe->fds, 1, 0) != NXT_OK) { nxt_free(pipe); return NXT_ERROR; } @@ -220,7 +220,7 @@ nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) if (pipe->event.read_work_queue != NULL) { nxt_fd_event_close(engine, &pipe->event); - nxt_pipe_close(pipe->fds); + nxt_pipe_close(pipe->event.task, pipe->fds); } nxt_free(pipe); @@ -235,7 +235,7 @@ nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) pipe = obj; - nxt_pipe_close(pipe->fds); + nxt_pipe_close(pipe->event.task, pipe->fds); nxt_free(pipe); } @@ -331,15 +331,17 @@ nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_engine_t *engine; + nxt_event_engine_t *engine; + nxt_event_engine_pipe_t *pipe; engine = task->thread->engine; + pipe = engine->pipe; nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", - engine->pipe->fds[0], engine->pipe->fds[1]); + pipe->fds[0], pipe->fds[1]); - nxt_fd_event_close(engine, &engine->pipe->event); - nxt_pipe_close(engine->pipe->fds); + nxt_fd_event_close(engine, &pipe->event); + nxt_pipe_close(pipe->event.task, pipe->fds); } @@ -373,7 +375,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, nxt_event_engine_t *engine; engine = thr->engine; - engine->batch = batch; + engine->batch0 = batch; if (!engine->event.signal_support && interface->signal_support) { /* diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index c34f8cd7..b78b1b71 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -477,7 +477,7 @@ struct nxt_event_engine_s { uint8_t shutdown; /* 1 bit */ - uint32_t batch; + uint32_t batch0; uint32_t connections; uint32_t max_connections; diff --git a/src/nxt_fd_event.h b/src/nxt_fd_event.h index ff989394..762fdf25 100644 --- a/src/nxt_fd_event.h +++ b/src/nxt_fd_event.h @@ -66,31 +66,29 @@ struct nxt_fd_event_s { #if (NXT_64BIT) nxt_fd_event_state_t read:8; /* 3 bits. */ nxt_fd_event_state_t write:8; /* 3 bits. */ - nxt_socket_error_level_t log_error:8; /* 3 bits. */ uint8_t read_ready; uint8_t write_ready; uint8_t changing; uint8_t closed; - uint8_t shutdown; uint8_t timedout; + uint8_t shutdown:1; #if (NXT_HAVE_EPOLL) uint8_t epoll_eof:1; uint8_t epoll_error:1; #endif #if (NXT_HAVE_KQUEUE) - uint8_t kq_eof; + uint8_t kq_eof:1; #endif #else /* NXT_32BIT */ nxt_fd_event_state_t read:3; nxt_fd_event_state_t write:3; - nxt_socket_error_level_t log_error:3; uint8_t read_ready:1; uint8_t write_ready:1; uint8_t changing:1; uint8_t closed:1; - uint8_t shutdown:1; uint8_t timedout:1; + uint8_t shutdown:1; #if (NXT_HAVE_EPOLL) uint8_t epoll_eof:1; uint8_t epoll_error:1; diff --git a/src/nxt_file.c b/src/nxt_file.c index b2cab337..ddde8a9e 100644 --- a/src/nxt_file.c +++ b/src/nxt_file.c @@ -320,7 +320,7 @@ nxt_file_rename(nxt_file_name_t *old_name, nxt_file_name_t *new_name) #if (NXT_HAVE_FIONBIO) nxt_int_t -nxt_fd_nonblocking(nxt_fd_t fd) +nxt_fd_nonblocking(nxt_task_t *task, nxt_fd_t fd) { int nb; @@ -330,7 +330,7 @@ nxt_fd_nonblocking(nxt_fd_t fd) return NXT_OK; } - nxt_thread_log_alert("ioctl(%d, FIONBIO) failed %E", fd, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "ioctl(%d, FIONBIO) failed %E", fd, nxt_errno); return NXT_ERROR; @@ -338,7 +338,7 @@ nxt_fd_nonblocking(nxt_fd_t fd) nxt_int_t -nxt_fd_blocking(nxt_fd_t fd) +nxt_fd_blocking(nxt_task_t *task, nxt_fd_t fd) { int nb; @@ -348,7 +348,7 @@ nxt_fd_blocking(nxt_fd_t fd) return NXT_OK; } - nxt_thread_log_alert("ioctl(%d, !FIONBIO) failed %E", fd, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "ioctl(%d, !FIONBIO) failed %E", fd, nxt_errno); return NXT_ERROR; } @@ -356,22 +356,23 @@ nxt_fd_blocking(nxt_fd_t fd) #else /* !(NXT_HAVE_FIONBIO) */ nxt_int_t -nxt_fd_nonblocking(nxt_fd_t fd) +nxt_fd_nonblocking(nxt_task_t *task, nxt_fd_t fd) { int flags; flags = fcntl(fd, F_GETFL); if (nxt_slow_path(flags == -1)) { - nxt_thread_log_alert("fcntl(%d, F_GETFL) failed %E", fd, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_GETFL) failed %E", + fd, nxt_errno); return NXT_ERROR; } flags |= O_NONBLOCK; if (nxt_slow_path(fcntl(fd, F_SETFL, flags) == -1)) { - nxt_thread_log_alert("fcntl(%d, F_SETFL, O_NONBLOCK) failed %E", - fd, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_SETFL, O_NONBLOCK) failed %E", + fd, nxt_errno); return NXT_ERROR; } @@ -380,23 +381,23 @@ nxt_fd_nonblocking(nxt_fd_t fd) nxt_int_t -nxt_fd_blocking(nxt_fd_t fd) +nxt_fd_blocking(nxt_task_t *task, nxt_fd_t fd) { int flags; flags = fcntl(fd, F_GETFL); if (nxt_slow_path(flags == -1)) { - nxt_thread_log_alert("fcntl(%d, F_GETFL) failed %E", - fd, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_GETFL) failed %E", + fd, nxt_errno); return NXT_ERROR; } flags &= O_NONBLOCK; if (nxt_slow_path(fcntl(fd, F_SETFL, flags) == -1)) { - nxt_thread_log_alert("fcntl(%d, F_SETFL, !O_NONBLOCK) failed %E", - fd, nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "fcntl(%d, F_SETFL, !O_NONBLOCK) failed %E", + fd, nxt_errno); return NXT_ERROR; } @@ -547,24 +548,25 @@ nxt_stderr_start(void) nxt_int_t -nxt_pipe_create(nxt_fd_t *pp, nxt_bool_t nbread, nxt_bool_t nbwrite) +nxt_pipe_create(nxt_task_t *task, nxt_fd_t *pp, nxt_bool_t nbread, + nxt_bool_t nbwrite) { if (pipe(pp) != 0) { - nxt_thread_log_alert("pipe() failed %E", nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "pipe() failed %E", nxt_errno); return NXT_ERROR; } - nxt_thread_log_debug("pipe(): %FD:%FD", pp[0], pp[1]); + nxt_debug(task, "pipe(): %FD:%FD", pp[0], pp[1]); if (nbread) { - if (nxt_fd_nonblocking(pp[0]) != NXT_OK) { + if (nxt_fd_nonblocking(task, pp[0]) != NXT_OK) { return NXT_ERROR; } } if (nbwrite) { - if (nxt_fd_nonblocking(pp[1]) != NXT_OK) { + if (nxt_fd_nonblocking(task, pp[1]) != NXT_OK) { return NXT_ERROR; } } @@ -574,16 +576,18 @@ nxt_pipe_create(nxt_fd_t *pp, nxt_bool_t nbread, nxt_bool_t nbwrite) void -nxt_pipe_close(nxt_fd_t *pp) +nxt_pipe_close(nxt_task_t *task, nxt_fd_t *pp) { - nxt_thread_log_debug("pipe close(%FD:%FD)", pp[0], pp[1]); + nxt_debug(task, "pipe close(%FD:%FD)", pp[0], pp[1]); if (close(pp[0]) != 0) { - nxt_thread_log_alert("pipe close (%FD) failed %E", pp[0], nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "pipe close (%FD) failed %E", + pp[0], nxt_errno); } if (close(pp[1]) != 0) { - nxt_thread_log_alert("pipe close(%FD) failed %E", pp[1], nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "pipe close(%FD) failed %E", + pp[1], nxt_errno); } } diff --git a/src/nxt_file.h b/src/nxt_file.h index 3ef5fbfb..01486e95 100644 --- a/src/nxt_file.h +++ b/src/nxt_file.h @@ -161,8 +161,8 @@ NXT_EXPORT nxt_int_t nxt_file_set_access(nxt_file_name_t *name, NXT_EXPORT nxt_int_t nxt_file_rename(nxt_file_name_t *old_name, nxt_file_name_t *new_name); -NXT_EXPORT nxt_int_t nxt_fd_nonblocking(nxt_fd_t fd); -NXT_EXPORT nxt_int_t nxt_fd_blocking(nxt_fd_t fd); +NXT_EXPORT nxt_int_t nxt_fd_nonblocking(nxt_task_t *task, nxt_fd_t fd); +NXT_EXPORT nxt_int_t nxt_fd_blocking(nxt_task_t *task, nxt_fd_t fd); NXT_EXPORT ssize_t nxt_fd_write(nxt_fd_t fd, u_char *buf, size_t size); NXT_EXPORT ssize_t nxt_fd_read(nxt_fd_t fd, u_char *buf, size_t size); NXT_EXPORT void nxt_fd_close(nxt_fd_t fd); @@ -185,9 +185,9 @@ nxt_write_syslog(priority, message) \ syslog(priority, "%s", message) -NXT_EXPORT nxt_int_t nxt_pipe_create(nxt_fd_t *pp, nxt_bool_t nbread, - nxt_bool_t nbwrite); -NXT_EXPORT void nxt_pipe_close(nxt_fd_t *pp); +NXT_EXPORT nxt_int_t nxt_pipe_create(nxt_task_t *task, nxt_fd_t *pp, + nxt_bool_t nbread, nxt_bool_t nbwrite); +NXT_EXPORT void nxt_pipe_close(nxt_task_t *task, nxt_fd_t *pp); NXT_EXPORT size_t nxt_dir_current(char *buf, size_t len); diff --git a/src/nxt_freebsd_sendfile.c b/src/nxt_freebsd_sendfile.c index 4a61d60f..a9535c10 100644 --- a/src/nxt_freebsd_sendfile.c +++ b/src/nxt_freebsd_sendfile.c @@ -92,7 +92,7 @@ nxt_freebsd_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, hdtr.trl_cnt = ntr; } - nxt_log_debug(c->socket.log, "sendfile(%FD, %d, @%O, %uz) hd:%ui tr:%ui", + nxt_debug(c->socket.task, "sendfile(%FD, %d, @%O, %uz) hd:%ui tr:%ui", fb->file->fd, c->socket.fd, fb->file_pos, file_size, nhd, ntr); @@ -102,7 +102,7 @@ nxt_freebsd_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, err = (n == -1) ? nxt_errno : 0; - nxt_log_debug(c->socket.log, "sendfile(): %d sent:%O", n, sent); + nxt_debug(c->socket.task, "sendfile(): %d sent:%O", n, sent); if (n == -1) { switch (err) { @@ -116,23 +116,21 @@ nxt_freebsd_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "sendfile(%FD, %d, %O, %uz) failed " - "%E \"%FN\" hd:%ui tr:%ui", fb->file->fd, - c->socket.fd, fb->file_pos, file_size, - err, fb->file->name, nhd, ntr); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "sendfile(%FD, %d, %O, %uz) failed %E \"%FN\" hd:%ui tr:%ui", + fb->file->fd, c->socket.fd, fb->file_pos, file_size, err, + fb->file->name, nhd, ntr); return NXT_ERROR; } - nxt_log_debug(c->socket.log, "sendfile() %E", err); + nxt_debug(c->socket.task, "sendfile() %E", err); return sent; } else if (sent == 0) { - nxt_log_error(NXT_LOG_ERR, c->socket.log, - "file \"%FN\" was truncated while sendfile()", - fb->file->name); + nxt_log(c->socket.task, NXT_LOG_ERR, + "file \"%FN\" was truncated while sendfile()", fb->file->name); return NXT_ERROR; } diff --git a/src/nxt_hpux_sendfile.c b/src/nxt_hpux_sendfile.c index a105b684..3c42c559 100644 --- a/src/nxt_hpux_sendfile.c +++ b/src/nxt_hpux_sendfile.c @@ -48,7 +48,7 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) sb.size = 0; sb.limit = limit; - nhd = nxt_sendbuf_mem_coalesce(&sb); + nhd = nxt_sendbuf_mem_coalesce(c->socket.task, &sb); if (nhd == 0 && sb.sync) { return 0; @@ -69,7 +69,7 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) sb.iobuf = &iov[1]; sb.nmax = 1; - ntr = nxt_sendbuf_mem_coalesce(&sb); + ntr = nxt_sendbuf_mem_coalesce(c->socket.task, &sb); /* * Disposal of surplus kernel operations @@ -93,7 +93,7 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) hdtrl = iov; } - nxt_log_debug(c->socket.log, "sendfile(%d, %FD, @%O, %uz) hd:%ui tr:%ui", + nxt_debug(c->socket.task, "sendfile(%d, %FD, @%O, %uz) hd:%ui tr:%ui", c->socket.fd, fb->file->fd, fb->file_pos, file_size, nhd, ntr); @@ -102,7 +102,7 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) err = (n == -1) ? nxt_errno : 0; - nxt_log_debug(c->socket.log, "sendfile(): %uz", n); + nxt_debug(c->socket.task, "sendfile(): %uz", n); if (n == -1) { switch (err) { @@ -116,16 +116,15 @@ nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "sendfile(%d, %FD, @%O, %uz) failed " - "%E \"%FN\" hd:%ui tr:%ui", c->socket.fd, - fb->file->fd, fb->file_pos, file_size, - err, &fb->file->name, nhd, ntr); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "sendfile(%d, %FD, @%O, %uz) failed \"%FN\" hd:%ui tr:%ui", + c->socket.fd, fb->file_pos, file_size, &fb->file->name, + nhd, ntr); return NXT_ERROR; } - nxt_log_debug(c->socket.log, "sendfile() %E", err); + nxt_debug(c->socket.task, "sendfile() %E", err); return 0; } diff --git a/src/nxt_job_resolve.c b/src/nxt_job_resolve.c index e220b085..dc508ec3 100644 --- a/src/nxt_job_resolve.c +++ b/src/nxt_job_resolve.c @@ -81,15 +81,18 @@ nxt_job_resolve(nxt_job_resolve_t *jbr) switch (r->ai_addr->sa_family) { #if (NXT_INET6) case AF_INET6: + length = NXT_INET6_ADDR_STR_LEN; + break; #endif case AF_INET: + length = NXT_INET_ADDR_STR_LEN; break; default: continue; } - sa = nxt_sockaddr_create(mp, r->ai_addr, r->ai_addrlen); + sa = nxt_sockaddr_create(mp, r->ai_addr, r->ai_addrlen, length); if (nxt_slow_path(sa == NULL)) { goto fail; } diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c index b4057389..0aeb4faa 100644 --- a/src/nxt_kqueue_engine.c +++ b/src/nxt_kqueue_engine.c @@ -119,7 +119,7 @@ static nxt_event_conn_io_t nxt_kqueue_event_conn_io = { nxt_kqueue_event_conn_io_recvbuf, nxt_event_conn_io_recv, - nxt_event_conn_io_write, + nxt_conn_io_write, nxt_event_conn_io_write_chunk, #if (NXT_HAVE_FREEBSD_SENDFILE) @@ -524,7 +524,7 @@ nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data) if (ev->kq_eof && ev->kq_errno != 0) { ev->error = ev->kq_errno; - nxt_log(task, nxt_socket_error_level(ev->kq_errno, ev->log_error), + nxt_log(task, nxt_socket_error_level(ev->kq_errno), "kevent() reported error on descriptor %d %E", ev->fd, ev->kq_errno); } @@ -848,7 +848,7 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) state = c->write_state; - switch (nxt_socket_connect(c->socket.fd, c->remote) ){ + switch (nxt_socket_connect(task, c->socket.fd, c->remote) ){ case NXT_OK: c->socket.write_ready = 1; @@ -874,8 +874,7 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) break; } - nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, task, - c, data); + nxt_work_queue_add(c->write_work_queue, handler, task, c, data); } @@ -909,7 +908,7 @@ nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "kevent fd:%d avail:%D", cls->socket.fd, cls->socket.kq_available); - cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available); + cls->ready = nxt_min(cls->batch0, (uint32_t) cls->socket.kq_available); nxt_kqueue_event_conn_io_accept(task, cls, data); } @@ -933,7 +932,7 @@ nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) cls->socket.kq_available--; cls->socket.read_ready = (cls->socket.kq_available != 0); - len = nxt_socklen(c->remote); + len = c->remote->socklen; if (len >= sizeof(struct sockaddr)) { sa = &c->remote->u.sockaddr; diff --git a/src/nxt_linux_sendfile.c b/src/nxt_linux_sendfile.c index 1dbac677..f5a7dccf 100644 --- a/src/nxt_linux_sendfile.c +++ b/src/nxt_linux_sendfile.c @@ -95,8 +95,8 @@ nxt_linux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, size = nxt_sendbuf_file_coalesce(&sb); - nxt_log_debug(c->socket.log, "sendfile(%d, %FD, @%O, %uz)", - c->socket.fd, fb->file->fd, fb->file_pos, size); + nxt_debug(c->socket.task, "sendfile(%d, %FD, @%O, %uz)", + c->socket.fd, fb->file->fd, fb->file_pos, size); offset = fb->file_pos; @@ -104,7 +104,7 @@ nxt_linux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, err = (n == -1) ? nxt_errno : 0; - nxt_log_debug(c->socket.log, "sendfile(): %d", n); + nxt_debug(c->socket.task, "sendfile(): %d", n); if (n == -1) { switch (err) { @@ -118,16 +118,15 @@ nxt_linux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, - "sendfile(%d, %FD, %O, %uz) failed %E \"%FN\"", - c->socket.fd, fb->file->fd, fb->file_pos, size, - err, fb->file->name); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "sendfile(%d, %FD, %O, %uz) failed %E \"%FN\"", + c->socket.fd, fb->file->fd, fb->file_pos, size, + err, fb->file->name); return NXT_ERROR; } - nxt_log_debug(c->socket.log, "sendfile() %E", err); + nxt_debug(c->socket.task, "sendfile() %E", err); return 0; } @@ -150,8 +149,8 @@ nxt_linux_send(nxt_event_conn_t *c, void *buf, size_t size, nxt_uint_t flags) err = (n == -1) ? nxt_errno : 0; - nxt_log_debug(c->socket.log, "send(%d, %p, %uz, 0x%uXi): %z", - c->socket.fd, buf, size, flags, n); + nxt_debug(c->socket.task, "send(%d, %p, %uz, 0x%uXi): %z", + c->socket.fd, buf, size, flags, n); if (n == -1) { switch (err) { @@ -165,14 +164,14 @@ nxt_linux_send(nxt_event_conn_t *c, void *buf, size_t size, nxt_uint_t flags) default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "send(%d, %p, %uz, 0x%uXi) failed %E", - c->socket.fd, buf, size, flags, err); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "send(%d, %p, %uz, 0x%uXi) failed %E", + c->socket.fd, buf, size, flags, err); return NXT_ERROR; } - nxt_log_debug(c->socket.log, "send() %E", err); + nxt_debug(c->socket.task, "send() %E", err); return 0; } @@ -205,8 +204,8 @@ nxt_linux_sendmsg(nxt_event_conn_t *c, nxt_sendbuf_coalesce_t *sb, err = (n == -1) ? nxt_errno : 0; - nxt_log_debug(c->socket.log, "sendmsg(%d, %ui, 0x%uXi): %d", - c->socket.fd, niov, flags, n); + nxt_debug(c->socket.task, "sendmsg(%d, %ui, 0x%uXi): %d", + c->socket.fd, niov, flags, n); if (n == -1) { switch (err) { @@ -220,14 +219,14 @@ nxt_linux_sendmsg(nxt_event_conn_t *c, nxt_sendbuf_coalesce_t *sb, default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "sendmsg(%d, %ui, 0x%uXi) failed %E", - c->socket.fd, niov, flags, err); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "sendmsg(%d, %ui, 0x%uXi) failed %E", + c->socket.fd, niov, flags, err); return NXT_ERROR; } - nxt_log_debug(c->socket.log, "sendmsg() %E", err); + nxt_debug(c->socket.task, "sendmsg() %E", err); return 0; } diff --git a/src/nxt_listen_socket.c b/src/nxt_listen_socket.c index 22b5bf8d..08751dfc 100644 --- a/src/nxt_listen_socket.c +++ b/src/nxt_listen_socket.c @@ -12,7 +12,8 @@ static u_char *nxt_listen_socket_log_handler(void *ctx, u_char *pos, nxt_int_t -nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test) +nxt_listen_socket_create(nxt_task_t *task, nxt_listen_socket_t *ls, + nxt_bool_t bind_test) { nxt_log_t log, *old; nxt_uint_t family; @@ -31,12 +32,12 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test) family = sa->u.sockaddr.sa_family; - s = nxt_socket_create(family, sa->type, 0, ls->flags); + s = nxt_socket_create(task, family, sa->type, 0, ls->flags); if (s == -1) { goto socket_fail; } - if (nxt_socket_setsockopt(s, SOL_SOCKET, SO_REUSEADDR, 1) != NXT_OK) { + if (nxt_socket_setsockopt(task, s, SOL_SOCKET, SO_REUSEADDR, 1) != NXT_OK) { goto fail; } @@ -48,7 +49,8 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test) ipv6only = (ls->ipv6only == 1); /* Ignore possible error. TODO: why? */ - (void) nxt_socket_setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, ipv6only); + (void) nxt_socket_setsockopt(task, s, IPPROTO_IPV6, IPV6_V6ONLY, + ipv6only); } #endif @@ -56,7 +58,7 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test) #if 0 /* Ignore possible error. TODO: why? */ - (void) nxt_socket_setsockopt(s, SOL_SOCKET, SO_SNDBUF, 8192); + (void) nxt_socket_setsockopt(task, s, SOL_SOCKET, SO_SNDBUF, 8192); #endif @@ -65,12 +67,12 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test) if (ls->read_after_accept) { /* Defer accept() maximum at 1 second. */ /* Ignore possible error. TODO: why? */ - (void) nxt_socket_setsockopt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1); + (void) nxt_socket_setsockopt(task, s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1); } #endif - switch (nxt_socket_bind(s, sa, bind_test)) { + switch (nxt_socket_bind(task, s, sa, bind_test)) { case NXT_OK: break; @@ -103,11 +105,11 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test) #endif - nxt_log_debug(&log, "listen(%d, %d)", s, ls->backlog); + nxt_debug(task, "listen(%d, %d)", s, ls->backlog); if (listen(s, ls->backlog) != 0) { - nxt_log_alert(&log, "listen(%d, %d) failed %E", - s, ls->backlog, nxt_socket_errno); + nxt_log(task, NXT_LOG_CRIT, "listen(%d, %d) failed %E", + s, ls->backlog, nxt_socket_errno); goto fail; } @@ -118,7 +120,7 @@ nxt_listen_socket_create(nxt_listen_socket_t *ls, nxt_bool_t bind_test) fail: - nxt_socket_close(s); + nxt_socket_close(task, s); socket_fail: @@ -129,7 +131,8 @@ socket_fail: nxt_int_t -nxt_listen_socket_update(nxt_listen_socket_t *ls, nxt_listen_socket_t *prev) +nxt_listen_socket_update(nxt_task_t *task, nxt_listen_socket_t *ls, + nxt_listen_socket_t *prev) { nxt_log_t log, *old; nxt_thread_t *thr; @@ -143,11 +146,11 @@ nxt_listen_socket_update(nxt_listen_socket_t *ls, nxt_listen_socket_t *prev) log.ctx = ls->sockaddr; thr->log = &log; - nxt_log_debug(&log, "listen(%d, %d)", ls->socket, ls->backlog); + nxt_debug(task, "listen(%d, %d)", ls->socket, ls->backlog); if (listen(ls->socket, ls->backlog) != 0) { - nxt_log_alert(&log, "listen(%d, %d) failed %E", - ls->socket, ls->backlog, nxt_socket_errno); + nxt_log(task, NXT_LOG_CRIT, "listen(%d, %d) failed %E", + ls->socket, ls->backlog, nxt_socket_errno); goto fail; } @@ -183,6 +186,7 @@ nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls) case AF_INET6: ls->socklen = sizeof(struct sockaddr_in6); + ls->address_length = NXT_INET6_ADDR_STR_LEN; size = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_in6) + NXT_INET6_ADDR_STR_LEN + (sizeof(":65535") - 1); @@ -206,6 +210,7 @@ nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls) */ ls->socklen = 3; size = ls->socklen + sizeof("unix:") - 1; + ls->address_length = sizeof("unix:") - 1; break; @@ -213,6 +218,7 @@ nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls) default: ls->socklen = sizeof(struct sockaddr_in); + ls->address_length = NXT_INET_ADDR_STR_LEN; size = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_in) + NXT_INET_ADDR_STR_LEN + (sizeof(":65535") - 1); @@ -248,5 +254,5 @@ nxt_listen_socket_log_handler(void *ctx, u_char *pos, u_char *end) sa = ctx; return nxt_sprintf(pos, end, " while creating listening socket on %*s", - sa->text_len, sa->text); + sa->length, sa->start); } diff --git a/src/nxt_listen_socket.h b/src/nxt_listen_socket.h index 2100f17f..1e62ae80 100644 --- a/src/nxt_listen_socket.h +++ b/src/nxt_listen_socket.h @@ -28,10 +28,12 @@ typedef struct { uint8_t ipv6only; /* 2 bits */ #endif - void *servers; + uint8_t socklen; + uint8_t address_length; - socklen_t socklen; uint32_t mem_pool_size; + + void *servers; } nxt_listen_socket_t; @@ -51,10 +53,10 @@ typedef struct { #endif -NXT_EXPORT nxt_int_t nxt_listen_socket_create(nxt_listen_socket_t *ls, - nxt_bool_t bind_test); -NXT_EXPORT nxt_int_t nxt_listen_socket_update(nxt_listen_socket_t *ls, - nxt_listen_socket_t *prev); +NXT_EXPORT nxt_int_t nxt_listen_socket_create(nxt_task_t *task, + nxt_listen_socket_t *ls, nxt_bool_t bind_test); +NXT_EXPORT nxt_int_t nxt_listen_socket_update(nxt_task_t *task, + nxt_listen_socket_t *ls, nxt_listen_socket_t *prev); NXT_EXPORT size_t nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls); diff --git a/src/nxt_macosx_sendfile.c b/src/nxt_macosx_sendfile.c index 34a586c3..f636819c 100644 --- a/src/nxt_macosx_sendfile.c +++ b/src/nxt_macosx_sendfile.c @@ -112,7 +112,7 @@ nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, err = (n == -1) ? nxt_errno : 0; - nxt_log_debug(c->socket.log, "sendfile(): %d sent:%O", n, sent); + nxt_debug(c->socket.task, "sendfile(): %d sent:%O", n, sent); if (n == -1) { switch (err) { @@ -126,24 +126,22 @@ nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "sendfile(%FD, %d, %O, %O) failed " - "%E \"%FN\" hd:%ui tr:%ui", fb->file->fd, - c->socket.fd, fb->file_pos, sent, err, - fb->file->name, nhd, ntr); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "sendfile(%FD, %d, %O, %O) failed %E \"%FN\" hd:%ui tr:%ui", + fb->file->fd, c->socket.fd, fb->file_pos, sent, err, + fb->file->name, nhd, ntr); return NXT_ERROR; } - nxt_log_debug(c->socket.log, "sendfile() %E", err); + nxt_debug(c->socket.task, "sendfile() %E", err); return sent; } if (sent == 0) { - nxt_log_error(NXT_LOG_ERR, c->socket.log, - "file \"%FN\" was truncated while sendfile()", - fb->file->name); + nxt_log(c->socket.task, NXT_LOG_ERR, + "file \"%FN\" was truncated while sendfile()", fb->file->name); return NXT_ERROR; } @@ -154,240 +152,3 @@ nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, return sent; } - - -#if 0 - -typedef struct { - nxt_socket_t socket; - nxt_err_t error; - - uint8_t write_ready; /* 1 bit */ - uint8_t log_error; -} nxt_sendbuf_t; - - -ssize_t nxt_macosx_sendfile(nxt_thread_t *thr, nxt_sendbuf_t *sb, nxt_buf_t *b, - size_t limit); -ssize_t nxt_writev(nxt_thread_t *thr, nxt_sendbuf_t *sb, nxt_iobuf_t *iob, - nxt_uint_t niob); -ssize_t nxt_send(nxt_thread_t *thr, nxt_sendbuf_t *sb, void *buf, size_t size); - - -ssize_t -nxt_macosx_sendfile(nxt_thread_t *thr, nxt_sendbuf_t *sb, nxt_buf_t *b, - size_t limit) -{ - size_t hd_size, file_size; - ssize_t n; - nxt_buf_t *buf; - nxt_err_t err; - nxt_off_t sent; - nxt_uint_t nhd, ntr; - struct iovec hd[NXT_IOBUF_MAX], tr[NXT_IOBUF_MAX]; - struct sf_hdtr hdtr, *ht; - nxt_sendbuf_coalesce_t sbc; - - sbc.buf = b; - sbc.iobuf = hd; - sbc.nmax = NXT_IOBUF_MAX; - sbc.sync = 0; - sbc.size = 0; - sbc.limit = limit; - - nhd = nxt_sendbuf_mem_coalesce(&sbc); - - if (nhd == 0 && sbc.sync) { - return 0; - } - - if (sbc.buf == NULL || !nxt_buf_is_file(sbc.buf)) { - return nxt_writev(thr, sb, hd, nhd); - } - - hd_size = sbc.size; - buf = sbc.buf; - - file_size = nxt_sendbuf_file_coalesce(&sbc); - - if (file_size == 0) { - return nxt_writev(thr, sb, hd, nhd); - } - - sbc.iobuf = tr; - - ntr = nxt_sendbuf_mem_coalesce(&sbc); - - /* - * Disposal of surplus kernel operations if there are no headers - * and trailers. Besides sendfile() returns EINVAL if a sf_hdtr's - * count is 0, but corresponding pointer is not NULL. - */ - - nxt_memzero(&hdtr, sizeof(struct sf_hdtr)); - ht = NULL; - - if (nhd != 0) { - ht = &hdtr; - hdtr.headers = hd; - hdtr.hdr_cnt = nhd; - } - - if (ntr != 0) { - ht = &hdtr; - hdtr.trailers = tr; - hdtr.trl_cnt = ntr; - } - - /* - * MacOSX has the same bug as old FreeBSD (http://bugs.freebsd.org/33771). - * However this bug has never been fixed and instead of this it has been - * documented as a feature in MacOSX 10.7 (Lion) sendfile(2): - * - * When a header or trailer is specified, the value of len argument - * indicates the maximum number of bytes in the header and/or file - * to be sent. It does not control the trailer; if a trailer exists, - * all of it will be sent. - */ - sent = hd_size + file_size; - - nxt_log_debug(thr->log, "sendfile(%FD, %d, @%O, %O) hd:%ui tr:%ui hs:%uz", - buf->file->fd, sb->socket, buf->file_pos, sent, - nhd, ntr, hd_size); - - n = nxt_sys_sendfile(buf->file->fd, sb->socket, - buf->file_pos, &sent, ht, 0); - - err = (n == -1) ? nxt_errno : 0; - - nxt_log_debug(thr->log, "sendfile(): %d sent:%O", n, sent); - - if (n == -1) { - switch (err) { - - case NXT_EAGAIN: - sb->write_ready = 0; - break; - - case NXT_EINTR: - break; - - default: - sb->error = err; - nxt_log_error(nxt_socket_error_level(err, sb->log_error), thr->log, - "sendfile(%FD, %d, %O, %O) failed %E \"%FN\" " - "hd:%ui tr:%ui", buf->file->fd, sb->socket, - buf->file_pos, sent, err, buf->file->name, nhd, ntr); - - return NXT_ERROR; - } - - nxt_log_debug(thr->log, "sendfile() %E", err); - - return sent; - } - - if (sent == 0) { - nxt_log_error(NXT_LOG_ERR, thr->log, - "file \"%FN\" was truncated while sendfile()", - buf->file->name); - - return NXT_ERROR; - } - - if (sent < (nxt_off_t) sbc.size) { - sb->write_ready = 0; - } - - return sent; -} - - -ssize_t -nxt_writev(nxt_thread_t *thr, nxt_sendbuf_t *sb, nxt_iobuf_t *iob, - nxt_uint_t niob) -{ - ssize_t n; - nxt_err_t err; - - if (niob == 1) { - /* Disposal of surplus kernel iovec copy-in operation. */ - return nxt_send(thr, sb, iob->iov_base, iob->iov_len); - } - - for ( ;; ) { - n = writev(sb->socket, iob, niob); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_log_debug(thr->log, "writev(%d, %ui): %d", sb->socket, niob, n); - - if (n > 0) { - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - nxt_log_debug(thr->log, "writev() %E", err); - sb->write_ready = 0; - return NXT_AGAIN; - - case NXT_EINTR: - nxt_log_debug(thr->log, "writev() %E", err); - continue; - - default: - sb->error = err; - nxt_log_error(nxt_socket_error_level(err, sb->log_error), thr->log, - "writev(%d, %ui) failed %E", sb->socket, niob, err); - return NXT_ERROR; - } - } -} - - -ssize_t -nxt_send(nxt_thread_t *thr, nxt_sendbuf_t *sb, void *buf, size_t size) -{ - ssize_t n; - nxt_err_t err; - - for ( ;; ) { - n = send(sb->socket, buf, size, 0); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_log_debug(thr->log, "send(%d, %p, %uz): %z", - sb->socket, buf, size, n); - - if (n > 0) { - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - nxt_log_debug(thr->log, "send() %E", err); - sb->write_ready = 0; - return NXT_AGAIN; - - case NXT_EINTR: - nxt_log_debug(thr->log, "send() %E", err); - continue; - - default: - sb->error = err; - nxt_log_error(nxt_socket_error_level(err, sb->log_error), thr->log, - "send(%d, %p, %uz) failed %E", - sb->socket, buf, size, err); - return NXT_ERROR; - } - } -} - -#endif diff --git a/src/nxt_main.h b/src/nxt_main.h index 22f39a59..b692dc92 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -16,6 +16,7 @@ #include <nxt_time.h> #include <nxt_process.h> +typedef struct nxt_task_s nxt_task_t; typedef struct nxt_thread_s nxt_thread_t; #include <nxt_thread_id.h> diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 4fd7e164..ceb44e1f 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -81,7 +81,7 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle) proc->pid = nxt_pid; proc->engine = 0; - proc->port = nxt_port_create(0); + proc->port = nxt_port_create(task, 0); if (nxt_slow_path(proc->port == NULL)) { return NXT_ERROR; } @@ -156,7 +156,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle) proc->engine = 0; proc->generation = cycle->process_generation; - proc->port = nxt_port_create(0); + proc->port = nxt_port_create(task, 0); if (nxt_slow_path(proc->port == NULL)) { return NXT_ERROR; } diff --git a/src/nxt_openssl.c b/src/nxt_openssl.c index fbf08b24..17e65a56 100644 --- a/src/nxt_openssl.c +++ b/src/nxt_openssl.c @@ -59,7 +59,7 @@ static nxt_event_conn_io_t nxt_openssl_event_conn_io = { NULL, NULL, - nxt_event_conn_io_write, + nxt_conn_io_write, nxt_openssl_conn_io_write_chunk, NULL, NULL, @@ -309,9 +309,8 @@ nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, fail: - nxt_event_conn_io_handle(task->thread, c->read_work_queue, - c->read_state->error_handler, - task, c, c->socket.data); + nxt_work_queue_add(c->read_work_queue, c->read_state->error_handler, + task, c, c->socket.data); } @@ -367,9 +366,8 @@ nxt_openssl_conn_handshake(nxt_task_t *task, void *obj, void *data) nxt_openssl_conn_error(c, err, "SSL_do_handshake(%d) failed", c->socket.fd); - nxt_event_conn_io_handle(task->thread, c->read_work_queue, - c->read_state->error_handler, - task, c, data); + nxt_work_queue_add(c->read_work_queue, c->read_state->error_handler, + task, c, data); } else if (ssltls->ssl_error == SSL_ERROR_WANT_READ && ssltls->times < 2) { ssltls->times++; @@ -429,8 +427,7 @@ nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data) } } - nxt_event_conn_io_handle(task->thread, c->read_work_queue, handler, - task, c, data); + nxt_work_queue_add(c->read_work_queue, handler, task, c, data); } @@ -475,7 +472,7 @@ nxt_openssl_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) } n = nxt_openssl_conn_test_error(c->socket.task, c, ret, err, - nxt_event_conn_io_write); + nxt_conn_io_write); if (n == NXT_ERROR) { nxt_openssl_conn_error(c, err, "SSL_write(%d, %p, %uz) failed", @@ -584,8 +581,7 @@ nxt_openssl_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) done: - nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, - task, c, data); + nxt_work_queue_add(c->write_work_queue, handler, task, c, data); } @@ -656,8 +652,7 @@ nxt_openssl_conn_test_error(nxt_task_t *task, nxt_event_conn_t *c, int ret, handler = c->write_state->close_handler; } - nxt_event_conn_io_handle(task->thread, wq, handler, - task, c, c->socket.data); + nxt_work_queue_add(wq, handler, task, c, c->socket.data); return 0; @@ -707,7 +702,7 @@ nxt_openssl_log_error_level(nxt_event_conn_t *c, nxt_err_t err) switch (ERR_GET_REASON(ERR_peek_error())) { case 0: - return nxt_socket_error_level(err, c->socket.log_error); + return nxt_socket_error_level(err); case SSL_R_BAD_CHANGE_CIPHER_SPEC: /* 103 */ case SSL_R_BLOCK_CIPHER_PAD_IS_WRONG: /* 129 */ diff --git a/src/nxt_port.c b/src/nxt_port.c index c00fcb61..7da2bcc4 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -153,7 +153,7 @@ nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - port = nxt_port_alloc(); + port = nxt_port_alloc(task); if (nxt_slow_path(port == NULL)) { return; } diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 5b0c2d43..a3bc7f26 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -17,7 +17,7 @@ static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); nxt_port_t * -nxt_port_alloc(void) +nxt_port_alloc(nxt_task_t *task) { nxt_port_t *port; nxt_mem_pool_t *mp; @@ -29,6 +29,8 @@ nxt_port_alloc(void) port = nxt_mem_zalloc(mp, sizeof(nxt_port_t)); port->mem_pool = mp; + port->socket.task = task; + port->pair[0] = -1; port->pair[1] = -1; @@ -42,31 +44,31 @@ nxt_port_alloc(void) nxt_port_t * -nxt_port_create(size_t max_size) +nxt_port_create(nxt_task_t *task, size_t max_size) { nxt_int_t sndbuf, rcvbuf, size; nxt_port_t *port; nxt_socket_t snd, rcv; - port = nxt_port_alloc(); + port = nxt_port_alloc(task); if (nxt_slow_path(port == NULL)) { return NULL; } - if (nxt_slow_path(nxt_socketpair_create(port->pair) != NXT_OK)) { + if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { goto socketpair_fail; } snd = port->pair[1]; - sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF); + sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); if (nxt_slow_path(sndbuf < 0)) { goto getsockopt_fail; } rcv = port->pair[0]; - rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF); + rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); if (nxt_slow_path(rcvbuf < 0)) { goto getsockopt_fail; } @@ -83,9 +85,10 @@ nxt_port_create(size_t max_size) * on send direction and 4K buffer size on receive direction; * Solaris uses 16K on send direction and 5K on receive direction. */ - (void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size); + (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, + max_size); - sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF); + sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); if (nxt_slow_path(sndbuf < 0)) { goto getsockopt_fail; } @@ -93,9 +96,10 @@ nxt_port_create(size_t max_size) size = sndbuf * 4; if (rcvbuf < size) { - (void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size); + (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, + size); - rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF); + rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); if (nxt_slow_path(rcvbuf < 0)) { goto getsockopt_fail; } @@ -109,8 +113,8 @@ nxt_port_create(size_t max_size) getsockopt_fail: - nxt_socket_close(port->pair[0]); - nxt_socket_close(port->pair[1]); + nxt_socket_close(task, port->pair[0]); + nxt_socket_close(task, port->pair[1]); socketpair_fail: @@ -123,7 +127,7 @@ socketpair_fail: void nxt_port_destroy(nxt_port_t *port) { - nxt_socket_close(port->socket.fd); + nxt_socket_close(port->socket.task, port->socket.fd); nxt_mem_pool_destroy(port->mem_pool); } @@ -135,12 +139,6 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) port->socket.log = &nxt_main_log; port->socket.write_ready = 1; - port->task.thread = task->thread; - port->task.log = port->socket.log; - port->task.ident = nxt_task_next_ident(); - - port->socket.task = &port->task; - port->socket.write_work_queue = &task->thread->engine->fast_work_queue; port->socket.write_handler = nxt_port_write_handler; port->socket.error_handler = nxt_port_error_handler; @@ -150,7 +148,7 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) void nxt_port_write_close(nxt_port_t *port) { - nxt_socket_close(port->pair[1]); + nxt_socket_close(port->socket.task, port->pair[1]); port->pair[1] = -1; } @@ -301,12 +299,6 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) port->socket.fd = port->pair[0]; port->socket.log = &nxt_main_log; - port->task.thread = task->thread; - port->task.log = port->socket.log; - port->task.ident = nxt_task_next_ident(); - - port->socket.task = &port->task; - port->socket.read_work_queue = &task->thread->engine->fast_work_queue; port->socket.read_handler = nxt_port_read_handler; port->socket.error_handler = nxt_port_error_handler; @@ -318,7 +310,7 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) void nxt_port_read_close(nxt_port_t *port) { - nxt_socket_close(port->pair[0]); + nxt_socket_close(port->socket.task, port->pair[0]); port->pair[0] = -1; } diff --git a/src/nxt_port_socket.h b/src/nxt_port_socket.h index e3202a28..4740368d 100644 --- a/src/nxt_port_socket.h +++ b/src/nxt_port_socket.h @@ -33,8 +33,6 @@ typedef struct { /* Must be the first field. */ nxt_fd_event_t socket; - nxt_task_t task; - nxt_queue_t messages; /* of nxt_port_send_msg_t */ /* Maximum size of message part. */ @@ -61,8 +59,8 @@ struct nxt_port_recv_msg_s { }; -NXT_EXPORT nxt_port_t *nxt_port_alloc(void); -NXT_EXPORT nxt_port_t *nxt_port_create(size_t bufsize); +NXT_EXPORT nxt_port_t *nxt_port_alloc(nxt_task_t *task); +NXT_EXPORT nxt_port_t *nxt_port_create(nxt_task_t *task, size_t bufsize); NXT_EXPORT void nxt_port_destroy(nxt_port_t *port); NXT_EXPORT void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port); NXT_EXPORT void nxt_port_write_close(nxt_port_t *port); diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index 309d3147..21a25e68 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -12,6 +12,77 @@ static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, nxt_uint_t +nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb, + struct iovec *iov, nxt_uint_t niov_max) +{ + u_char *last; + size_t size, total; + nxt_buf_t *b; + nxt_uint_t n; + + total = sb->size; + last = NULL; + n = (nxt_uint_t) -1; + + for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { + + nxt_prefetch(b->next); + + if (nxt_buf_is_file(b)) { + break; + } + + if (nxt_buf_is_mem(b)) { + + size = b->mem.free - b->mem.pos; + + if (size != 0) { + + if (total + size > sb->limit) { + size = sb->limit - total; + + if (size == 0) { + break; + } + } + + if (b->mem.pos != last) { + + if (++n >= niov_max) { + goto done; + } + + iov[n].iov_base = b->mem.pos; + iov[n].iov_len = size; + + } else { + iov[n].iov_len += size; + } + + nxt_debug(task, "sendbuf: %ui, %p, %uz", + n, iov[n].iov_base, iov[n].iov_len); + + total += size; + last = b->mem.pos + size; + } + + } else { + sb->sync = 1; + sb->last |= nxt_buf_is_last(b); + } + } + + n++; + +done: + + sb->buf = b; + + return n; +} + + +nxt_uint_t nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb) { u_char *last; diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h index ca314e7b..71b18f91 100644 --- a/src/nxt_sendbuf.h +++ b/src/nxt_sendbuf.h @@ -36,6 +36,21 @@ typedef struct { + nxt_buf_t *buf; + nxt_socket_t socket; + nxt_err_t error; + nxt_off_t sent; + size_t size; + size_t limit; + + uint8_t ready; /* 1 bit */ + uint8_t once; /* 1 bit */ + uint8_t sync; /* 1 bit */ + uint8_t last; /* 1 bit */ +} nxt_sendbuf_t; + + +typedef struct { nxt_buf_t *buf; nxt_iobuf_t *iobuf; @@ -88,6 +103,8 @@ ssize_t nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit); +nxt_uint_t nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb, + struct iovec *iov, nxt_uint_t niov_max); nxt_uint_t nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb); size_t nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb); diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c index 2bb85730..742cb0ba 100644 --- a/src/nxt_sockaddr.c +++ b/src/nxt_sockaddr.c @@ -17,10 +17,13 @@ static nxt_int_t nxt_job_sockaddr_inet_parse(nxt_job_sockaddr_parse_t *jbs); nxt_sockaddr_t * -nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t length) +nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t socklen, size_t address_length) { + size_t size; nxt_sockaddr_t *sa; + size = offsetof(nxt_sockaddr_t, u) + socklen + address_length; + /* * The current struct sockaddr's define 32-bit fields at maximum * and may define 64-bit AF_INET6 fields in the future. Alignment @@ -28,10 +31,13 @@ nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t length) * If 128-bit alignment will be required then nxt_mem_malloc() and * nxt_memzero() should be used instead. */ - sa = nxt_mem_zalloc(mp, offsetof(nxt_sockaddr_t, u) + length); + + sa = nxt_mem_zalloc(mp, size); if (nxt_fast_path(sa != NULL)) { - nxt_socklen_set(sa, length); + sa->socklen = socklen; + sa->length = address_length; + sa->sockaddr_size = size; } return sa; @@ -40,7 +46,7 @@ nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t length) nxt_sockaddr_t * nxt_sockaddr_create(nxt_mem_pool_t *mp, struct sockaddr *sockaddr, - socklen_t length) + socklen_t length, size_t address_length) { size_t size, copy; nxt_sockaddr_t *sa; @@ -95,19 +101,11 @@ nxt_sockaddr_create(nxt_mem_pool_t *mp, struct sockaddr *sockaddr, #endif /* NXT_HAVE_UNIX_DOMAIN */ - sa = nxt_sockaddr_alloc(mp, size); + sa = nxt_sockaddr_alloc(mp, size, address_length); if (nxt_fast_path(sa != NULL)) { - nxt_memcpy(&sa->u.sockaddr, sockaddr, copy); -#if (NXT_SOCKADDR_SA_LEN) - - /* Update shortcut sockaddr length overwritten by nxt_memcpy(). */ - nxt_socklen_set(sa, size); - -#endif - #if (NXT_HAVE_UNIX_DOMAIN && NXT_OPENBSD) if (length == 0) { @@ -127,7 +125,7 @@ nxt_sockaddr_copy(nxt_mem_pool_t *mp, nxt_sockaddr_t *src) size_t length; nxt_sockaddr_t *dst; - length = offsetof(nxt_sockaddr_t, u) + nxt_socklen(src); + length = offsetof(nxt_sockaddr_t, u) + src->socklen; dst = nxt_mem_alloc(mp, length); @@ -140,9 +138,10 @@ nxt_sockaddr_copy(nxt_mem_pool_t *mp, nxt_sockaddr_t *src) nxt_sockaddr_t * -nxt_getsockname(nxt_mem_pool_t *mp, nxt_socket_t s) +nxt_getsockname(nxt_task_t *task, nxt_mem_pool_t *mp, nxt_socket_t s) { int ret; + size_t length; socklen_t socklen; nxt_sockaddr_buf_t sockaddr; @@ -151,44 +150,148 @@ nxt_getsockname(nxt_mem_pool_t *mp, nxt_socket_t s) ret = getsockname(s, &sockaddr.buf, &socklen); if (nxt_fast_path(ret == 0)) { - return nxt_sockaddr_create(mp, &sockaddr.buf, socklen); + + switch (sockaddr.buf.sa_family) { +#if (NXT_INET6) + case AF_INET6: + length = NXT_INET6_ADDR_STR_LEN; + break; +#endif + +#if (NXT_HAVE_UNIX_DOMAIN) + case AF_UNIX: + length = sizeof("unix:") - 1 + socklen; +#endif + break; + + case AF_INET: + length = NXT_INET_ADDR_STR_LEN; + break; + + default: + length = 0; + break; + } + + return nxt_sockaddr_create(mp, &sockaddr.buf, socklen, length); } - nxt_thread_log_error(NXT_LOG_ERR, "getsockname(%d) failed %E", - s, nxt_errno); + nxt_log(task, NXT_LOG_ERR, "getsockname(%d) failed %E", s, nxt_errno); return NULL; } -nxt_int_t -nxt_sockaddr_text(nxt_mem_pool_t *mp, nxt_sockaddr_t *sa, nxt_bool_t port) +void +nxt_sockaddr_text(nxt_sockaddr_t *sa) { - size_t length; - u_char *p; - u_char buf[NXT_SOCKADDR_STR_LEN + NXT_SOCKPORT_STR_LEN]; + size_t offset; + u_char *p, *start, *end, *octet; + uint32_t port; - length = NXT_SOCKADDR_STR_LEN + NXT_SOCKPORT_STR_LEN; + end = (u_char *) sa + sa->sockaddr_size; - length = nxt_sockaddr_ntop(sa, buf, buf + length, port); + switch (sa->u.sockaddr.sa_family) { - p = nxt_mem_alloc(mp, length); + case AF_INET: + offset = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_in); - if (nxt_fast_path(p != NULL)) { + sa->start = offset; + sa->address_start = offset; - sa->text = p; - sa->text_len = length; - nxt_memcpy(p, buf, length); + start = (u_char *) sa + offset; + octet = (u_char *) &sa->u.sockaddr_in.sin_addr; - return NXT_OK; + p = nxt_sprintf(start, end, "%ud.%ud.%ud.%ud", + octet[0], octet[1], octet[2], octet[3]); + + sa->address_length = p - start; + sa->port_start = sa->address_length + 1; + + port = sa->u.sockaddr_in.sin_port; + + break; + +#if (NXT_INET6) + + case AF_INET6: + offset = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_in6); + + sa->start = offset; + sa->address_start = offset; + + start = (u_char *) sa + offset; + p = start; + + *p++ = '['; + + p = nxt_inet6_ntop(sa->u.sockaddr_in6.sin6_addr.s6_addr, p, end); + + sa->address_length = p - (start + 1); + sa->port_start = sa->address_length + 2; + + *p++ = ']'; + + port = sa->u.sockaddr_in6.sin6_port; + + break; + +#endif + +#if (NXT_HAVE_UNIX_DOMAIN) + + case AF_UNIX: + + offset = offsetof(nxt_sockaddr_t, u) + sizeof(struct sockaddr_un); + + sa->start = offset; + sa->address_start = offset; + + start = (u_char *) sa + offset; + p = start; + +#if (NXT_LINUX) + + p = (u_char *) sa->u.sockaddr_un.sun_path; + + if (p[0] == '\0') { + int length; + + /* Linux abstract socket address has no trailing zero. */ + length = sa->socklen - offsetof(struct sockaddr_un, sun_path); + + p = nxt_sprintf(start, end, "unix:@%*s", length - 1, p + 1); + + } else { + p = nxt_sprintf(start, end, "unix:%s", p); + } + +#else /* !(NXT_LINUX) */ + + p = nxt_sprintf(start, end, "unix:%s", p); + +#endif + + sa->address_length = p - start; + sa->port_start = sa->address_length; + sa->length = p - start; + + return; + +#endif /* NXT_HAVE_UNIX_DOMAIN */ + + default: + return; } - return NXT_ERROR; + p = nxt_sprintf(p, end, ":%d", ntohs(port)); + + sa->length = p - start; } uint32_t -nxt_sockaddr_port(nxt_sockaddr_t *sa) +nxt_sockaddr_port_number(nxt_sockaddr_t *sa) { uint32_t port; @@ -221,7 +324,7 @@ nxt_sockaddr_port(nxt_sockaddr_t *sa) nxt_bool_t nxt_sockaddr_cmp(nxt_sockaddr_t *sa1, nxt_sockaddr_t *sa2) { - if (nxt_socklen(sa1) != nxt_socklen(sa2)) { + if (sa1->socklen != sa2->socklen) { return 0; } @@ -264,7 +367,7 @@ nxt_sockaddr_cmp(nxt_sockaddr_t *sa1, nxt_sockaddr_t *sa2) { size_t length; - length = nxt_socklen(sa1) - offsetof(struct sockaddr_un, sun_path); + length = sa1->socklen - offsetof(struct sockaddr_un, sun_path); if (nxt_memcmp(&sa1->u.sockaddr_un.sun_path, &sa2->u.sockaddr_un.sun_path, length) @@ -347,8 +450,7 @@ nxt_sockaddr_ntop(nxt_sockaddr_t *sa, u_char *buf, u_char *end, nxt_bool_t port) /* Linux abstract socket address has no trailing zero. */ - length = nxt_socklen(sa) - - offsetof(struct sockaddr_un, sun_path) - 1; + length = sa->socklen - offsetof(struct sockaddr_un, sun_path) - 1; p = nxt_sprintf(buf, end, "unix:\\0%*s", length, p + 1); } else { @@ -556,7 +658,7 @@ nxt_job_sockaddr_unix_parse(nxt_job_sockaddr_parse_t *jbs) jbs->resolve.sockaddrs = nxt_mem_alloc(mp, sizeof(void *)); if (nxt_fast_path(jbs->resolve.sockaddrs != NULL)) { - sa = nxt_sockaddr_alloc(mp, socklen); + sa = nxt_sockaddr_alloc(mp, socklen, jbs->addr.length); if (nxt_fast_path(sa != NULL)) { jbs->resolve.count = 1; @@ -763,7 +865,8 @@ nxt_job_sockaddr_inet_parse(nxt_job_sockaddr_parse_t *jbs) return NXT_ERROR; } - sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in)); + sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in), + NXT_INET_ADDR_STR_LEN); if (nxt_fast_path(sa != NULL)) { jbs->resolve.count = 1; diff --git a/src/nxt_sockaddr.h b/src/nxt_sockaddr.h index 382e8fcb..c5f8ec71 100644 --- a/src/nxt_sockaddr.h +++ b/src/nxt_sockaddr.h @@ -14,25 +14,38 @@ * nxt_sockaddr_alloc(pool, sizeof(struct sockaddr_in)) */ +/* + * A textual sockaddr representation is stored after struct sockaddr union + * and allocated as a whole. + */ + struct nxt_sockaddr_s { + /* Socket type: SOCKS_STREAM, SOCK_DGRAM, etc. */ + uint8_t type; + /* Size of struct sockaddr. */ + uint8_t socklen; /* - * A sockaddr textual representation is optional and may be in two forms: - * with port or without port. If a nxt_sockaddr_t is intended to listen(), - * bind() or connect() then the textual representation must be present and - * must include the port. nxt_event_conn_accept() creates a textual - * representation without the port. + * Textual sockaddr representation, e.g.: "127.0.0.1:8000", + * "[::1]:8000", and "unix:/path/to/socket". */ - u_char *text; - + uint8_t start; + uint8_t length; /* - * text_len, socket type and socklen are stored - * together on 64-bit platforms without sockaddr.sa_len. + * Textual address representation, e.g: "127.0.0.1", "::1", + * and "unix:/path/to/socket". */ - uint16_t text_len; - uint16_t type; -#if !(NXT_SOCKADDR_SA_LEN) - socklen_t _socklen; -#endif + uint8_t address_start; + uint8_t address_length; + /* + * Textual port representation, e.g. "8000". + * Port length is length - port_start. + */ + uint8_t port_start; + /* + * Size of the whole structure: struct sockaddr union and maximal textual + * representation, used to place sockaddr into appropriate free list. + */ + uint8_t sockaddr_size; union { struct sockaddr sockaddr; @@ -56,51 +69,26 @@ typedef struct { } nxt_job_sockaddr_parse_t; -NXT_EXPORT nxt_sockaddr_t *nxt_sockaddr_alloc(nxt_mem_pool_t *mp, socklen_t len) +NXT_EXPORT nxt_sockaddr_t *nxt_sockaddr_alloc(nxt_mem_pool_t *mp, + socklen_t socklen, size_t address_length) NXT_MALLOC_LIKE; NXT_EXPORT nxt_sockaddr_t *nxt_sockaddr_create(nxt_mem_pool_t *mp, - struct sockaddr *sockaddr, socklen_t len) + struct sockaddr *sockaddr, socklen_t socklen, size_t address_length) NXT_MALLOC_LIKE; NXT_EXPORT nxt_sockaddr_t *nxt_sockaddr_copy(nxt_mem_pool_t *mp, nxt_sockaddr_t *src) NXT_MALLOC_LIKE; -NXT_EXPORT nxt_sockaddr_t *nxt_getsockname(nxt_mem_pool_t *mp, nxt_socket_t s) +NXT_EXPORT nxt_sockaddr_t *nxt_getsockname(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_socket_t s) NXT_MALLOC_LIKE; -NXT_EXPORT nxt_int_t nxt_sockaddr_text(nxt_mem_pool_t *mp, nxt_sockaddr_t *sa, - nxt_bool_t port); - - -#if (NXT_SOCKADDR_SA_LEN) +NXT_EXPORT void nxt_sockaddr_text(nxt_sockaddr_t *sa); -#define \ -nxt_socklen_set(sa, len) \ - (sa)->u.sockaddr.sa_len = (socklen_t) (len) - -#define \ -nxt_socklen(sa) \ - ((sa)->u.sockaddr.sa_len) - -#else - -#define \ -nxt_socklen_set(sa, len) \ - (sa)->_socklen = (socklen_t) (len) - - -#define \ -nxt_socklen(sa) \ - ((sa)->_socklen) - -#endif - - -NXT_EXPORT uint32_t nxt_sockaddr_port(nxt_sockaddr_t *sa); +NXT_EXPORT uint32_t nxt_sockaddr_port_number(nxt_sockaddr_t *sa); NXT_EXPORT nxt_bool_t nxt_sockaddr_cmp(nxt_sockaddr_t *sa1, nxt_sockaddr_t *sa2); NXT_EXPORT size_t nxt_sockaddr_ntop(nxt_sockaddr_t *sa, u_char *buf, - u_char *end, - nxt_bool_t port); + u_char *end, nxt_bool_t port); NXT_EXPORT void nxt_job_sockaddr_parse(nxt_job_sockaddr_parse_t *jbs); NXT_EXPORT in_addr_t nxt_inet_addr(u_char *buf, size_t len); #if (NXT_INET6) @@ -108,60 +96,17 @@ NXT_EXPORT nxt_int_t nxt_inet6_addr(struct in6_addr *in6_addr, u_char *buf, size_t len); #endif -#if (NXT_HAVE_UNIX_DOMAIN) -#define nxt_unix_addr_path_len(sa) \ - (nxt_socklen(sa) - offsetof(struct sockaddr_un, sun_path)) -#endif - -#define NXT_INET_ADDR_STR_LEN (sizeof("255.255.255.255") - 1) +#define NXT_INET_ADDR_STR_LEN (sizeof("255.255.255.255:65535") - 1) #define NXT_INET6_ADDR_STR_LEN \ - (sizeof("ffff:ffff:ffff:ffff:ffff:ffff:255.255.255.255") - 1) - -#define NXT_UNIX_ADDR_STR_LEN \ - ((sizeof("unix:") - 1) \ - + (sizeof(struct sockaddr_un) - offsetof(struct sockaddr_un, sun_path))) - - -#if (NXT_HAVE_UNIX_DOMAIN) -#define NXT_SOCKADDR_STR_LEN NXT_UNIX_ADDR_STR_LEN + (sizeof("[ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff]:65535") - 1) -#elif (NXT_INET6) -#define NXT_SOCKADDR_STR_LEN NXT_INET6_ADDR_STR_LEN - -#else -#define NXT_SOCKADDR_STR_LEN NXT_INET_ADDR_STR_LEN -#endif - - -#if (NXT_INET6) -#define NXT_SOCKPORT_STR_LEN (sizeof("[]:65535") - 1) - -#else -#define NXT_SOCKPORT_STR_LEN (sizeof(":65535") - 1) -#endif - - -nxt_inline size_t -nxt_sockaddr_text_len(nxt_sockaddr_t *sa) -{ - switch (sa->u.sockaddr.sa_family) { - -#if (NXT_INET6) - case AF_INET6: - return NXT_INET6_ADDR_STR_LEN; -#endif - -#if (NXT_HAVE_UNIX_DOMAIN) - case AF_UNIX: - return NXT_UNIX_ADDR_STR_LEN; -#endif - default: - return NXT_INET_ADDR_STR_LEN; - } -} +#define nxt_sockaddr_start(sa) ((u_char *) (sa) + (sa)->start) +#define nxt_sockaddr_address(sa) ((u_char *) (sa) + (sa)->address_start) +#define nxt_sockaddr_port(sa) ((u_char *) (sa) + (sa)->port_start) +#define nxt_sockaddr_length(sa) ((sa)->length - (sa)->port_start) #endif /* _NXT_SOCKADDR_H_INCLUDED_ */ diff --git a/src/nxt_socket.c b/src/nxt_socket.c index 19ba21c0..2663b855 100644 --- a/src/nxt_socket.c +++ b/src/nxt_socket.c @@ -12,8 +12,8 @@ static const char *nxt_socket_sockopt_name(nxt_uint_t level, nxt_socket_t -nxt_socket_create(nxt_uint_t domain, nxt_uint_t type, nxt_uint_t protocol, - nxt_uint_t flags) +nxt_socket_create(nxt_task_t *task, nxt_uint_t domain, nxt_uint_t type, + nxt_uint_t protocol, nxt_uint_t flags) { nxt_socket_t s; @@ -28,18 +28,18 @@ nxt_socket_create(nxt_uint_t domain, nxt_uint_t type, nxt_uint_t protocol, s = socket(domain, type, protocol); if (nxt_slow_path(s == -1)) { - nxt_thread_log_alert("socket(%ui, 0x%uXi, %ui) failed %E", - domain, type, protocol, nxt_socket_errno); + nxt_log(task, NXT_LOG_CRIT, "socket(%ui, 0x%uXi, %ui) failed %E", + domain, type, protocol, nxt_socket_errno); return s; } - nxt_thread_log_debug("socket(): %d", s); + nxt_debug(task, "socket(): %d", s); #if !(NXT_HAVE_SOCK_NONBLOCK) if (flags & NXT_NONBLOCK) { - if (nxt_slow_path(nxt_socket_nonblocking(s) != NXT_OK)) { - nxt_socket_close(s); + if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { + nxt_socket_close(task, s); return -1; } } @@ -51,19 +51,21 @@ nxt_socket_create(nxt_uint_t domain, nxt_uint_t type, nxt_uint_t protocol, void -nxt_socket_close(nxt_socket_t s) +nxt_socket_close(nxt_task_t *task, nxt_socket_t s) { if (nxt_fast_path(close(s) == 0)) { - nxt_thread_log_debug("socket close(%d)", s); + nxt_debug(task, "socket close(%d)", s); } else { - nxt_thread_log_alert("socket close(%d) failed %E", s, nxt_socket_errno); + nxt_log(task, NXT_LOG_CRIT, "socket close(%d) failed %E", + s, nxt_socket_errno); } } nxt_int_t -nxt_socket_getsockopt(nxt_socket_t s, nxt_uint_t level, nxt_uint_t sockopt) +nxt_socket_getsockopt(nxt_task_t *task, nxt_socket_t s, nxt_uint_t level, + nxt_uint_t sockopt) { int val; socklen_t len; @@ -71,38 +73,36 @@ nxt_socket_getsockopt(nxt_socket_t s, nxt_uint_t level, nxt_uint_t sockopt) len = sizeof(val); if (nxt_fast_path(getsockopt(s, level, sockopt, &val, &len) == 0)) { - nxt_thread_log_debug("getsockopt(%d, %ui, %s): %d", - s, level, - nxt_socket_sockopt_name(level, sockopt), val); + nxt_debug(task, "getsockopt(%d, %ui, %s): %d", + s, level, nxt_socket_sockopt_name(level, sockopt), val); return val; } - nxt_thread_log_error(NXT_LOG_CRIT, "getsockopt(%d, %ui, %s) failed %E", - s, level, nxt_socket_sockopt_name(level, sockopt), - val, nxt_socket_errno); + nxt_log(task, NXT_LOG_CRIT, "getsockopt(%d, %ui, %s) failed %E", + s, level, nxt_socket_sockopt_name(level, sockopt), + val, nxt_socket_errno); return -1; } nxt_int_t -nxt_socket_setsockopt(nxt_socket_t s, nxt_uint_t level, nxt_uint_t sockopt, - int val) +nxt_socket_setsockopt(nxt_task_t *task, nxt_socket_t s, nxt_uint_t level, + nxt_uint_t sockopt, int val) { socklen_t len; len = sizeof(val); if (nxt_fast_path(setsockopt(s, level, sockopt, &val, len) == 0)) { - nxt_thread_log_debug("setsockopt(%d, %ui, %s): %d", - s, level, - nxt_socket_sockopt_name(level, sockopt), val); + nxt_debug(task, "setsockopt(%d, %ui, %s): %d", + s, level, nxt_socket_sockopt_name(level, sockopt), val); return NXT_OK; } - nxt_thread_log_error(NXT_LOG_CRIT, "setsockopt(%d, %ui, %s, %d) failed %E", - s, level, nxt_socket_sockopt_name(level, sockopt), - val, nxt_socket_errno); + nxt_log(task, NXT_LOG_CRIT, "setsockopt(%d, %ui, %s, %d) failed %E", + s, level, nxt_socket_sockopt_name(level, sockopt), + val, nxt_socket_errno); return NXT_ERROR; } @@ -123,7 +123,7 @@ nxt_socket_sockopt_name(nxt_uint_t level, nxt_uint_t sockopt) return "SO_RCVBUF"; case SO_REUSEADDR: - return "SO_TYPE"; + return "SO_REUSEADDR"; case SO_TYPE: return "SO_TYPE"; @@ -164,13 +164,14 @@ nxt_socket_sockopt_name(nxt_uint_t level, nxt_uint_t sockopt) nxt_int_t -nxt_socket_bind(nxt_socket_t s, nxt_sockaddr_t *sa, nxt_bool_t test) +nxt_socket_bind(nxt_task_t *task, nxt_socket_t s, nxt_sockaddr_t *sa, + nxt_bool_t test) { nxt_err_t err; - nxt_thread_log_debug("bind(%d, %*s)", s, sa->text_len, sa->text); + nxt_debug(task, "bind(%d, %*s)", s, sa->length, nxt_sockaddr_start(sa)); - if (nxt_fast_path(bind(s, &sa->u.sockaddr, nxt_socklen(sa)) == 0)) { + if (nxt_fast_path(bind(s, &sa->u.sockaddr, sa->socklen) == 0)) { return NXT_OK; } @@ -180,23 +181,23 @@ nxt_socket_bind(nxt_socket_t s, nxt_sockaddr_t *sa, nxt_bool_t test) return NXT_DECLINED; } - nxt_thread_log_error(NXT_LOG_CRIT, "bind(%d, %*s) failed %E", - s, sa->text_len, sa->text, err); + nxt_log(task, NXT_LOG_CRIT, "bind(%d, %*s) failed %E", + s, sa->length, nxt_sockaddr_start(sa), err); return NXT_ERROR; } nxt_int_t -nxt_socket_connect(nxt_socket_t s, nxt_sockaddr_t *sa) +nxt_socket_connect(nxt_task_t *task, nxt_socket_t s, nxt_sockaddr_t *sa) { nxt_err_t err; nxt_int_t ret; nxt_uint_t level; - nxt_thread_log_debug("connect(%d, %*s)", s, sa->text_len, sa->text); + nxt_debug(task, "connect(%d, %*s)", s, sa->length, nxt_sockaddr_start(sa)); - if (connect(s, &sa->u.sockaddr, nxt_socklen(sa)) == 0) { + if (connect(s, &sa->u.sockaddr, sa->socklen) == 0) { return NXT_OK; } @@ -205,7 +206,8 @@ nxt_socket_connect(nxt_socket_t s, nxt_sockaddr_t *sa) switch (err) { case NXT_EINPROGRESS: - nxt_thread_log_debug("connect(%d) in progress", s); + nxt_debug(task, "connect(%d, %*s) in progress", + s, sa->length, nxt_sockaddr_start(sa)); return NXT_AGAIN; case NXT_ECONNREFUSED: @@ -234,21 +236,21 @@ nxt_socket_connect(nxt_socket_t s, nxt_sockaddr_t *sa) ret = NXT_ERROR; } - nxt_thread_log_error(level, "connect(%d, %*s) failed %E", - s, sa->text_len, sa->text, err); + nxt_log(task, level, "connect(%d, %*s) failed %E", + s, sa->length, nxt_sockaddr_start(sa), err); return ret; } void -nxt_socket_shutdown(nxt_socket_t s, nxt_uint_t how) +nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s, nxt_uint_t how) { nxt_err_t err; nxt_uint_t level; if (nxt_fast_path(shutdown(s, how) == 0)) { - nxt_thread_log_debug("shutdown(%d, %ui)", s, how); + nxt_debug(task, "shutdown(%d, %ui)", s, how); return; } @@ -272,46 +274,26 @@ nxt_socket_shutdown(nxt_socket_t s, nxt_uint_t how) level = NXT_LOG_CRIT; } - nxt_thread_log_error(level, "shutdown(%d, %ui) failed %E", s, how, err); + nxt_log(task, level, "shutdown(%d, %ui) failed %E", s, how, err); } nxt_uint_t -nxt_socket_error_level(nxt_err_t err, nxt_socket_error_level_t level) +nxt_socket_error_level(nxt_err_t err) { switch (err) { - case NXT_ECONNRESET: - - if ((level & NXT_SOCKET_ECONNRESET_IGNORE) != 0) { - return NXT_LOG_DEBUG; - } - - return NXT_LOG_ERR; - - case NXT_EINVAL: - - if ((level & NXT_SOCKET_EINVAL_IGNORE) != 0) { - return NXT_LOG_DEBUG; - } - - return NXT_LOG_ALERT; - case NXT_EPIPE: + case NXT_ECONNRESET: case NXT_ENOTCONN: case NXT_ETIMEDOUT: case NXT_ENETDOWN: case NXT_ENETUNREACH: case NXT_EHOSTDOWN: case NXT_EHOSTUNREACH: - - if ((level & NXT_SOCKET_ERROR_IGNORE) != 0) { - return NXT_LOG_INFO; - } - return NXT_LOG_ERR; default: - return NXT_LOG_ALERT; + return NXT_LOG_CRIT; } } diff --git a/src/nxt_socket.h b/src/nxt_socket.h index 5ebe5c95..ec9d058c 100644 --- a/src/nxt_socket.h +++ b/src/nxt_socket.h @@ -4,8 +4,8 @@ * Copyright (C) NGINX, Inc. */ -#ifndef _NXT_UNIX_SOCKET_H_INCLUDED_ -#define _NXT_UNIX_SOCKET_H_INCLUDED_ +#ifndef _NXT_SOCKET_H_INCLUDED_ +#define _NXT_SOCKET_H_INCLUDED_ typedef int nxt_socket_t; @@ -91,29 +91,24 @@ typedef union { #define NXT_MAXHOSTNAMELEN MAXHOSTNAMELEN -typedef enum { - NXT_SOCKET_ERROR_IGNORE = 0x1, - NXT_SOCKET_ECONNRESET_IGNORE = 0x2, - NXT_SOCKET_EINVAL_IGNORE = 0x4, -} nxt_socket_error_level_t; - - -NXT_EXPORT nxt_socket_t nxt_socket_create(nxt_uint_t family, nxt_uint_t type, - nxt_uint_t protocol, nxt_uint_t flags); -NXT_EXPORT void nxt_socket_close(nxt_socket_t s); -NXT_EXPORT nxt_int_t nxt_socket_getsockopt(nxt_socket_t s, nxt_uint_t level, - nxt_uint_t sockopt); -NXT_EXPORT nxt_int_t nxt_socket_setsockopt(nxt_socket_t s, nxt_uint_t level, - nxt_uint_t sockopt, int val); -NXT_EXPORT nxt_int_t nxt_socket_bind(nxt_socket_t s, nxt_sockaddr_t *sa, - nxt_bool_t test); -NXT_EXPORT nxt_int_t nxt_socket_connect(nxt_socket_t s, nxt_sockaddr_t *sa); -NXT_EXPORT void nxt_socket_shutdown(nxt_socket_t s, nxt_uint_t how); -nxt_uint_t nxt_socket_error_level(nxt_err_t err, - nxt_socket_error_level_t level); - -NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_socket_t *pair); -NXT_EXPORT void nxt_socketpair_close(nxt_socket_t *pair); +NXT_EXPORT nxt_socket_t nxt_socket_create(nxt_task_t *task, nxt_uint_t family, + nxt_uint_t type, nxt_uint_t protocol, nxt_uint_t flags); +NXT_EXPORT void nxt_socket_close(nxt_task_t *task, nxt_socket_t s); +NXT_EXPORT nxt_int_t nxt_socket_getsockopt(nxt_task_t *task, nxt_socket_t s, + nxt_uint_t level, nxt_uint_t sockopt); +NXT_EXPORT nxt_int_t nxt_socket_setsockopt(nxt_task_t *task, nxt_socket_t s, + nxt_uint_t level, nxt_uint_t sockopt, int val); +NXT_EXPORT nxt_int_t nxt_socket_bind(nxt_task_t *task, nxt_socket_t s, + nxt_sockaddr_t *sa, nxt_bool_t test); +NXT_EXPORT nxt_int_t nxt_socket_connect(nxt_task_t *task, nxt_socket_t s, + nxt_sockaddr_t *sa); +NXT_EXPORT void nxt_socket_shutdown(nxt_task_t *task, nxt_socket_t s, + nxt_uint_t how); +nxt_uint_t nxt_socket_error_level(nxt_err_t err); + +NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_task_t *task, + nxt_socket_t *pair); +NXT_EXPORT void nxt_socketpair_close(nxt_task_t *task, nxt_socket_t *pair); NXT_EXPORT ssize_t nxt_socketpair_send(nxt_fd_event_t *ev, nxt_fd_t fd, nxt_iobuf_t *iob, nxt_uint_t niob); NXT_EXPORT ssize_t nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd, @@ -121,12 +116,12 @@ NXT_EXPORT ssize_t nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd, #define \ -nxt_socket_nonblocking(fd) \ - nxt_fd_nonblocking(fd) +nxt_socket_nonblocking(task, fd) \ + nxt_fd_nonblocking(task, fd) #define \ -nxt_socket_blocking(fd) \ - nxt_fd_blocking(fd) +nxt_socket_blocking(task, fd) \ + nxt_fd_blocking(task, fd) -#endif /* _NXT_UNIX_SOCKET_H_INCLUDED_ */ +#endif /* _NXT_SOCKET_H_INCLUDED_ */ diff --git a/src/nxt_socketpair.c b/src/nxt_socketpair.c index d0449020..e1c303dd 100644 --- a/src/nxt_socketpair.c +++ b/src/nxt_socketpair.c @@ -27,16 +27,16 @@ static ssize_t nxt_recvmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_int_t -nxt_socketpair_create(nxt_socket_t *pair) +nxt_socketpair_create(nxt_task_t *task, nxt_socket_t *pair) { if (nxt_slow_path(socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, pair) != 0)) { - nxt_thread_log_alert("socketpair() failed %E", nxt_errno); + nxt_log(task, NXT_LOG_CRIT, "socketpair() failed %E", nxt_errno); return NXT_ERROR; } - nxt_thread_log_debug("socketpair(): %d:%d", pair[0], pair[1]); + nxt_debug(task, "socketpair(): %d:%d", pair[0], pair[1]); - if (nxt_slow_path(nxt_socket_nonblocking(pair[0]) != NXT_OK)) { + if (nxt_slow_path(nxt_socket_nonblocking(task, pair[0]) != NXT_OK)) { goto fail; } @@ -44,7 +44,7 @@ nxt_socketpair_create(nxt_socket_t *pair) goto fail; } - if (nxt_slow_path(nxt_socket_nonblocking(pair[1]) != NXT_OK)) { + if (nxt_slow_path(nxt_socket_nonblocking(task, pair[1]) != NXT_OK)) { goto fail; } @@ -56,17 +56,17 @@ nxt_socketpair_create(nxt_socket_t *pair) fail: - nxt_socketpair_close(pair); + nxt_socketpair_close(task, pair); return NXT_ERROR; } void -nxt_socketpair_close(nxt_socket_t *pair) +nxt_socketpair_close(nxt_task_t *task, nxt_socket_t *pair) { - nxt_socket_close(pair[0]); - nxt_socket_close(pair[1]); + nxt_socket_close(task, pair[0]); + nxt_socket_close(task, pair[1]); } diff --git a/src/nxt_solaris_sendfilev.c b/src/nxt_solaris_sendfilev.c index fdd803dc..1b46d099 100644 --- a/src/nxt_solaris_sendfilev.c +++ b/src/nxt_solaris_sendfilev.c @@ -42,7 +42,7 @@ nxt_solaris_event_conn_io_sendfilev(nxt_event_conn_t *c, nxt_buf_t *b, size = nxt_solaris_buf_coalesce(b, sfv, &nsfv, &sync, limit); - nxt_log_debug(c->socket.log, "sendfilev(%d, %D)", c->socket.fd, nsfv); + nxt_debug(c->socket.task, "sendfilev(%d, %D)", c->socket.fd, nsfv); if (nsfv == 0 && sync) { return 0; @@ -53,7 +53,7 @@ nxt_solaris_event_conn_io_sendfilev(nxt_event_conn_t *c, nxt_buf_t *b, err = (n == -1) ? nxt_errno : 0; - nxt_log_debug(c->socket.log, "sendfilev(): %d sent:%uz", n, sent); + nxt_debug(c->socket.task, "sendfilev(): %d sent:%uz", n, sent); if (n == -1) { switch (err) { @@ -67,14 +67,13 @@ nxt_solaris_event_conn_io_sendfilev(nxt_event_conn_t *c, nxt_buf_t *b, default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "sendfilev(%d, %D) failed %E", - c->socket.fd, nsfv, err); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "sendfilev(%d, %D) failed %E", c->socket.fd, nsfv, err); return NXT_ERROR; } - nxt_log_debug(c->socket.log, "sendfilev() %E", err); + nxt_debug(c->socket.task, "sendfilev() %E", err); return sent; } diff --git a/src/nxt_stream_module.c b/src/nxt_stream_module.c new file mode 100644 index 00000000..89201893 --- /dev/null +++ b/src/nxt_stream_module.c @@ -0,0 +1,130 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_cycle.h> + + +static void nxt_stream_connection_peer(nxt_task_t *task, + nxt_upstream_peer_t *up); +static void nxt_stream_connection_close(nxt_task_t *task, void *obj, + void *data); + + +void +nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data) +{ + nxt_cycle_t *cycle; + nxt_event_conn_t *c; + nxt_upstream_peer_t *up; + + c = obj; + + nxt_debug(task, "stream connection init"); + + up = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_upstream_peer_t)); + if (nxt_slow_path(up == NULL)) { + goto fail; + } + + up->data = c; + + cycle = nxt_thread_cycle(); + + if (cycle->upstream.length != 0) { + up->addr = cycle->upstream; + + } else { + nxt_str_set(&up->addr, "127.0.0.1:8080"); + } + + up->ready_handler = nxt_stream_connection_peer; + up->mem_pool = c->mem_pool; + + nxt_upstream_round_robin_peer(task, up); + return; + +fail: + + /* TODO: close connection */ + return; +} + + +static void +nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) +{ + nxt_event_conn_t *c; + nxt_event_conn_proxy_t *p; + + c = up->data; + + up->sockaddr->type = SOCK_STREAM; + + nxt_log_debug(c->socket.log, "stream connection peer %*s", + up->sockaddr->length, nxt_sockaddr_start(up->sockaddr)); + + p = nxt_event_conn_proxy_create(c); + if (nxt_slow_path(p == NULL)) { + goto fail; + } + + p->client->socket.data = p; + p->peer->socket.data = p; + + p->client_buffer_size = 1024; + p->peer_buffer_size = 4096; + //p->client_wait_timeout = 9000; + p->connect_timeout = 7000; + p->reconnect_timeout = 500; + //p->peer_wait_timeout = 5000; + p->client_write_timeout = 3000; + p->peer_write_timeout = 3000; + p->completion_handler = nxt_stream_connection_close; + //p->retries = 10; + p->peer->remote = up->sockaddr; + + if (0) { + nxt_event_engine_t *engine; + nxt_event_write_rate_t *rate; + + rate = nxt_mem_alloc(c->mem_pool, sizeof(nxt_event_write_rate_t)); + + if (nxt_slow_path(rate == NULL)) { + goto fail; + } + + c->rate = rate; + + rate->limit = 1024; + rate->limit_after = 0; + rate->average = rate->limit; + + engine = nxt_thread_event_engine(); + rate->last = engine->timers.now; + } + + nxt_event_conn_proxy(task, p); + return; + +fail: + + /* TODO: close connection */ + return; +} + + +static void +nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_proxy_t *p; + + p = obj; + + nxt_log_debug(p->client->socket.log, "stream connection close"); + + nxt_mem_pool_destroy(p->client->mem_pool); +} diff --git a/src/nxt_upstream.h b/src/nxt_upstream.h index 2935ccc8..a7d2a224 100644 --- a/src/nxt_upstream.h +++ b/src/nxt_upstream.h @@ -25,7 +25,7 @@ struct nxt_upstream_peer_s { nxt_str_t addr; nxt_mem_pool_t *mem_pool; - void (*ready_handler)(nxt_upstream_peer_t *up); + void (*ready_handler)(nxt_task_t *task, nxt_upstream_peer_t *up); void (*protocol_handler)(nxt_upstream_source_t *us); }; @@ -39,7 +39,8 @@ typedef struct { /* STUB */ -NXT_EXPORT void nxt_upstream_round_robin_peer(nxt_upstream_peer_t *up); +NXT_EXPORT void nxt_upstream_round_robin_peer(nxt_task_t *task, + nxt_upstream_peer_t *up); /**/ diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c index f8035762..7717a2de 100644 --- a/src/nxt_upstream_round_robin.c +++ b/src/nxt_upstream_round_robin.c @@ -24,28 +24,30 @@ typedef struct { } nxt_upstream_round_robin_t; -static void nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, +static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data); -static void nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj, +static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data); -static void nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up); +static void nxt_upstream_round_robin_get_peer(nxt_task_t *task, + nxt_upstream_peer_t *up); void -nxt_upstream_round_robin_peer(nxt_upstream_peer_t *up) +nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up) { nxt_job_sockaddr_parse_t *jbs; if (up->upstream != NULL) { - nxt_upstream_round_robin_get_peer(up); + nxt_upstream_round_robin_get_peer(task, up); } jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t)); if (nxt_slow_path(jbs == NULL)) { - up->ready_handler(up); + up->ready_handler(task, up); return; } + jbs->resolve.job.task = task; jbs->resolve.job.data = up; jbs->resolve.port = up->port; jbs->resolve.log_level = NXT_LOG_ERR; @@ -58,7 +60,7 @@ nxt_upstream_round_robin_peer(nxt_upstream_peer_t *up) static void -nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, void *data) +nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data) { nxt_uint_t i; nxt_sockaddr_t *sa; @@ -94,10 +96,10 @@ nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, void *data) /* STUB */ sa->type = SOCK_STREAM; - /* TODO: test ret */ - (void) nxt_sockaddr_text(up->mem_pool, sa, 1); + nxt_sockaddr_text(sa); - nxt_log_debug(thr->log, "upstream peer: %*s", sa->text_len, sa->text); + nxt_debug(task, "upstream peer: %*s", + sa->length, nxt_sockaddr_start(sa)); /* TODO: memcpy to shared memory pool. */ peer[i].sockaddr = sa; @@ -109,7 +111,7 @@ nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, void *data) up->sockaddr = peer[0].sockaddr; nxt_job_destroy(jbs); - up->ready_handler(up); + up->ready_handler(task, up); //nxt_upstream_round_robin_get_peer(up); return; @@ -118,12 +120,12 @@ fail: nxt_job_destroy(jbs); - up->ready_handler(up); + up->ready_handler(task, up); } static void -nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj, void *data) +nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data) { nxt_upstream_peer_t *up; nxt_job_sockaddr_parse_t *jbs; @@ -131,24 +133,22 @@ nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj, void *data) jbs = obj; up = jbs->resolve.job.data; - up->ready_handler(up); + up->ready_handler(task, up); } static void -nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up) +nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up) { int32_t effective_weights; nxt_uint_t i; nxt_msec_t now; - nxt_event_engine_t *engine; nxt_upstream_round_robin_t *urr; nxt_upstream_round_robin_peer_t *peer, *best; urr = up->upstream; - engine = nxt_thread_event_engine(); - now = engine->timers.now; + now = task->thread->engine->timers.now; nxt_thread_spin_lock(&urr->lock); @@ -196,5 +196,5 @@ nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up) nxt_thread_spin_unlock(&urr->lock); - up->ready_handler(up); + up->ready_handler(task, up); } diff --git a/src/nxt_work_queue.h b/src/nxt_work_queue.h index 4d2422cc..8dc9c1dd 100644 --- a/src/nxt_work_queue.h +++ b/src/nxt_work_queue.h @@ -10,14 +10,14 @@ typedef struct nxt_work_s nxt_work_t; -typedef struct { +struct nxt_task_s { nxt_thread_t *thread; nxt_log_t *log; uint32_t ident; nxt_work_t *next_work; /* TODO: exception_handler, prev/next task, subtasks. */ -} nxt_task_t; +}; #define nxt_task_next_ident() \ diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index 0f5f6eb6..01304f65 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -86,11 +86,9 @@ nxt_worker_process_start(void *data) goto fail; } -#if 0 - if (nxt_cycle_listen_sockets_enable(thr, cycle) != NXT_OK) { + if (nxt_cycle_listen_sockets_enable(&thr->engine->task, cycle) != NXT_OK) { goto fail; } -#endif proc = cycle->processes->elts; @@ -159,7 +157,7 @@ nxt_worker_process_quit(nxt_task_t *task) n = cycle->listen_sockets->nelts; while (n != 0) { - nxt_socket_close(ls->socket); + nxt_socket_close(task, ls->socket); ls->socket = -1; ls++; diff --git a/test/nxt_exp_approximation.c b/test/nxt_exp_approximation.c deleted file mode 100644 index 1f79c068..00000000 --- a/test/nxt_exp_approximation.c +++ /dev/null @@ -1,72 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> -#include <math.h> - - -nxt_int_t -nxt_exp_approximation(nxt_thread_t *thr) -{ - double n, e0, e1, diff; - nxt_nsec_t start, end; - - nxt_thread_time_update(thr); - nxt_log_error(NXT_LOG_NOTICE, thr->log, - "exp approximation unit test started"); - - for (n = 0.0; n > -20.0; n -= 0.00001) { - - e0 = nxt_event_conn_exponential_approximation(n); - e1 = exp(n); - - diff = fabs(e0 - e1); - - /* 0.028993 is max difference with libm exp(). */ - if (diff > 0.028993) { - nxt_log_alert(thr->log, - "exp approximation unit test failed: %0.6f %0.6f", - n, diff); - return NXT_ERROR; - } - } - - - nxt_thread_time_update(thr); - start = nxt_thread_monotonic_time(thr); - - e0 = 0; - for (n = 0.0; n > -20.0; n -= 0.00001) { - e0 += nxt_event_conn_exponential_approximation(n); - } - - nxt_thread_time_update(thr); - end = nxt_thread_monotonic_time(thr); - - /* e0 is passed but not output to eliminate optimization. */ - nxt_log_error(NXT_LOG_NOTICE, thr->log, "exp approximation: %0.1fns", - (end - start) / 20000000.0, e0); - - - nxt_thread_time_update(thr); - start = nxt_thread_monotonic_time(thr); - - e0 = 0; - for (n = 0.0; n > -20.0; n -= 0.000001) { - e0 += exp(n); - } - - nxt_thread_time_update(thr); - end = nxt_thread_monotonic_time(thr); - - /* e0 is passed but not output to eliminate optimization. */ - nxt_log_error(NXT_LOG_NOTICE, thr->log, "libm exp(): %0.1fns", - (end - start) / 20000000.0, e0); - - nxt_log_error(NXT_LOG_NOTICE, thr->log, - "exp approximation unit test passed"); - return NXT_OK; -} diff --git a/test/nxt_lib_unit_test.c b/test/nxt_lib_unit_test.c index 28397187..876416e5 100644 --- a/test/nxt_lib_unit_test.c +++ b/test/nxt_lib_unit_test.c @@ -77,10 +77,6 @@ main(int argc, char **argv) return 1; } - if (nxt_exp_approximation(thr) != NXT_OK) { - return 1; - } - if (nxt_rbtree_unit_test(thr, 100 * 1000) != NXT_OK) { return 1; } diff --git a/test/nxt_lib_unit_test.h b/test/nxt_lib_unit_test.h index bbb2d5f4..a6a24855 100644 --- a/test/nxt_lib_unit_test.h +++ b/test/nxt_lib_unit_test.h @@ -38,8 +38,6 @@ nxt_rdtsc(void) nxt_int_t nxt_term_parse_unit_test(nxt_thread_t *thr); nxt_int_t nxt_msec_diff_unit_test(nxt_thread_t *thr, nxt_msec_less_t); -nxt_int_t nxt_exp_approximation(nxt_thread_t *thr); -double nxt_event_conn_exponential_approximation(double n); nxt_int_t nxt_rbtree_unit_test(nxt_thread_t *thr, nxt_uint_t n); nxt_int_t nxt_rbtree1_unit_test(nxt_thread_t *thr, nxt_uint_t n); |