summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-07-07 16:01:34 +0300
committerMax Romanov <max.romanov@nginx.com>2017-07-07 16:01:34 +0300
commitf319220a6c4515f3d31e546b4f1f5de0b94aceb7 (patch)
treeeb4a3ed43d9d7ebef794e51fba56715e65b5a936 /src/nxt_router.c
parentc9fbd832ab4f5743824b155fb3bf3a42206fdd52 (diff)
downloadunit-f319220a6c4515f3d31e546b4f1f5de0b94aceb7.tar.gz
unit-f319220a6c4515f3d31e546b4f1f5de0b94aceb7.tar.bz2
Redirecting buffer completion handler to specific engine.
There is a case in router where we use port in router connection thread. Buffers are allocated within connection memory pool which can be used only in this router thread. sendmsg() can be postponed into main router thread and completion handler will compare current engine and post itself to correct engine.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c41
1 files changed, 30 insertions, 11 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index f063d87d..8971fb5b 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -788,7 +788,6 @@ static nxt_int_t
nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine)
{
- nxt_mp_t *mp;
nxt_int_t ret;
nxt_port_t *port;
nxt_process_t *process;
@@ -824,12 +823,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
return ret;
}
- mp = nxt_mp_create(1024, 128, 256, 32);
- if (nxt_slow_path(mp == NULL)) {
- return NXT_ERROR;
- }
-
- port->mem_pool = mp;
port->engine = 0;
port->type = NXT_PROCESS_ROUTER;
@@ -1441,6 +1434,8 @@ static void
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_app_parse_ctx_t *ap)
{
+ nxt_mp_t *port_mp;
+ nxt_int_t res;
nxt_port_t *port, *c_port;
nxt_req_id_t req_id;
nxt_app_wmsg_t wmsg;
@@ -1466,6 +1461,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
} while (nxt_event_engine_request_find(engine, req_id) != NULL);
rc = nxt_conn_request_add(c, req_id);
+
if (nxt_slow_path(rc == NULL)) {
// 500 Failed to allocate req->conn link
nxt_log_alert(task->log, "failed to allocate req->conn link");
@@ -1476,11 +1472,20 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
req_id, c, engine);
+ port_mp = port->mem_pool;
+ port->mem_pool = c->mem_pool;
+
c_port = nxt_process_connected_port_find(port->process,
engine->port->pid,
engine->port->id);
if (nxt_slow_path(c_port != engine->port)) {
- (void) nxt_port_send_port(task, port, engine->port);
+ res = nxt_port_send_port(task, port, engine->port);
+
+ if (nxt_slow_path(res != NXT_OK)) {
+ // 500 Failed to send reply port
+ nxt_log_alert(task->log, "failed to send reply port to application");
+ }
+
nxt_process_connected_port_add(port->process, engine->port);
}
@@ -1489,14 +1494,26 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
wmsg.buf = &wmsg.write;
wmsg.stream = req_id;
- (void)nxt_app->prepare_msg(task, &ap->r, &wmsg);
+ res = nxt_app->prepare_msg(task, &ap->r, &wmsg);
+
+ if (nxt_slow_path(res != NXT_OK)) {
+ // 500 Failed to prepare message
+ nxt_log_alert(task->log, "failed to prepare message for application");
+ }
nxt_debug(task, "about to send %d bytes buffer to worker port %d",
nxt_buf_used_size(wmsg.write),
wmsg.port->socket.fd);
- (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
+ res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
-1, req_id, engine->port->id, wmsg.write);
+
+ if (nxt_slow_path(res != NXT_OK)) {
+ // 500 Failed to send message
+ nxt_log_alert(task->log, "failed to send message to application");
+ }
+
+ port->mem_pool = port_mp;
}
@@ -1597,7 +1614,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
} nxt_queue_loop;
- nxt_mp_destroy(c->mem_pool);
+ nxt_queue_remove(&c->link);
+
+ nxt_mp_release(c->mem_pool, c);
}