diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-07-18 00:21:16 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-07-18 00:21:16 +0300 |
commit | 803855138c3b714c088e42a32e80939a81785944 (patch) | |
tree | f19efadd82ecfb2aa42f93a1fa67c451ca690724 | |
parent | eb675f2d78178b2cdd54d934022f9b739bfa8952 (diff) | |
download | unit-803855138c3b714c088e42a32e80939a81785944.tar.gz unit-803855138c3b714c088e42a32e80939a81785944.tar.bz2 |
Mem pool cleanup introduced.
Used for connection mem pool cleanup, which can be used by buffers.
Used for port mem pool to safely destroy linked process.
-rw-r--r-- | src/nxt_main.h | 19 | ||||
-rw-r--r-- | src/nxt_master_process.c | 7 | ||||
-rw-r--r-- | src/nxt_mp.c | 36 | ||||
-rw-r--r-- | src/nxt_mp.h | 4 | ||||
-rw-r--r-- | src/nxt_port.c | 2 | ||||
-rw-r--r-- | src/nxt_process.c | 22 | ||||
-rw-r--r-- | src/nxt_process.h | 4 | ||||
-rw-r--r-- | src/nxt_router.c | 33 | ||||
-rw-r--r-- | src/nxt_runtime.c | 10 |
9 files changed, 101 insertions, 36 deletions
diff --git a/src/nxt_main.h b/src/nxt_main.h index 05cca62b..547a0321 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -10,26 +10,29 @@ #include <nxt_auto_config.h> -#include <nxt_unix.h> -#include <nxt_clang.h> -#include <nxt_types.h> -#include <nxt_time.h> -#include <nxt_mp.h> -#include <nxt_array.h> - typedef struct nxt_port_s nxt_port_t; typedef struct nxt_task_s nxt_task_t; typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t; typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg); typedef struct nxt_sig_event_s nxt_sig_event_t; typedef struct nxt_runtime_s nxt_runtime_t; -typedef uint16_t nxt_port_id_t; typedef struct nxt_thread_s nxt_thread_t; typedef struct nxt_event_engine_s nxt_event_engine_t; typedef struct nxt_log_s nxt_log_t; typedef struct nxt_thread_pool_s nxt_thread_pool_t; +typedef void (*nxt_work_handler_t)(nxt_task_t *task, void *obj, void *data); + +#include <nxt_unix.h> +#include <nxt_clang.h> +#include <nxt_types.h> +#include <nxt_time.h> +#include <nxt_mp.h> +#include <nxt_array.h> + +typedef uint16_t nxt_port_id_t; + #include <nxt_queue.h> #include <nxt_thread_id.h> diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 1a8b5ed8..3cb62830 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -212,13 +212,13 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return NXT_ERROR; } - nxt_process_port_add(process, port); - ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { return ret; } + nxt_process_port_add(task, process, port); + nxt_runtime_port_add(rt, port); /* @@ -380,12 +380,11 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, return NXT_ERROR; } - nxt_process_port_add(process, port); + nxt_process_port_add(task, process, port); ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { nxt_mp_release(port->mem_pool, port); - nxt_runtime_process_destroy(rt, process); return ret; } diff --git a/src/nxt_mp.c b/src/nxt_mp.c index 57fe4f09..40d12be0 100644 --- a/src/nxt_mp.c +++ b/src/nxt_mp.c @@ -114,6 +114,8 @@ struct nxt_mp_s { nxt_tid_t tid; #endif + nxt_work_t *cleanup; + /* Lists of nxt_mp_page_t. */ nxt_queue_t free_pages; nxt_queue_t nget_pages; @@ -283,6 +285,7 @@ void nxt_mp_destroy(nxt_mp_t *mp) { void *p; + nxt_work_t *work, *next_work; nxt_mp_block_t *block; nxt_rbtree_node_t *node, *next; @@ -290,6 +293,15 @@ nxt_mp_destroy(nxt_mp_t *mp) nxt_mp_thread_assert(mp); + while (mp->cleanup != NULL) { + work = mp->cleanup; + next_work = work->next; + + work->handler(work->task, work->obj, work->data); + + mp->cleanup = next_work; + } + next = nxt_rbtree_root(&mp->blocks); while (next != nxt_rbtree_sentinel(&mp->blocks)) { @@ -1022,3 +1034,27 @@ nxt_mp_zget(nxt_mp_t *mp, size_t size) return p; } + + +nxt_int_t +nxt_mp_cleanup(nxt_mp_t *mp, nxt_work_handler_t handler, + nxt_task_t *task, void *obj, void *data) +{ + nxt_work_t *work; + + work = nxt_mp_get(mp, sizeof(nxt_work_t)); + + if (nxt_slow_path(work == NULL)) { + return NXT_ERROR; + } + + work->next = mp->cleanup; + work->handler = handler; + work->task = task; + work->obj = obj; + work->data = data; + + mp->cleanup = work; + + return NXT_OK; +} diff --git a/src/nxt_mp.h b/src/nxt_mp.h index 72404e89..71a48de6 100644 --- a/src/nxt_mp.h +++ b/src/nxt_mp.h @@ -109,6 +109,10 @@ NXT_EXPORT void *nxt_mp_zget(nxt_mp_t *mp, size_t size) NXT_MALLOC_LIKE; +NXT_EXPORT nxt_int_t nxt_mp_cleanup(nxt_mp_t *mp, nxt_work_handler_t handler, + nxt_task_t *task, void *obj, void *data); + + NXT_EXPORT void nxt_mp_thread_adopt(nxt_mp_t *mp); #endif /* _NXT_MP_H_INCLUDED_ */ diff --git a/src/nxt_port.c b/src/nxt_port.c index 4c2561d3..3f3fdd19 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -245,7 +245,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - nxt_process_port_add(process, port); + nxt_process_port_add(task, process, port); port->pair[0] = -1; port->pair[1] = msg->fd; diff --git a/src/nxt_process.c b/src/nxt_process.c index 1832640d..9561fb84 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -563,11 +563,31 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) } +static void +nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) +{ + nxt_runtime_t *rt; + nxt_process_t *process; + + process = obj; + rt = data; + + process->port_cleanups--; + + if (process->port_cleanups == 0) { + nxt_runtime_process_destroy(rt, process); + } +} + void -nxt_process_port_add(nxt_process_t *process, nxt_port_t *port) +nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { port->process = process; nxt_queue_insert_tail(&process->ports, &port->link); + + nxt_mp_cleanup(port->mem_pool, nxt_process_port_mp_cleanup, task, process, + task->thread->runtime); + process->port_cleanups++; } diff --git a/src/nxt_process.h b/src/nxt_process.h index a647c00a..aa3aa7a5 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -57,6 +57,7 @@ typedef struct { nxt_pid_t pid; nxt_queue_t ports; /* of nxt_port_t */ nxt_bool_t ready; + nxt_uint_t port_cleanups; nxt_process_init_t *init; @@ -88,7 +89,8 @@ NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv, #define nxt_process_port_first(process) \ nxt_queue_link_data(nxt_queue_first(&process->ports), nxt_port_t, link) -NXT_EXPORT void nxt_process_port_add(nxt_process_t *process, nxt_port_t *port); +NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, + nxt_port_t *port); #define nxt_process_port_each(process, port) \ nxt_queue_each(port, &process->ports, nxt_port_t, link) diff --git a/src/nxt_router.c b/src/nxt_router.c index 5ee70377..c103a861 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -27,7 +27,6 @@ struct nxt_start_worker_s { nxt_app_t *app; nxt_req_conn_link_t *rc; nxt_mp_t *mem_pool; - void *joint; nxt_work_t work; }; @@ -160,17 +159,13 @@ nxt_router_start(nxt_task_t *task, void *data) static void nxt_router_sw_release(nxt_task_t *task, void *obj, void *data) { - nxt_start_worker_t *sw; - nxt_socket_conf_joint_t *joint; + nxt_start_worker_t *sw; sw = obj; - joint = sw->joint; nxt_debug(task, "sw #%uxD release", sw->stream); - if (nxt_mp_release(sw->mem_pool, sw) == 0) { - nxt_router_conf_release(task, joint); - } + nxt_mp_release(sw->mem_pool, sw); } @@ -1861,7 +1856,6 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) nxt_app_t *app; nxt_port_t *port; nxt_work_t *work; - nxt_process_t *process; nxt_queue_link_t *lnk; nxt_req_conn_link_t *rc; @@ -1913,14 +1907,9 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) nxt_router_app_free(app); port->app = NULL; - process = port->process; nxt_port_release(port); - if (nxt_queue_is_empty(&process->ports)) { - nxt_runtime_process_destroy(task->thread->runtime, process); - } - return; } @@ -2031,7 +2020,6 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc) sw->app = app; sw->rc = rc; sw->mem_pool = c->mem_pool; - sw->joint = c->listen->socket.data; sw->work.handler = nxt_router_send_sw_request; sw->work.task = task; @@ -2350,6 +2338,17 @@ nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) static void +nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data) +{ + nxt_socket_conf_joint_t *joint; + + joint = obj; + + nxt_router_conf_release(task, joint); +} + + +static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; @@ -2380,9 +2379,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) task = &task->thread->engine->task; - if (nxt_mp_release(c->mem_pool, c) == 0) { - nxt_router_conf_release(task, joint); - } + nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, task, joint, NULL); + + nxt_mp_release(c->mem_pool, c); } diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 8b216337..31829225 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1491,6 +1491,8 @@ nxt_runtime_process_new(nxt_runtime_t *rt) void nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) { + nxt_assert(process->port_cleanups == 0); + nxt_port_mmaps_destroy(process->incoming, 1); nxt_port_mmaps_destroy(process->outgoing, 1); @@ -1657,6 +1659,10 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) case NXT_OK: rt->nprocesses--; + if (process->port_cleanups == 0) { + nxt_runtime_process_destroy(rt, process); + } + nxt_process_port_each(process, port) { nxt_runtime_port_remove(rt, port); @@ -1665,10 +1671,6 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) } nxt_process_port_loop; - if (nxt_queue_is_empty(&process->ports)) { - nxt_runtime_process_destroy(rt, process); - } - break; default: |