summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-07-18 00:21:16 +0300
committerMax Romanov <max.romanov@nginx.com>2017-07-18 00:21:16 +0300
commit803855138c3b714c088e42a32e80939a81785944 (patch)
treef19efadd82ecfb2aa42f93a1fa67c451ca690724
parenteb675f2d78178b2cdd54d934022f9b739bfa8952 (diff)
downloadunit-803855138c3b714c088e42a32e80939a81785944.tar.gz
unit-803855138c3b714c088e42a32e80939a81785944.tar.bz2
Mem pool cleanup introduced.
Used for connection mem pool cleanup, which can be used by buffers. Used for port mem pool to safely destroy linked process.
-rw-r--r--src/nxt_main.h19
-rw-r--r--src/nxt_master_process.c7
-rw-r--r--src/nxt_mp.c36
-rw-r--r--src/nxt_mp.h4
-rw-r--r--src/nxt_port.c2
-rw-r--r--src/nxt_process.c22
-rw-r--r--src/nxt_process.h4
-rw-r--r--src/nxt_router.c33
-rw-r--r--src/nxt_runtime.c10
9 files changed, 101 insertions, 36 deletions
diff --git a/src/nxt_main.h b/src/nxt_main.h
index 05cca62b..547a0321 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -10,26 +10,29 @@
#include <nxt_auto_config.h>
-#include <nxt_unix.h>
-#include <nxt_clang.h>
-#include <nxt_types.h>
-#include <nxt_time.h>
-#include <nxt_mp.h>
-#include <nxt_array.h>
-
typedef struct nxt_port_s nxt_port_t;
typedef struct nxt_task_s nxt_task_t;
typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t;
typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
typedef struct nxt_sig_event_s nxt_sig_event_t;
typedef struct nxt_runtime_s nxt_runtime_t;
-typedef uint16_t nxt_port_id_t;
typedef struct nxt_thread_s nxt_thread_t;
typedef struct nxt_event_engine_s nxt_event_engine_t;
typedef struct nxt_log_s nxt_log_t;
typedef struct nxt_thread_pool_s nxt_thread_pool_t;
+typedef void (*nxt_work_handler_t)(nxt_task_t *task, void *obj, void *data);
+
+#include <nxt_unix.h>
+#include <nxt_clang.h>
+#include <nxt_types.h>
+#include <nxt_time.h>
+#include <nxt_mp.h>
+#include <nxt_array.h>
+
+typedef uint16_t nxt_port_id_t;
+
#include <nxt_queue.h>
#include <nxt_thread_id.h>
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index 1a8b5ed8..3cb62830 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -212,13 +212,13 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
return NXT_ERROR;
}
- nxt_process_port_add(process, port);
-
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
+ nxt_process_port_add(task, process, port);
+
nxt_runtime_port_add(rt, port);
/*
@@ -380,12 +380,11 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
return NXT_ERROR;
}
- nxt_process_port_add(process, port);
+ nxt_process_port_add(task, process, port);
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_mp_release(port->mem_pool, port);
- nxt_runtime_process_destroy(rt, process);
return ret;
}
diff --git a/src/nxt_mp.c b/src/nxt_mp.c
index 57fe4f09..40d12be0 100644
--- a/src/nxt_mp.c
+++ b/src/nxt_mp.c
@@ -114,6 +114,8 @@ struct nxt_mp_s {
nxt_tid_t tid;
#endif
+ nxt_work_t *cleanup;
+
/* Lists of nxt_mp_page_t. */
nxt_queue_t free_pages;
nxt_queue_t nget_pages;
@@ -283,6 +285,7 @@ void
nxt_mp_destroy(nxt_mp_t *mp)
{
void *p;
+ nxt_work_t *work, *next_work;
nxt_mp_block_t *block;
nxt_rbtree_node_t *node, *next;
@@ -290,6 +293,15 @@ nxt_mp_destroy(nxt_mp_t *mp)
nxt_mp_thread_assert(mp);
+ while (mp->cleanup != NULL) {
+ work = mp->cleanup;
+ next_work = work->next;
+
+ work->handler(work->task, work->obj, work->data);
+
+ mp->cleanup = next_work;
+ }
+
next = nxt_rbtree_root(&mp->blocks);
while (next != nxt_rbtree_sentinel(&mp->blocks)) {
@@ -1022,3 +1034,27 @@ nxt_mp_zget(nxt_mp_t *mp, size_t size)
return p;
}
+
+
+nxt_int_t
+nxt_mp_cleanup(nxt_mp_t *mp, nxt_work_handler_t handler,
+ nxt_task_t *task, void *obj, void *data)
+{
+ nxt_work_t *work;
+
+ work = nxt_mp_get(mp, sizeof(nxt_work_t));
+
+ if (nxt_slow_path(work == NULL)) {
+ return NXT_ERROR;
+ }
+
+ work->next = mp->cleanup;
+ work->handler = handler;
+ work->task = task;
+ work->obj = obj;
+ work->data = data;
+
+ mp->cleanup = work;
+
+ return NXT_OK;
+}
diff --git a/src/nxt_mp.h b/src/nxt_mp.h
index 72404e89..71a48de6 100644
--- a/src/nxt_mp.h
+++ b/src/nxt_mp.h
@@ -109,6 +109,10 @@ NXT_EXPORT void *nxt_mp_zget(nxt_mp_t *mp, size_t size)
NXT_MALLOC_LIKE;
+NXT_EXPORT nxt_int_t nxt_mp_cleanup(nxt_mp_t *mp, nxt_work_handler_t handler,
+ nxt_task_t *task, void *obj, void *data);
+
+
NXT_EXPORT void nxt_mp_thread_adopt(nxt_mp_t *mp);
#endif /* _NXT_MP_H_INCLUDED_ */
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 4c2561d3..3f3fdd19 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -245,7 +245,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- nxt_process_port_add(process, port);
+ nxt_process_port_add(task, process, port);
port->pair[0] = -1;
port->pair[1] = msg->fd;
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 1832640d..9561fb84 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -563,11 +563,31 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
}
+static void
+nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_runtime_t *rt;
+ nxt_process_t *process;
+
+ process = obj;
+ rt = data;
+
+ process->port_cleanups--;
+
+ if (process->port_cleanups == 0) {
+ nxt_runtime_process_destroy(rt, process);
+ }
+}
+
void
-nxt_process_port_add(nxt_process_t *process, nxt_port_t *port)
+nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
{
port->process = process;
nxt_queue_insert_tail(&process->ports, &port->link);
+
+ nxt_mp_cleanup(port->mem_pool, nxt_process_port_mp_cleanup, task, process,
+ task->thread->runtime);
+ process->port_cleanups++;
}
diff --git a/src/nxt_process.h b/src/nxt_process.h
index a647c00a..aa3aa7a5 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -57,6 +57,7 @@ typedef struct {
nxt_pid_t pid;
nxt_queue_t ports; /* of nxt_port_t */
nxt_bool_t ready;
+ nxt_uint_t port_cleanups;
nxt_process_init_t *init;
@@ -88,7 +89,8 @@ NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv,
#define nxt_process_port_first(process) \
nxt_queue_link_data(nxt_queue_first(&process->ports), nxt_port_t, link)
-NXT_EXPORT void nxt_process_port_add(nxt_process_t *process, nxt_port_t *port);
+NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
+ nxt_port_t *port);
#define nxt_process_port_each(process, port) \
nxt_queue_each(port, &process->ports, nxt_port_t, link)
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 5ee70377..c103a861 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -27,7 +27,6 @@ struct nxt_start_worker_s {
nxt_app_t *app;
nxt_req_conn_link_t *rc;
nxt_mp_t *mem_pool;
- void *joint;
nxt_work_t work;
};
@@ -160,17 +159,13 @@ nxt_router_start(nxt_task_t *task, void *data)
static void
nxt_router_sw_release(nxt_task_t *task, void *obj, void *data)
{
- nxt_start_worker_t *sw;
- nxt_socket_conf_joint_t *joint;
+ nxt_start_worker_t *sw;
sw = obj;
- joint = sw->joint;
nxt_debug(task, "sw #%uxD release", sw->stream);
- if (nxt_mp_release(sw->mem_pool, sw) == 0) {
- nxt_router_conf_release(task, joint);
- }
+ nxt_mp_release(sw->mem_pool, sw);
}
@@ -1861,7 +1856,6 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
nxt_app_t *app;
nxt_port_t *port;
nxt_work_t *work;
- nxt_process_t *process;
nxt_queue_link_t *lnk;
nxt_req_conn_link_t *rc;
@@ -1913,14 +1907,9 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
nxt_router_app_free(app);
port->app = NULL;
- process = port->process;
nxt_port_release(port);
- if (nxt_queue_is_empty(&process->ports)) {
- nxt_runtime_process_destroy(task->thread->runtime, process);
- }
-
return;
}
@@ -2031,7 +2020,6 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
sw->app = app;
sw->rc = rc;
sw->mem_pool = c->mem_pool;
- sw->joint = c->listen->socket.data;
sw->work.handler = nxt_router_send_sw_request;
sw->work.task = task;
@@ -2350,6 +2338,17 @@ nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
static void
+nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_socket_conf_joint_t *joint;
+
+ joint = obj;
+
+ nxt_router_conf_release(task, joint);
+}
+
+
+static void
nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
@@ -2380,9 +2379,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
task = &task->thread->engine->task;
- if (nxt_mp_release(c->mem_pool, c) == 0) {
- nxt_router_conf_release(task, joint);
- }
+ nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, task, joint, NULL);
+
+ nxt_mp_release(c->mem_pool, c);
}
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 8b216337..31829225 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -1491,6 +1491,8 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
void
nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
{
+ nxt_assert(process->port_cleanups == 0);
+
nxt_port_mmaps_destroy(process->incoming, 1);
nxt_port_mmaps_destroy(process->outgoing, 1);
@@ -1657,6 +1659,10 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
case NXT_OK:
rt->nprocesses--;
+ if (process->port_cleanups == 0) {
+ nxt_runtime_process_destroy(rt, process);
+ }
+
nxt_process_port_each(process, port) {
nxt_runtime_port_remove(rt, port);
@@ -1665,10 +1671,6 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
} nxt_process_port_loop;
- if (nxt_queue_is_empty(&process->ports)) {
- nxt_runtime_process_destroy(rt, process);
- }
-
break;
default: