diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-08-02 13:36:29 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-08-02 13:36:29 +0300 |
commit | b150f07e801802124ca6f3b29b4b9f97aa68d52b (patch) | |
tree | 6dbf83af6b7e2787e0c918b0017765d074c70057 /src | |
parent | f23f985899760fafd853e993d9023b1339f09533 (diff) | |
download | unit-b150f07e801802124ca6f3b29b4b9f97aa68d52b.tar.gz unit-b150f07e801802124ca6f3b29b4b9f97aa68d52b.tar.bz2 |
Added basic port error handler.
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_master_process.c | 4 | ||||
-rw-r--r-- | src/nxt_mp.c | 4 | ||||
-rw-r--r-- | src/nxt_port.c | 32 | ||||
-rw-r--r-- | src/nxt_port.h | 5 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 14 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 5 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 54 | ||||
-rw-r--r-- | src/nxt_router.c | 3 |
8 files changed, 81 insertions, 40 deletions
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 41e659a4..313456ac 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -223,7 +223,7 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return NXT_ERROR; } - port = nxt_port_new(0, nxt_pid, NXT_PROCESS_MASTER); + port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MASTER); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } @@ -404,7 +404,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, process->init = init; - port = nxt_port_new(0, 0, init->type); + port = nxt_port_new(task, 0, 0, init->type); if (nxt_slow_path(port == NULL)) { nxt_runtime_process_remove(rt, process); return NXT_ERROR; diff --git a/src/nxt_mp.c b/src/nxt_mp.c index 40d12be0..8c2da262 100644 --- a/src/nxt_mp.c +++ b/src/nxt_mp.c @@ -952,7 +952,9 @@ nxt_mp_retain(nxt_mp_t *mp, size_t size) uint32_t nxt_mp_release(nxt_mp_t *mp, void *p) { - nxt_mp_free(mp, p); + if (nxt_fast_path(p != NULL)) { + nxt_mp_free(mp, p); + } mp->retain--; diff --git a/src/nxt_port.c b/src/nxt_port.c index 2e5b229b..358e30f2 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -15,8 +15,32 @@ static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_atomic_uint_t nxt_port_last_id = 1; +static void +nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) +{ + nxt_mp_t *mp; + nxt_port_t *port; + + port = obj; + mp = data; + + nxt_assert(port->pair[0] == -1); + nxt_assert(port->pair[1] == -1); + + nxt_assert(port->app_req_id == 0); + nxt_assert(port->app_link.next == NULL); + + nxt_assert(nxt_queue_is_empty(&port->messages)); + nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); + nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); + + nxt_mp_free(mp, port); +} + + nxt_port_t * -nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type) +nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, + nxt_process_type_t type) { nxt_mp_t *mp; nxt_port_t *port; @@ -36,6 +60,8 @@ nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type) port->mem_pool = mp; port->next_stream = 1; + nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); + nxt_queue_init(&port->messages); } else { @@ -74,7 +100,7 @@ nxt_port_release(nxt_port_t *port) nxt_process_port_remove(port); } - nxt_mp_release(port->mem_pool, port); + nxt_mp_release(port->mem_pool, NULL); return 1; } @@ -222,7 +248,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - port = nxt_port_new(new_port_msg->id, new_port_msg->pid, + port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid, new_port_msg->type); if (nxt_slow_path(port == NULL)) { return; diff --git a/src/nxt_port.h b/src/nxt_port.h index a7f78b4c..1e39c732 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -118,6 +118,9 @@ struct nxt_port_s { nxt_process_type_t type; nxt_work_t work; + + struct iovec *iov; + void *mmsg_buf; }; @@ -140,7 +143,7 @@ typedef union { } nxt_port_data_t; -nxt_port_t *nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, +nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type); nxt_bool_t nxt_port_release(nxt_port_t *port); diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index ec3227b1..e714b654 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -568,7 +568,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) { size_t bsize; - nxt_buf_t *b, *bmem; + nxt_buf_t *bmem; nxt_uint_t i; nxt_port_mmap_msg_t *mmap_msg; nxt_port_mmap_header_t *hdr; @@ -577,13 +577,8 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, "via shared memory", sb->size, port->pid); bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); + mmap_msg = port->mmsg_buf; - b = nxt_buf_mem_ts_alloc(task, port->mem_pool, bsize); - if (nxt_slow_path(b == NULL)) { - return; - } - - mmap_msg = (nxt_port_mmap_msg_t *) b->mem.start; bmem = msg->buf; for (i = 0; i < sb->niov; i++, mmap_msg++) { @@ -611,10 +606,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, port->pid); } - msg->buf = b; - b->mem.free += bsize; - - sb->iobuf[0].iov_base = b->mem.pos; + sb->iobuf[0].iov_base = port->mmsg_buf; sb->iobuf[0].iov_len = bsize; sb->niov = 1; sb->size = bsize; diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index 38e17a50..0b64fa89 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -38,11 +38,6 @@ void nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb); -nxt_inline void -nxt_port_mmap_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) { - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); -} - void nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 30c64a94..01e46f93 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -114,6 +114,13 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) port->socket.write_work_queue = &port->engine->fast_work_queue; port->socket.write_handler = nxt_port_write_handler; port->socket.error_handler = nxt_port_error_handler; + + if (port->iov == NULL) { + port->iov = nxt_mp_get(port->mem_pool, sizeof(struct iovec) * + NXT_IOBUF_MAX * 10); + port->mmsg_buf = nxt_mp_get(port->mem_pool, sizeof(uint32_t) * 3 * + NXT_IOBUF_MAX * 10); + } } @@ -224,20 +231,20 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) { + size_t plain_size; ssize_t n; nxt_port_t *port; - struct iovec iov[NXT_IOBUF_MAX * 10]; + struct iovec *iov; nxt_work_queue_t *wq; nxt_queue_link_t *link; nxt_port_method_t m; nxt_port_send_msg_t *msg; nxt_sendbuf_coalesce_t sb; - size_t plain_size; - nxt_buf_t *plain_buf; - port = nxt_container_of(obj, nxt_port_t, socket); + iov = port->iov; + do { link = nxt_queue_first(&port->messages); @@ -269,7 +276,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) nxt_sendbuf_mem_coalesce(task, &sb); plain_size = sb.size; - plain_buf = msg->buf; /* * Send through mmap enabled only when payload @@ -302,16 +308,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) wq = &task->thread->engine->fast_work_queue; - if (msg->buf != plain_buf) { - /* - * Complete crafted mmap_msgs buf and restore msg->buf - * for regular completion call. - */ - nxt_port_mmap_completion(task, wq, msg->buf); - - msg->buf = plain_buf; - } - msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size); if (msg->buf != NULL) { @@ -330,7 +326,8 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } else { nxt_queue_remove(link); - nxt_port_release_send_msg(task, msg, msg->engine); + nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, + msg->engine); } } else if (nxt_slow_path(n == NXT_ERROR)) { @@ -536,6 +533,31 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) { + nxt_buf_t *b; + nxt_port_t *port; + nxt_work_queue_t *wq; + nxt_port_send_msg_t *msg; + nxt_debug(task, "port error handler %p", obj); /* TODO */ + + port = nxt_container_of(obj, nxt_port_t, socket); + + nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { + + wq = &task->thread->engine->fast_work_queue; + + for(b = msg->buf; b != NULL; b = b->next) { + if (nxt_buf_is_sync(b)) { + continue; + } + + nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); + } + + nxt_queue_remove(&msg->link); + nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, + msg->engine); + + } nxt_queue_loop; } diff --git a/src/nxt_router.c b/src/nxt_router.c index 7c197a44..5f654050 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1386,7 +1386,8 @@ nxt_router_thread_start(void *data) engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); - port = nxt_port_new(nxt_port_get_next_id(), nxt_pid, NXT_PROCESS_ROUTER); + port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid, + NXT_PROCESS_ROUTER); if (nxt_slow_path(port == NULL)) { return; } |