summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r--src/nxt_port_socket.c46
1 files changed, 44 insertions, 2 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 3bf57479..73bb44bd 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -135,6 +135,37 @@ nxt_port_write_close(nxt_port_t *port)
}
+static void
+nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_engine_t *engine;
+ nxt_port_send_msg_t *msg;
+
+ msg = obj;
+ engine = data;
+
+#if (NXT_DEBUG)
+ if (nxt_slow_path(data != msg->engine)) {
+ nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)",
+ data, msg->engine);
+ nxt_abort();
+ }
+#endif
+
+ if (engine != task->thread->engine) {
+
+ nxt_debug(task, "current thread is %PT, expected %PT",
+ task->thread->tid, engine->task.thread->tid);
+
+ nxt_event_engine_post(engine, &msg->work);
+
+ return;
+ }
+
+ nxt_mp_release(msg->mem_pool, obj);
+}
+
+
nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
@@ -160,14 +191,25 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
}
}
- msg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_send_msg_t));
+ msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
return NXT_ERROR;
}
+ msg->link.next = NULL;
+ msg->link.prev = NULL;
+
msg->buf = b;
msg->fd = fd;
msg->share = 0;
+
+ msg->work.next = NULL;
+ msg->work.handler = nxt_port_release_send_msg;
+ msg->work.task = task;
+ msg->work.obj = msg;
+ msg->work.data = task->thread->engine;
+
+ msg->engine = task->thread->engine;
msg->mem_pool = port->mem_pool;
msg->port_msg.stream = stream;
@@ -292,7 +334,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} else {
nxt_queue_remove(link);
- nxt_mp_free(msg->mem_pool, msg);
+ nxt_port_release_send_msg(task, msg, msg->engine);
}
} else if (nxt_slow_path(n == NXT_ERROR)) {