diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-07-07 16:01:34 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-07-07 16:01:34 +0300 |
commit | 74cda90e31f97ba0ebd6483f2b073c1240b93a58 (patch) | |
tree | 69727640f476822e3cc1e40ac09bf645251617be /src | |
parent | 61008a7c0b45fe55f3f611c61f81e569e6baee30 (diff) | |
download | unit-74cda90e31f97ba0ebd6483f2b073c1240b93a58.tar.gz unit-74cda90e31f97ba0ebd6483f2b073c1240b93a58.tar.bz2 |
Process stop notification from master to all other processes.
New port message type introduced NXT_PORT_MSG_REMOVE_PID. Default handler
removes process description from nxt_runtime_t with all ports, incoming and
outgoing mmaps etc.
Diffstat (limited to '')
-rw-r--r-- | src/nginext/nxt_go_port.c | 9 | ||||
-rw-r--r-- | src/nxt_master_process.c | 20 | ||||
-rw-r--r-- | src/nxt_port.c | 22 | ||||
-rw-r--r-- | src/nxt_port.h | 9 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 9 | ||||
-rw-r--r-- | src/nxt_process.c | 4 | ||||
-rw-r--r-- | src/nxt_queue.h | 5 | ||||
-rw-r--r-- | src/nxt_router.c | 1 | ||||
-rw-r--r-- | src/nxt_runtime.c | 32 | ||||
-rw-r--r-- | src/nxt_runtime.h | 21 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 3 |
11 files changed, 113 insertions, 22 deletions
diff --git a/src/nginext/nxt_go_port.c b/src/nginext/nxt_go_port.c index 47e46f02..58ba90ea 100644 --- a/src/nginext/nxt_go_port.c +++ b/src/nginext/nxt_go_port.c @@ -146,7 +146,7 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) nxt_go_debug("using data in shared memory"); } - if (port_msg->type > NXT_PORT_MSG_MAX) { + if (port_msg->type >= NXT_PORT_MSG_MAX) { nxt_go_warn("unknown message type (%d)", (int)port_msg->type); goto fail; } @@ -181,6 +181,13 @@ nxt_go_port_on_read(void *buf, size_t buf_size, void *oob, size_t oob_size) return nxt_go_data_handler(port_msg, buf_size); + case NXT_PORT_MSG_REMOVE_PID: + nxt_go_debug("remove pid"); + + /* TODO remove all ports for this pid in Go */ + /* TODO remove incoming & outgoing mmaps for this pid */ + break; + default: goto fail; } diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index d2262a1b..c9617c9f 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -51,7 +51,7 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, { nxt_int_t ret; - rt->type = NXT_PROCESS_MASTER; + rt->types |= (1U << NXT_PROCESS_MASTER); if (nxt_master_process_port_create(task, rt) != NXT_OK) { return NXT_ERROR; @@ -467,6 +467,7 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { + nxt_port_t *port; nxt_runtime_t *rt; nxt_process_t *process; nxt_process_init_t *init; @@ -478,10 +479,23 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) if (process) { init = process->init; - /* TODO: free ports fds. */ - nxt_runtime_process_remove(rt, process); + if (!nxt_exiting) { + nxt_runtime_process_each(rt, process) + { + if (process->pid == nxt_pid) { + continue; + } + + port = nxt_process_port_first(process); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, + -1, pid, 0, NULL); + } + nxt_runtime_process_loop; + } + if (nxt_exiting) { if (rt->nprocesses == 2) { diff --git a/src/nxt_port.c b/src/nxt_port.c index 249a287c..404e6424 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -52,7 +52,7 @@ nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_handler_t *handlers; - if (nxt_fast_path(msg->port_msg.type <= NXT_PORT_MSG_MAX)) { + if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) { nxt_debug(task, "port %d: message type:%uD", msg->port->socket.fd, msg->port_msg.type); @@ -292,6 +292,26 @@ nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void +nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_pid_t pid; + nxt_runtime_t *rt; + nxt_process_t *process; + + nxt_debug(task, "port remove pid handler"); + + rt = task->thread->runtime; + pid = msg->port_msg.stream; + + process = nxt_runtime_process_find(rt, pid); + + if (process) { + nxt_runtime_process_remove(rt, process); + } +} + + +void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "port empty handler"); diff --git a/src/nxt_port.h b/src/nxt_port.h index 109071aa..c7566426 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -14,9 +14,10 @@ typedef enum { NXT_PORT_MSG_CHANGE_FILE, NXT_PORT_MSG_MMAP, NXT_PORT_MSG_DATA, -} nxt_port_msg_type_t; + NXT_PORT_MSG_REMOVE_PID, -#define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA + NXT_PORT_MSG_MAX, +} nxt_port_msg_type_t; /* Passed as a first iov chunk. */ @@ -56,10 +57,9 @@ struct nxt_port_recv_msg_s { struct nxt_port_s { - /* Must be the first field. */ nxt_fd_event_t socket; - nxt_queue_link_t link; + nxt_queue_link_t link; /* for nxt_process_t.ports */ nxt_queue_t messages; /* of nxt_port_send_msg_t */ @@ -132,6 +132,7 @@ void nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 73bb44bd..883c45bb 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -222,7 +222,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_queue_insert_tail(&port->messages, &msg->link); if (port->socket.write_ready) { - nxt_port_write_handler(task, port, NULL); + nxt_port_write_handler(task, &port->socket, NULL); } return NXT_OK; @@ -236,14 +236,14 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) nxt_port_t *port; struct iovec iov[NXT_IOBUF_MAX * 10]; nxt_queue_link_t *link; + nxt_port_method_t m; nxt_port_send_msg_t *msg; nxt_sendbuf_coalesce_t sb; - nxt_port_method_t m; size_t plain_size; nxt_buf_t *plain_buf; - port = obj; + port = nxt_container_of(obj, nxt_port_t, socket); do { link = nxt_queue_first(&port->messages); @@ -389,7 +389,7 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) struct iovec iov[2]; nxt_port_recv_msg_t msg; - port = msg.port = obj; + port = msg.port = nxt_container_of(obj, nxt_port_t, socket); for ( ;; ) { @@ -522,5 +522,6 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) { + nxt_debug(task, "port error handler %p", obj); /* TODO */ } diff --git a/src/nxt_process.c b/src/nxt_process.c index 33af310d..97b51c4e 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -46,6 +46,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) process->pid = nxt_pid; process->init->port->pid = nxt_pid; + rt->types = 0; + nxt_runtime_process_add(rt, process); nxt_process_start(task, process->init); @@ -94,7 +96,7 @@ nxt_process_start(nxt_task_t *task, nxt_process_init_t *process) thread = task->thread; rt = thread->runtime; - rt->type = process->type; + rt->types |= (1U << process->type); engine = thread->engine; diff --git a/src/nxt_queue.h b/src/nxt_queue.h index e107b6aa..44e9ad61 100644 --- a/src/nxt_queue.h +++ b/src/nxt_queue.h @@ -222,12 +222,13 @@ NXT_EXPORT void nxt_queue_sort(nxt_queue_t *queue, #define nxt_queue_each(elt, queue, type, link) \ do { \ - nxt_queue_link_t *_lnk; \ + nxt_queue_link_t *_lnk, *_nxt; \ \ for (_lnk = nxt_queue_first(queue); \ _lnk != nxt_queue_tail(queue); \ - _lnk = nxt_queue_next(_lnk)) { \ + _lnk = _nxt) { \ \ + _nxt = nxt_queue_next(_lnk); \ elt = nxt_queue_link_data(_lnk, type, link); \ #define nxt_queue_loop \ diff --git a/src/nxt_router.c b/src/nxt_router.c index 44dbf200..9753d289 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -899,6 +899,7 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = { nxt_port_change_log_file_handler, nxt_port_mmap_handler, nxt_router_app_data_handler, + nxt_port_remove_pid_handler, }; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index c707cafd..9d7a3fd7 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -445,7 +445,7 @@ nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt) #endif - rt->type = NXT_PROCESS_SINGLE; + rt->types |= (1U << NXT_PROCESS_SINGLE); nxt_runtime_listen_sockets_enable(task, rt); @@ -479,7 +479,7 @@ nxt_runtime_quit(nxt_task_t *task) #endif - if (rt->type == NXT_PROCESS_MASTER) { + if (nxt_runtime_is_master(rt)) { nxt_master_stop_worker_processes(task, rt); done = 0; } @@ -523,7 +523,8 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) { - nxt_runtime_t *rt; + nxt_runtime_t *rt; + nxt_process_t *process; nxt_event_engine_t *engine; rt = obj; @@ -539,7 +540,7 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) #endif - if (rt->type <= NXT_PROCESS_MASTER) { + if (nxt_runtime_is_master(rt)) { if (rt->pid_file != NULL) { nxt_file_delete(rt->pid_file); } @@ -549,6 +550,14 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) nxt_event_engine_signals_stop(engine); } + nxt_runtime_process_each(rt, process) { + + nxt_runtime_process_remove(rt, process); + + } nxt_runtime_process_loop; + + nxt_mp_destroy(rt->mem_pool); + nxt_debug(task, "exit"); exit(0); @@ -1642,6 +1651,7 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) } nxt_process_port_loop; + nxt_mp_free(rt->mem_pool, process); break; default: @@ -1683,6 +1693,20 @@ nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) /* TODO lock ports */ nxt_port_hash_remove(&rt->ports, rt->mem_pool, port); + + if (port->pair[0] != -1) { + nxt_fd_close(port->pair[0]); + } + + if (port->pair[1] != -1) { + nxt_fd_close(port->pair[1]); + } + + if (port->mem_pool) { + nxt_mp_destroy(port->mem_pool); + } + + nxt_mp_free(port->process->mem_pool, port); } diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 8316d1e3..46e8e9d2 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -45,7 +45,7 @@ struct nxt_runtime_s { uint32_t last_engine_id; - nxt_process_type_t type; + uint32_t types; /* bitset of nxt_process_type_t */ nxt_timer_t timer; @@ -84,6 +84,20 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, #endif +nxt_inline nxt_bool_t +nxt_runtime_is_type(nxt_runtime_t *rt, nxt_process_type_t type) +{ + return (rt->types & (1U << type)) != 0; +} + + +nxt_inline nxt_bool_t +nxt_runtime_is_master(nxt_runtime_t *rt) +{ + return nxt_runtime_is_type(rt, NXT_PROCESS_MASTER); +} + + nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); @@ -141,10 +155,13 @@ void nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); #define nxt_runtime_process_each(rt, process) \ do { \ nxt_lvlhsh_each_t _lhe; \ + nxt_process_t *_nxt; \ \ for (process = nxt_runtime_process_first(rt, &_lhe); \ process != NULL; \ - process = nxt_runtime_process_next(rt, &_lhe)) { \ + process = _nxt) { \ + \ + _nxt = nxt_runtime_process_next(rt, &_lhe); \ #define nxt_runtime_process_loop \ } \ diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index b0c68160..cdc1b1c5 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -28,6 +28,7 @@ nxt_port_handler_t nxt_worker_process_port_handlers[] = { nxt_port_change_log_file_handler, nxt_port_mmap_handler, nxt_port_data_handler, + nxt_port_remove_pid_handler, }; @@ -37,6 +38,7 @@ nxt_port_handler_t nxt_app_process_port_handlers[] = { nxt_port_change_log_file_handler, nxt_port_mmap_handler, nxt_port_app_data_handler, + nxt_port_remove_pid_handler, }; @@ -46,6 +48,7 @@ nxt_port_handler_t nxt_router_process_port_handlers[] = { nxt_port_change_log_file_handler, nxt_port_mmap_handler, nxt_router_conf_data_handler, + nxt_port_remove_pid_handler, }; |