summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-07-07 16:01:34 +0300
committerMax Romanov <max.romanov@nginx.com>2017-07-07 16:01:34 +0300
commitf319220a6c4515f3d31e546b4f1f5de0b94aceb7 (patch)
treeeb4a3ed43d9d7ebef794e51fba56715e65b5a936 /src/nxt_port_socket.c
parentc9fbd832ab4f5743824b155fb3bf3a42206fdd52 (diff)
downloadunit-f319220a6c4515f3d31e546b4f1f5de0b94aceb7.tar.gz
unit-f319220a6c4515f3d31e546b4f1f5de0b94aceb7.tar.bz2
Redirecting buffer completion handler to specific engine.
There is a case in router where we use port in router connection thread. Buffers are allocated within connection memory pool which can be used only in this router thread. sendmsg() can be postponed into main router thread and completion handler will compare current engine and post itself to correct engine.
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r--src/nxt_port_socket.c46
1 files changed, 44 insertions, 2 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 3bf57479..73bb44bd 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -135,6 +135,37 @@ nxt_port_write_close(nxt_port_t *port)
}
+static void
+nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_engine_t *engine;
+ nxt_port_send_msg_t *msg;
+
+ msg = obj;
+ engine = data;
+
+#if (NXT_DEBUG)
+ if (nxt_slow_path(data != msg->engine)) {
+ nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)",
+ data, msg->engine);
+ nxt_abort();
+ }
+#endif
+
+ if (engine != task->thread->engine) {
+
+ nxt_debug(task, "current thread is %PT, expected %PT",
+ task->thread->tid, engine->task.thread->tid);
+
+ nxt_event_engine_post(engine, &msg->work);
+
+ return;
+ }
+
+ nxt_mp_release(msg->mem_pool, obj);
+}
+
+
nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
@@ -160,14 +191,25 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
}
}
- msg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_send_msg_t));
+ msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
return NXT_ERROR;
}
+ msg->link.next = NULL;
+ msg->link.prev = NULL;
+
msg->buf = b;
msg->fd = fd;
msg->share = 0;
+
+ msg->work.next = NULL;
+ msg->work.handler = nxt_port_release_send_msg;
+ msg->work.task = task;
+ msg->work.obj = msg;
+ msg->work.data = task->thread->engine;
+
+ msg->engine = task->thread->engine;
msg->mem_pool = port->mem_pool;
msg->port_msg.stream = stream;
@@ -292,7 +334,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} else {
nxt_queue_remove(link);
- nxt_mp_free(msg->mem_pool, msg);
+ nxt_port_release_send_msg(task, msg, msg->engine);
}
} else if (nxt_slow_path(n == NXT_ERROR)) {