diff options
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 41 |
1 files changed, 30 insertions, 11 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index f063d87d..8971fb5b 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -788,7 +788,6 @@ static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine) { - nxt_mp_t *mp; nxt_int_t ret; nxt_port_t *port; nxt_process_t *process; @@ -824,12 +823,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, return ret; } - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - return NXT_ERROR; - } - - port->mem_pool = mp; port->engine = 0; port->type = NXT_PROCESS_ROUTER; @@ -1441,6 +1434,8 @@ static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap) { + nxt_mp_t *port_mp; + nxt_int_t res; nxt_port_t *port, *c_port; nxt_req_id_t req_id; nxt_app_wmsg_t wmsg; @@ -1466,6 +1461,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, } while (nxt_event_engine_request_find(engine, req_id) != NULL); rc = nxt_conn_request_add(c, req_id); + if (nxt_slow_path(rc == NULL)) { // 500 Failed to allocate req->conn link nxt_log_alert(task->log, "failed to allocate req->conn link"); @@ -1476,11 +1472,20 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", req_id, c, engine); + port_mp = port->mem_pool; + port->mem_pool = c->mem_pool; + c_port = nxt_process_connected_port_find(port->process, engine->port->pid, engine->port->id); if (nxt_slow_path(c_port != engine->port)) { - (void) nxt_port_send_port(task, port, engine->port); + res = nxt_port_send_port(task, port, engine->port); + + if (nxt_slow_path(res != NXT_OK)) { + // 500 Failed to send reply port + nxt_log_alert(task->log, "failed to send reply port to application"); + } + nxt_process_connected_port_add(port->process, engine->port); } @@ -1489,14 +1494,26 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, wmsg.buf = &wmsg.write; wmsg.stream = req_id; - (void)nxt_app->prepare_msg(task, &ap->r, &wmsg); + res = nxt_app->prepare_msg(task, &ap->r, &wmsg); + + if (nxt_slow_path(res != NXT_OK)) { + // 500 Failed to prepare message + nxt_log_alert(task->log, "failed to prepare message for application"); + } nxt_debug(task, "about to send %d bytes buffer to worker port %d", nxt_buf_used_size(wmsg.write), wmsg.port->socket.fd); - (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, + res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, -1, req_id, engine->port->id, wmsg.write); + + if (nxt_slow_path(res != NXT_OK)) { + // 500 Failed to send message + nxt_log_alert(task->log, "failed to send message to application"); + } + + port->mem_pool = port_mp; } @@ -1597,7 +1614,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) } nxt_queue_loop; - nxt_mp_destroy(c->mem_pool); + nxt_queue_remove(&c->link); + + nxt_mp_release(c->mem_pool, c); } |