diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-07-05 14:05:21 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-07-05 14:05:21 +0300 |
commit | 4fe5d22dcc5d6e42c5faa6fe06dd076cde799324 (patch) | |
tree | 91f346ac5380888617f4ed32fa0933445c4bd65a /src | |
parent | d28d502aa5c6982552ce6626088a77f3295465f6 (diff) | |
download | unit-4fe5d22dcc5d6e42c5faa6fe06dd076cde799324.tar.gz unit-4fe5d22dcc5d6e42c5faa6fe06dd076cde799324.tar.bz2 |
Router: processing JSON configuration.
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_conn_accept.c | 2 | ||||
-rw-r--r-- | src/nxt_event_engine.h | 2 | ||||
-rw-r--r-- | src/nxt_listen_socket.c | 84 | ||||
-rw-r--r-- | src/nxt_listen_socket.h | 7 | ||||
-rw-r--r-- | src/nxt_queue.h | 6 | ||||
-rw-r--r-- | src/nxt_router.c | 446 | ||||
-rw-r--r-- | src/nxt_router.h | 27 | ||||
-rw-r--r-- | src/nxt_socket.c | 2 |
8 files changed, 445 insertions, 131 deletions
diff --git a/src/nxt_conn_accept.c b/src/nxt_conn_accept.c index 4fcd530d..6f42cf3d 100644 --- a/src/nxt_conn_accept.c +++ b/src/nxt_conn_accept.c @@ -69,7 +69,7 @@ nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) if (nxt_conn_accept_alloc(task, lev) != NULL) { nxt_fd_event_enable_accept(engine, &lev->socket); - nxt_queue_insert_head(&engine->listen_connections, &lev->link); + nxt_queue_insert_tail(&engine->listen_connections, &lev->link); } return lev; diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index ceb1654e..d8741669 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -494,6 +494,8 @@ struct nxt_event_engine_s { nxt_lvlhsh_t requests; /* req_id to nxt_req_conn_link_t */ nxt_queue_link_t link; + // STUB: router link + nxt_queue_link_t link0; }; diff --git a/src/nxt_listen_socket.c b/src/nxt_listen_socket.c index 56bff6ea..bbd6d34a 100644 --- a/src/nxt_listen_socket.c +++ b/src/nxt_listen_socket.c @@ -11,6 +11,90 @@ static u_char *nxt_listen_socket_log_handler(void *ctx, u_char *pos, u_char *last); +nxt_socket_t +nxt_listen_socket_create0(nxt_task_t *task, nxt_sockaddr_t *sa, + nxt_uint_t flags) +{ + nxt_int_t ret; + nxt_socket_t s; + + s = nxt_socket_create(task, sa->u.sockaddr.sa_family, sa->type, 0, flags); + if (nxt_slow_path(s == -1)) { + return s; + } + + ret = nxt_socket_setsockopt(task, s, SOL_SOCKET, SO_REUSEADDR, 1); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + +#if (NXT_INET6) + + if (sa->u.sockaddr.sa_family == AF_INET6) { + ret = nxt_socket_setsockopt(task, s, IPPROTO_IPV6, IPV6_V6ONLY, 1); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + } + +#endif + +#ifdef TCP_DEFER_ACCEPT + + /* Defer Linux accept() up to for 1 second. */ + (void) nxt_socket_setsockopt(task, s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1); + +#endif + + ret = nxt_socket_bind(task, s, sa, 0); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + +#if (NXT_HAVE_UNIX_DOMAIN) + + if (sa->u.sockaddr.sa_family == AF_UNIX) { + nxt_file_name_t *name; + nxt_file_access_t access; + + name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path; + + access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); + + ret = nxt_file_set_access(name, access); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + } + +#endif + + return s; + +fail: + + nxt_socket_close(task, s); + + return -1; +} + + +nxt_int_t +nxt_listen_socket(nxt_task_t *task, nxt_socket_t s, int backlog) +{ + nxt_debug(task, "listen(%d, %d)", s, backlog); + + if (nxt_fast_path(listen(s, backlog) == 0)) { + return NXT_OK; + } + + nxt_log(task, NXT_LOG_CRIT, "listen(%d, %d) failed %E", + s, backlog, nxt_socket_errno); + + return NXT_ERROR; +} + + nxt_int_t nxt_listen_socket_create(nxt_task_t *task, nxt_listen_socket_t *ls, nxt_bool_t bind_test) diff --git a/src/nxt_listen_socket.h b/src/nxt_listen_socket.h index 907c3a6e..c58850ba 100644 --- a/src/nxt_listen_socket.h +++ b/src/nxt_listen_socket.h @@ -30,8 +30,6 @@ typedef struct { uint8_t socklen; uint8_t address_length; - - uint32_t count; } nxt_listen_socket_t; @@ -51,6 +49,11 @@ typedef struct { #endif +NXT_EXPORT nxt_socket_t nxt_listen_socket_create0(nxt_task_t *task, + nxt_sockaddr_t *sa, nxt_uint_t flags); +NXT_EXPORT nxt_int_t nxt_listen_socket(nxt_task_t *task, nxt_socket_t s, + int backlog); + NXT_EXPORT nxt_int_t nxt_listen_socket_create(nxt_task_t *task, nxt_listen_socket_t *ls, nxt_bool_t bind_test); NXT_EXPORT nxt_int_t nxt_listen_socket_update(nxt_task_t *task, diff --git a/src/nxt_queue.h b/src/nxt_queue.h index f1efd328..e107b6aa 100644 --- a/src/nxt_queue.h +++ b/src/nxt_queue.h @@ -193,7 +193,11 @@ nxt_queue_truncate(queue, link) \ } while (0) -/* Add the queue "tail" to the queue "queue". */ +/* + * Add the queue "tail" to the queue "queue". + * If the queue "tail" is intended to be reused again, + * it must be initiated with nxt_queue_init(tail). + */ #define \ nxt_queue_add(queue, tail) \ diff --git a/src/nxt_router.c b/src/nxt_router.c index 34d163dc..6e9bb994 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -6,37 +6,43 @@ */ #include <nxt_router.h> +#include <nxt_conf.h> #include <nxt_application.h> +typedef struct { + nxt_str_t application_type; + uint32_t application_workers; +} nxt_router_listener_conf_t; + + static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router); static void nxt_router_listen_sockets_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf); -static nxt_int_t nxt_router_stub_conf(nxt_task_t *task, - nxt_router_temp_conf_t *tmcf); +static nxt_int_t nxt_router_conf_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf); static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa); -static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task, - nxt_mp_t *mp, uint32_t port); static nxt_int_t nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface); -static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp, - nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); -static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, - nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); -static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp, - nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); -static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, +static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); +static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); +static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); +static void nxt_router_engine_socket_count(nxt_queue_t *sockets); +static nxt_int_t nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, nxt_work_handler_t handler); -static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task, - nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array); +static nxt_int_t nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf, + nxt_queue_t *sockets); static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_temp_conf_t *tmcf); @@ -78,10 +84,8 @@ static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_int_t ret; - nxt_router_t *router; - nxt_router_temp_conf_t *tmcf; - const nxt_event_interface_t *interface; + nxt_int_t ret; + nxt_router_t *router; ret = nxt_app_http_init(task, rt); if (nxt_slow_path(ret != NXT_OK)) { @@ -96,14 +100,24 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_queue_init(&router->engines); nxt_queue_init(&router->sockets); - /**/ + return NXT_OK; +} + + +nxt_int_t +nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_t *router, + u_char *start, u_char *end) +{ + nxt_int_t ret; + nxt_router_temp_conf_t *tmcf; + const nxt_event_interface_t *interface; tmcf = nxt_router_temp_conf(task, router); if (nxt_slow_path(tmcf == NULL)) { return NXT_ERROR; } - ret = nxt_router_stub_conf(task, tmcf); + ret = nxt_router_conf_create(task, tmcf, start, end); if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -132,6 +146,8 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_queue_add(&router->sockets, &tmcf->updating); nxt_queue_add(&router->sockets, &tmcf->creating); +// nxt_mp_destroy(tmcf->mem_pool); + return NXT_OK; } @@ -196,34 +212,159 @@ fail: } +static nxt_conf_map_t nxt_router_conf[] = { + { + nxt_string("threads"), + NXT_CONF_MAP_INT32, + offsetof(nxt_router_conf_t, threads), + }, + + { + nxt_null_string, 0, 0, + }, +}; + + +static nxt_conf_map_t nxt_router_listener_conf[] = { + { + nxt_string("_application_type"), + NXT_CONF_MAP_STR, + offsetof(nxt_router_listener_conf_t, application_type), + }, + + { + nxt_string("_application_workers"), + NXT_CONF_MAP_INT32, + offsetof(nxt_router_listener_conf_t, application_workers), + }, + + { + nxt_null_string, 0, 0, + }, +}; + + +static nxt_conf_map_t nxt_router_http_conf[] = { + { + nxt_string("header_buffer_size"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_socket_conf_t, header_buffer_size), + }, + + { + nxt_string("large_header_buffer_size"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_socket_conf_t, large_header_buffer_size), + }, + + { + nxt_string("header_read_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_socket_conf_t, header_read_timeout), + }, + + { + nxt_null_string, 0, 0, + }, +}; + + static nxt_int_t -nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) +nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + u_char *start, u_char *end) { - nxt_mp_t *mp; - nxt_sockaddr_t *sa; - nxt_socket_conf_t *skcf; + nxt_mp_t *mp; + uint32_t next; + nxt_int_t ret; + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_conf_value_t *conf, *listeners, *router, *http, *listener; + nxt_socket_conf_t *skcf; + nxt_router_listener_conf_t lscf; + + static nxt_str_t router_path = nxt_string("/router"); + static nxt_str_t http_path = nxt_string("/http"); + static nxt_str_t listeners_path = nxt_string("/listeners"); + + conf = nxt_conf_json_parse(tmcf->mem_pool, start, end); + if (conf == NULL) { + nxt_log(task, NXT_LOG_CRIT, "configuration parsing error"); + return NXT_ERROR; + } + + router = nxt_conf_get_path(conf, &router_path); + + if (router == NULL) { + nxt_log(task, NXT_LOG_CRIT, "no \"/router\" block"); + return NXT_ERROR; + } + + ret = nxt_conf_map_object(router, nxt_router_conf, tmcf->conf); + if (ret != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "router map error"); + return NXT_ERROR; + } - tmcf->conf->threads = 1; + http = nxt_conf_get_path(conf, &http_path); + + if (http == NULL) { + nxt_log(task, NXT_LOG_CRIT, "no \"/http\" block"); + return NXT_ERROR; + } + + listeners = nxt_conf_get_path(conf, &listeners_path); + + if (listeners == NULL) { + nxt_log(task, NXT_LOG_CRIT, "no \"/listeners\" block"); + return NXT_ERROR; + } mp = tmcf->conf->mem_pool; - sa = nxt_router_listen_sockaddr_stub(task, mp, 8000); - skcf = nxt_router_socket_conf(task, mp, sa); + next = 0; + + for ( ;; ) { + listener = nxt_conf_next_object_member(listeners, &name, &next); + if (listener == NULL) { + break; + } + + sa = nxt_sockaddr_parse(mp, &name); + if (sa == NULL) { + nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name); + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + + nxt_debug(task, "router listener: \"%*s\"", + sa->length, nxt_sockaddr_start(sa)); + + skcf = nxt_router_socket_conf(task, mp, sa); + if (skcf == NULL) { + return NXT_ERROR; + } - skcf->listen.handler = nxt_router_conn_init; - skcf->header_buffer_size = 2048; - skcf->large_header_buffer_size = 8192; - skcf->header_read_timeout = 5000; + ret = nxt_conf_map_object(listener, nxt_router_listener_conf, &lscf); + if (ret != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "listener map error"); + return NXT_ERROR; + } - nxt_queue_insert_tail(&tmcf->pending, &skcf->link); + nxt_debug(task, "router type: %V", &lscf.application_type); + nxt_debug(task, "router workers: %D", lscf.application_workers); - sa = nxt_router_listen_sockaddr_stub(task, mp, 8001); - skcf = nxt_router_socket_conf(task, mp, sa); + ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf); + if (ret != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "http map error"); + return NXT_ERROR; + } - skcf->listen.handler = nxt_stream_connection_init; - skcf->header_read_timeout = 5000; + skcf->listen.handler = nxt_router_conn_init; + skcf->router_conf = tmcf->conf; - nxt_queue_insert_tail(&tmcf->pending, &skcf->link); + nxt_queue_insert_tail(&tmcf->pending, &skcf->link); + } return NXT_OK; } @@ -239,6 +380,8 @@ nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) return NULL; } + conf->sockaddr = sa; + conf->listen.sockaddr = sa; conf->listen.socklen = sa->socklen; conf->listen.address_length = sa->length; @@ -252,31 +395,6 @@ nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) } -static nxt_sockaddr_t * -nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port) -{ - nxt_sockaddr_t *sa; - struct sockaddr_in sin; - - nxt_memzero(&sin, sizeof(struct sockaddr_in)); - - sin.sin_family = AF_INET; - sin.sin_port = htons(port); - - sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin, - sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN); - if (nxt_slow_path(sa == NULL)) { - return NULL; - } - - sa->type = SOCK_STREAM; - - nxt_sockaddr_text(sa); - - return sa; -} - - static void nxt_router_listen_sockets_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) @@ -297,9 +415,10 @@ nxt_router_listen_sockets_sort(nxt_router_t *router, { oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link); - if (nxt_sockaddr_cmp(nskcf->listen.sockaddr, - oskcf->listen.sockaddr)) - { + if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) { + nskcf->socket = oskcf->socket; + nskcf->listen.socket = oskcf->listen.socket; + nxt_queue_remove(oqlk); nxt_queue_insert_tail(&tmcf->keeping, oqlk); @@ -312,6 +431,7 @@ nxt_router_listen_sockets_sort(nxt_router_t *router, } nxt_queue_add(&tmcf->deleting, &router->sockets); + nxt_queue_init(&router->sockets); } @@ -319,19 +439,40 @@ static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) { - nxt_queue_link_t *qlk, *nqlk; - nxt_socket_conf_t *skcf; + nxt_int_t ret; + nxt_socket_t s; + nxt_queue_link_t *qlk, *nqlk; + nxt_socket_conf_t *skcf; + nxt_router_socket_t *rtsk; for (qlk = nxt_queue_first(&tmcf->pending); qlk != nxt_queue_tail(&tmcf->pending); qlk = nqlk) { + rtsk = nxt_malloc(sizeof(nxt_router_socket_t)); + if (nxt_slow_path(rtsk == NULL)) { + return NXT_ERROR; + } + + rtsk->count = 0; + skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + skcf->socket = rtsk; + + s = nxt_listen_socket_create0(task, skcf->sockaddr, NXT_NONBLOCK); + if (nxt_slow_path(s == -1)) { + return NXT_ERROR; + } - if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) { + ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG); + if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } + skcf->listen.socket = s; + + rtsk->fd = s; + nqlk = nxt_queue_next(qlk); nxt_queue_remove(qlk); nxt_queue_insert_tail(&tmcf->creating, qlk); @@ -345,13 +486,11 @@ static nxt_int_t nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) { - nxt_mp_t *mp; nxt_int_t ret; nxt_uint_t n, threads; nxt_queue_link_t *qlk; nxt_router_engine_conf_t *recf; - mp = tmcf->conf->mem_pool; threads = tmcf->conf->threads; tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, @@ -371,15 +510,15 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, return NXT_ERROR; } - recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link); + recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); // STUB recf->task = recf->engine->task; if (n < threads) { - ret = nxt_router_engine_conf_update(task, mp, tmcf, recf); + ret = nxt_router_engine_conf_update(tmcf, recf); } else { - ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf); + ret = nxt_router_engine_conf_delete(tmcf, recf); } if (nxt_slow_path(ret != NXT_OK)) { @@ -404,11 +543,13 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, // STUB recf->task = recf->engine->task; - ret = nxt_router_engine_conf_create(task, mp, tmcf, recf); + ret = nxt_router_engine_conf_create(tmcf, recf); if (nxt_slow_path(ret != NXT_OK)) { return ret; } + nxt_queue_insert_tail(&router->engines, &recf->engine->link0); + n++; } @@ -417,39 +558,61 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, static nxt_int_t -nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp, - nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) +nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf) { - nxt_int_t ret; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_thread_spinlock_t *lock; recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); if (nxt_slow_path(recf->creating == NULL)) { return NXT_ERROR; } - ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, + mp = tmcf->conf->mem_pool; + + ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating, recf->creating, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, + ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating, recf->creating, nxt_router_listen_socket_create); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + lock = &tmcf->conf->router->lock; + + nxt_thread_spin_lock(lock); + + nxt_router_engine_socket_count(&tmcf->creating); + nxt_router_engine_socket_count(&tmcf->updating); + + nxt_thread_spin_unlock(lock); + + return ret; } static nxt_int_t -nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, - nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) +nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf) { - nxt_int_t ret; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_thread_spinlock_t *lock; recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); if (nxt_slow_path(recf->creating == NULL)) { return NXT_ERROR; } - ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, + mp = tmcf->conf->mem_pool; + + ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating, recf->creating, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -460,7 +623,7 @@ nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, return NXT_ERROR; } - ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, + ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating, recf->updating, nxt_router_listen_socket_update); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -471,14 +634,26 @@ nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp, return NXT_ERROR; } - return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, - recf->deleting); + ret = nxt_router_engine_joints_delete(recf, &tmcf->deleting); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + lock = &tmcf->conf->router->lock; + + nxt_thread_spin_lock(lock); + + nxt_router_engine_socket_count(&tmcf->creating); + + nxt_thread_spin_unlock(lock); + + return ret; } static nxt_int_t -nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp, - nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) +nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf) { nxt_int_t ret; @@ -487,20 +662,18 @@ nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp, return NXT_ERROR; } - ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating, - recf->deleting); + ret = nxt_router_engine_joints_delete(recf, &tmcf->updating); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, - recf->deleting); + return nxt_router_engine_joints_delete(recf, &tmcf->deleting); } static nxt_int_t -nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, - nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, +nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_engine_conf_t *recf, + nxt_queue_t *sockets, nxt_array_t *array, nxt_work_handler_t handler) { nxt_work_t *work; @@ -531,15 +704,33 @@ nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp, joint->count = 1; joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); joint->engine = recf->engine; + + nxt_queue_insert_tail(&joint->engine->joints, &joint->link); } return NXT_OK; } +static void +nxt_router_engine_socket_count(nxt_queue_t *sockets) +{ + nxt_queue_link_t *qlk; + nxt_socket_conf_t *skcf; + + for (qlk = nxt_queue_first(sockets); + qlk != nxt_queue_tail(sockets); + qlk = nxt_queue_next(qlk)) + { + skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + skcf->socket->count++; + } +} + + static nxt_int_t -nxt_router_engine_joints_delete(nxt_task_t *task, - nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array) +nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf, + nxt_queue_t *sockets) { nxt_work_t *work; nxt_queue_link_t *qlk; @@ -548,7 +739,7 @@ nxt_router_engine_joints_delete(nxt_task_t *task, qlk != nxt_queue_tail(sockets); qlk = nxt_queue_next(qlk)) { - work = nxt_array_add(array); + work = nxt_array_add(recf->deleting); if (nxt_slow_path(work == NULL)) { return NXT_ERROR; } @@ -611,7 +802,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_queue_insert_tail(&rt->engines, &engine->link); - process = nxt_runtime_process_find(rt, nxt_pid); if (nxt_slow_path(process == NULL)) { return NXT_ERROR; @@ -640,7 +830,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_runtime_port_add(rt, port); - ret = nxt_thread_create(&handle, link); if (nxt_slow_path(ret != NXT_OK)) { @@ -672,11 +861,31 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf) nxt_uint_t n; nxt_work_t *work; - work = recf->creating->elts; + if (recf->creating != NULL) { + work = recf->creating->elts; + + for (n = recf->creating->nelts; n != 0; n--) { + nxt_event_engine_post(recf->engine, work); + work++; + } + } + + if (recf->updating != NULL) { + work = recf->updating->elts; + + for (n = recf->updating->nelts; n != 0; n--) { + nxt_event_engine_post(recf->engine, work); + work++; + } + } + + if (recf->deleting != NULL) { + work = recf->deleting->elts; - for (n = recf->creating->nelts; n != 0; n--) { - nxt_event_engine_post(recf->engine, work); - work++; + for (n = recf->deleting->nelts; n != 0; n--) { + nxt_event_engine_post(recf->engine, work); + work++; + } } } @@ -750,19 +959,19 @@ nxt_inline nxt_listen_event_t * nxt_router_listen_event(nxt_queue_t *listen_connections, nxt_socket_conf_t *skcf) { - nxt_socket_t socket; - nxt_queue_link_t *link; + nxt_socket_t fd; + nxt_queue_link_t *qlk; nxt_listen_event_t *listen; - socket = skcf->listen.socket; + fd = skcf->socket->fd; - for (link = nxt_queue_first(listen_connections); - link != nxt_queue_tail(listen_connections); - link = nxt_queue_next(link)) + for (qlk = nxt_queue_first(listen_connections); + qlk != nxt_queue_tail(listen_connections); + qlk = nxt_queue_next(qlk)) { - listen = nxt_queue_link_data(link, nxt_listen_event_t, link); + listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link); - if (socket == listen->socket.fd) { + if (fd == listen->socket.fd) { return listen; } } @@ -834,25 +1043,23 @@ static void nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) { - nxt_socket_t s; - nxt_listen_socket_t *ls; + nxt_router_socket_t *rtsk; nxt_thread_spinlock_t *lock; - s = -1; - ls = &joint->socket_conf->listen; + rtsk = joint->socket_conf->socket; lock = &joint->socket_conf->router_conf->router->lock; nxt_thread_spin_lock(lock); - if (--ls->count == 0) { - s = ls->socket; - ls->socket = -1; + if (--rtsk->count != 0) { + rtsk = NULL; } nxt_thread_spin_unlock(lock); - if (s != -1) { - nxt_socket_close(task, s); + if (rtsk != NULL) { + nxt_socket_close(task, rtsk->fd); + nxt_free(rtsk); } nxt_router_conf_release(task, joint); @@ -894,6 +1101,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_thread_spin_unlock(lock); if (rtcf != NULL) { + nxt_debug(task, "old router conf is destroyed"); nxt_mp_destroy(rtcf->mem_pool); } diff --git a/src/nxt_router.h b/src/nxt_router.h index 9e2dd0a6..9c6f75b4 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -17,7 +17,7 @@ typedef struct { nxt_thread_spinlock_t lock; nxt_queue_t engines; - nxt_queue_t sockets; + nxt_queue_t sockets; /* of nxt_socket_conf_t */ } nxt_router_t; @@ -39,11 +39,11 @@ typedef struct { typedef struct { - nxt_queue_t creating; - nxt_queue_t pending; - nxt_queue_t updating; - nxt_queue_t keeping; - nxt_queue_t deleting; + nxt_queue_t creating; /* of nxt_socket_conf_t */ + nxt_queue_t pending; /* of nxt_socket_conf_t */ + nxt_queue_t updating; /* of nxt_socket_conf_t */ + nxt_queue_t keeping; /* of nxt_socket_conf_t */ + nxt_queue_t deleting; /* of nxt_socket_conf_t */ uint32_t new_threads; @@ -55,9 +55,18 @@ typedef struct { typedef struct { uint32_t count; - nxt_listen_socket_t listen; + nxt_socket_t fd; +} nxt_router_socket_t; + + +typedef struct { + uint32_t count; nxt_queue_link_t link; + nxt_router_socket_t *socket; nxt_router_conf_t *router_conf; + nxt_sockaddr_t *sockaddr; + + nxt_listen_socket_t listen; size_t header_buffer_size; size_t large_header_buffer_size; @@ -75,4 +84,8 @@ typedef struct { } nxt_socket_conf_joint_t; +nxt_int_t nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, + nxt_router_t *router, u_char *start, u_char *end); + + #endif /* _NXT_ROUTER_H_INCLUDED_ */ diff --git a/src/nxt_socket.c b/src/nxt_socket.c index 2663b855..c174081c 100644 --- a/src/nxt_socket.c +++ b/src/nxt_socket.c @@ -145,7 +145,7 @@ nxt_socket_sockopt_name(nxt_uint_t level, nxt_uint_t sockopt) break; -#if (NXT_INET6 && defined IPV6_V6ONLY) +#if (NXT_INET6) case IPPROTO_IPV6: switch (sockopt) { |