diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-07-11 20:14:50 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-07-11 20:14:50 +0300 |
commit | 4f4061061b023cd88a2c54121853a8cb1922f100 (patch) | |
tree | f4534cc4da2f43c866570ed8f066ee7ec736be79 | |
parent | 3bccb7f358e27f932c951a55bc17aa113bfae5f0 (diff) | |
download | unit-4f4061061b023cd88a2c54121853a8cb1922f100.tar.gz unit-4f4061061b023cd88a2c54121853a8cb1922f100.tar.bz2 |
Sending a result of configuration applying back to the controller.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 325 | ||||
-rw-r--r-- | src/nxt_router.h | 8 |
2 files changed, 230 insertions, 103 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 8c5236be..28765deb 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -20,8 +20,17 @@ typedef struct { } nxt_router_listener_conf_t; -static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, - nxt_router_t *router); +static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); +static nxt_int_t nxt_router_conf_new(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); +static void nxt_router_conf_success(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf); +static void nxt_router_conf_error(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf); +static void nxt_router_conf_send(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, u_char *start, size_t size); +static void nxt_router_conf_buf_completion(nxt_task_t *task, void *obj, + void *data); static void nxt_router_listen_sockets_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf); @@ -46,10 +55,10 @@ static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf); static void nxt_router_engine_socket_count(nxt_queue_t *sockets); static nxt_int_t nxt_router_engine_joints_create(nxt_mp_t *mp, - nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, - nxt_work_handler_t handler); -static nxt_int_t nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf, - nxt_queue_t *sockets); + nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf, + nxt_queue_t *sockets, nxt_array_t *array, nxt_work_handler_t handler); +static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_temp_conf_t *tmcf); @@ -59,7 +68,8 @@ static void nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf); static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); -static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); +static void nxt_router_engine_post(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); static void nxt_router_thread_start(void *data); static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, @@ -117,58 +127,51 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) } -nxt_int_t -nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_t *router, - u_char *start, u_char *end) +void +nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_int_t ret; - nxt_router_temp_conf_t *tmcf; - const nxt_event_interface_t *interface; - - tmcf = nxt_router_temp_conf(task, router); - if (nxt_slow_path(tmcf == NULL)) { - return NXT_ERROR; - } + size_t dump_size; + nxt_buf_t *b; + nxt_int_t ret; + nxt_router_temp_conf_t *tmcf; - ret = nxt_router_conf_create(task, tmcf, start, end); - if (nxt_slow_path(ret != NXT_OK)) { - return ret; - } + b = msg->buf; - nxt_router_listen_sockets_sort(router, tmcf); + dump_size = nxt_buf_used_size(b); - ret = nxt_router_listen_sockets_stub_create(task, tmcf); - if (nxt_slow_path(ret != NXT_OK)) { - return ret; + if (dump_size > 300) { + dump_size = 300; } - interface = nxt_service_get(rt->services, "engine", NULL); + nxt_debug(task, "router conf data (%z): %*s", + msg->size, dump_size, b->mem.pos); - ret = nxt_router_engines_create(task, router, tmcf, interface); - if (nxt_slow_path(ret != NXT_OK)) { - return ret; + tmcf = nxt_router_temp_conf(task); + if (nxt_slow_path(tmcf == NULL)) { + return; } - ret = nxt_router_threads_create(task, rt, tmcf); - if (nxt_slow_path(ret != NXT_OK)) { - return ret; - } + tmcf->conf->router = nxt_router; + tmcf->stream = msg->port_msg.stream; + tmcf->port = nxt_runtime_port_find(task->thread->runtime, + msg->port_msg.pid, 0); - nxt_router_apps_sort(router, tmcf); + ret = nxt_router_conf_new(task, tmcf, b->mem.pos, b->mem.free); - nxt_router_engines_post(tmcf); + b->mem.pos = b->mem.free; - nxt_queue_add(&router->sockets, &tmcf->updating); - nxt_queue_add(&router->sockets, &tmcf->creating); + if (ret == NXT_OK) { + return nxt_router_conf_success(task, tmcf); + } -// nxt_mp_destroy(tmcf->mem_pool); + nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf"); - return NXT_OK; + return nxt_router_conf_error(task, tmcf); } static nxt_router_temp_conf_t * -nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) +nxt_router_temp_conf(nxt_task_t *task) { nxt_mp_t *mp, *tmp; nxt_router_conf_t *rtcf; @@ -185,7 +188,6 @@ nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) } rtcf->mem_pool = mp; - rtcf->router = router; rtcf->count = 1; tmp = nxt_mp_create(1024, 128, 256, 32); @@ -200,6 +202,8 @@ nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) tmcf->mem_pool = tmp; tmcf->conf = rtcf; + tmcf->count = 1; + tmcf->engine = task->thread->engine; tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_router_engine_conf_t)); @@ -229,6 +233,116 @@ fail: } +static nxt_int_t +nxt_router_conf_new(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + u_char *start, u_char *end) +{ + nxt_int_t ret; + nxt_router_t *router; + nxt_runtime_t *rt; + const nxt_event_interface_t *interface; + + ret = nxt_router_conf_create(task, tmcf, start, end); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + router = tmcf->conf->router; + + nxt_router_listen_sockets_sort(router, tmcf); + + ret = nxt_router_listen_sockets_stub_create(task, tmcf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + rt = task->thread->runtime; + + interface = nxt_service_get(rt->services, "engine", NULL); + + ret = nxt_router_engines_create(task, router, tmcf, interface); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + ret = nxt_router_threads_create(task, rt, tmcf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + nxt_router_apps_sort(router, tmcf); + + nxt_router_engines_post(tmcf); + + nxt_queue_add(&router->sockets, &tmcf->updating); + nxt_queue_add(&router->sockets, &tmcf->creating); + + return NXT_OK; +} + + +static void +nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) +{ + nxt_router_temp_conf_t *tmcf; + + tmcf = obj; + + nxt_router_conf_success(task, tmcf); +} + + +static void +nxt_router_conf_success(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) +{ + nxt_debug(task, "temp conf count:%D", tmcf->count); + + if (--tmcf->count == 0) { + nxt_router_conf_send(task, tmcf, (u_char *) "OK", 2); + } +} + + +static void +nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) +{ + nxt_mp_destroy(tmcf->conf->mem_pool); + + nxt_router_conf_send(task, tmcf, (u_char *) "ERROR", 5); +} + + +static void +nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + u_char *start, size_t size) +{ + nxt_buf_t *b; + + b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + return; + } + + b->parent = tmcf->mem_pool; + b->completion_handler = nxt_router_conf_buf_completion; + + nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA, -1, + tmcf->stream, 0, b); +} + + +static void +nxt_router_conf_buf_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_mp_t *mp; + + /* nxt_router_temp_conf_t mem pool. */ + mp = data; + + nxt_mp_destroy(mp); +} + + static nxt_conf_map_t nxt_router_conf[] = { { nxt_string("listeners_threads"), @@ -683,8 +797,6 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, } recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0); - // STUB - recf->task = recf->engine->task; if (n < threads) { ret = nxt_router_engine_conf_update(tmcf, recf); @@ -712,8 +824,6 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, if (nxt_slow_path(recf->engine == NULL)) { return NXT_ERROR; } - // STUB - recf->task = recf->engine->task; ret = nxt_router_engine_conf_create(tmcf, recf); if (nxt_slow_path(ret != NXT_OK)) { @@ -744,13 +854,13 @@ nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, mp = tmcf->conf->mem_pool; - ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating, + ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->creating, recf->creating, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating, + ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->updating, recf->creating, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -784,7 +894,7 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, mp = tmcf->conf->mem_pool; - ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating, + ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->creating, recf->creating, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -795,7 +905,7 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, return NXT_ERROR; } - ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating, + ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->updating, recf->updating, nxt_router_listen_socket_update); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -806,7 +916,7 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, return NXT_ERROR; } - ret = nxt_router_engine_joints_delete(recf, &tmcf->deleting); + ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -834,21 +944,21 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, return NXT_ERROR; } - ret = nxt_router_engine_joints_delete(recf, &tmcf->updating); + ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - return nxt_router_engine_joints_delete(recf, &tmcf->deleting); + return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); } static nxt_int_t -nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_engine_conf_t *recf, - nxt_queue_t *sockets, nxt_array_t *array, +nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, nxt_work_handler_t handler) { - nxt_work_t *work; + nxt_work_t *work, *back; nxt_queue_link_t *qlk; nxt_socket_conf_joint_t *joint; @@ -856,6 +966,17 @@ nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_engine_conf_t *recf, qlk != nxt_queue_tail(sockets); qlk = nxt_queue_next(qlk)) { + back = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_work_t)); + if (nxt_slow_path(back == NULL)) { + return NXT_ERROR; + } + + back->next = NULL; + back->handler = nxt_router_conf_wait; + back->task = &tmcf->engine->task; + back->obj = tmcf; + back->data = NULL; + work = nxt_array_add(array); if (nxt_slow_path(work == NULL)) { return NXT_ERROR; @@ -863,8 +984,8 @@ nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_engine_conf_t *recf, work->next = NULL; work->handler = handler; - work->task = &recf->task; - work->obj = recf->engine; + work->task = &recf->engine->task; + work->obj = back; joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t)); if (nxt_slow_path(joint == NULL)) { @@ -901,16 +1022,27 @@ nxt_router_engine_socket_count(nxt_queue_t *sockets) static nxt_int_t -nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf, - nxt_queue_t *sockets) +nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets) { - nxt_work_t *work; + nxt_work_t *work, *back; nxt_queue_link_t *qlk; for (qlk = nxt_queue_first(sockets); qlk != nxt_queue_tail(sockets); qlk = nxt_queue_next(qlk)) { + back = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_work_t)); + if (nxt_slow_path(back == NULL)) { + return NXT_ERROR; + } + + back->next = NULL; + back->handler = nxt_router_conf_wait; + back->task = &tmcf->engine->task; + back->obj = tmcf; + back->data = NULL; + work = nxt_array_add(recf->deleting); if (nxt_slow_path(work == NULL)) { return NXT_ERROR; @@ -918,8 +1050,8 @@ nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf, work->next = NULL; work->handler = nxt_router_listen_socket_delete; - work->task = &recf->task; - work->obj = recf->engine; + work->task = &recf->engine->task; + work->obj = back; work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); } @@ -1037,14 +1169,15 @@ nxt_router_engines_post(nxt_router_temp_conf_t *tmcf) recf = tmcf->engines->elts; for (n = tmcf->engines->nelts; n != 0; n--) { - nxt_router_engine_post(recf); + nxt_router_engine_post(tmcf, recf); recf++; } } static void -nxt_router_engine_post(nxt_router_engine_conf_t *recf) +nxt_router_engine_post(nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf) { nxt_uint_t n; nxt_work_t *work; @@ -1055,6 +1188,7 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf) for (n = recf->creating->nelts; n != 0; n--) { nxt_event_engine_post(recf->engine, work); work++; + tmcf->count++; } } @@ -1064,6 +1198,7 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf) for (n = recf->updating->nelts; n != 0; n--) { nxt_event_engine_post(recf->engine, work); work++; + tmcf->count++; } } @@ -1073,6 +1208,7 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf) for (n = recf->deleting->nelts; n != 0; n--) { nxt_event_engine_post(recf->engine, work); work++; + tmcf->count++; } } } @@ -1128,10 +1264,13 @@ nxt_router_thread_start(void *data) static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) { + nxt_work_t *work; nxt_listen_event_t *listen; nxt_listen_socket_t *ls; + nxt_router_temp_conf_t *tmcf; nxt_socket_conf_joint_t *joint; + work = obj; joint = data; ls = &joint->socket_conf->listen; @@ -1143,6 +1282,9 @@ nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) } listen->socket.data = joint; + + tmcf = work->obj; + nxt_event_engine_post(tmcf->engine, work); } @@ -1174,19 +1316,26 @@ nxt_router_listen_event(nxt_queue_t *listen_connections, static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) { + nxt_work_t *work; nxt_event_engine_t *engine; nxt_listen_event_t *listen; + nxt_router_temp_conf_t *tmcf; nxt_socket_conf_joint_t *joint, *old; - engine = obj; + work = obj; joint = data; + engine = task->thread->engine; + listen = nxt_router_listen_event(&engine->listen_connections, joint->socket_conf); old = listen->socket.data; listen->socket.data = joint; + tmcf = work->obj; + nxt_event_engine_post(tmcf->engine, work); + nxt_router_conf_release(task, old); } @@ -1194,13 +1343,17 @@ nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) { - nxt_socket_conf_t *skcf; - nxt_listen_event_t *listen; - nxt_event_engine_t *engine; + nxt_work_t *work; + nxt_socket_conf_t *skcf; + nxt_listen_event_t *listen; + nxt_event_engine_t *engine; + nxt_router_temp_conf_t *tmcf; - engine = obj; + work = obj; skcf = data; + engine = task->thread->engine; + listen = nxt_router_listen_event(&engine->listen_connections, skcf); nxt_fd_event_delete(engine, &listen->socket); @@ -1209,6 +1362,9 @@ nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) listen->timer.work_queue = &engine->fast_work_queue; nxt_timer_add(engine, &listen->timer, 0); + + tmcf = work->obj; + nxt_event_engine_post(tmcf->engine, work); } @@ -1390,35 +1546,6 @@ static const nxt_conn_state_t nxt_router_conn_write_state }; -void -nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -{ - size_t dump_size; - nxt_buf_t *b; - nxt_int_t ret; - - b = msg->buf; - - dump_size = nxt_buf_used_size(b); - - if (dump_size > 300) { - dump_size = 300; - } - - nxt_debug(task, "router conf data (%z): %*s", - msg->size, dump_size, b->mem.pos); - - ret = nxt_router_new_conf(task, task->thread->runtime, nxt_router, - b->mem.pos, b->mem.free); - - b->mem.pos = b->mem.free; - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_alert(task->log, "Failed to apply new conf"); - } -} - - static void nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { diff --git a/src/nxt_router.h b/src/nxt_router.h index ff5a7c41..99e70559 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -33,7 +33,6 @@ typedef struct { typedef struct { nxt_event_engine_t *engine; - nxt_task_t task; nxt_array_t *creating; /* of nxt_work_t */ nxt_array_t *updating; /* of nxt_work_t */ nxt_array_t *deleting; /* of nxt_work_t */ @@ -51,7 +50,11 @@ typedef struct { nxt_queue_t previous; /* of nxt_app_t */ uint32_t new_threads; + uint32_t stream; + uint32_t count; + nxt_event_engine_t *engine; + nxt_port_t *port; nxt_array_t *engines; nxt_router_conf_t *conf; nxt_mp_t *mem_pool; @@ -108,9 +111,6 @@ typedef struct { } nxt_socket_conf_joint_t; -nxt_int_t nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, - nxt_router_t *router, u_char *start, u_char *end); - void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); |