diff options
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r-- | src/nxt_port_socket.c | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 3bf57479..73bb44bd 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -135,6 +135,37 @@ nxt_port_write_close(nxt_port_t *port) } +static void +nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_engine_t *engine; + nxt_port_send_msg_t *msg; + + msg = obj; + engine = data; + +#if (NXT_DEBUG) + if (nxt_slow_path(data != msg->engine)) { + nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)", + data, msg->engine); + nxt_abort(); + } +#endif + + if (engine != task->thread->engine) { + + nxt_debug(task, "current thread is %PT, expected %PT", + task->thread->tid, engine->task.thread->tid); + + nxt_event_engine_post(engine, &msg->work); + + return; + } + + nxt_mp_release(msg->mem_pool, obj); +} + + nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) @@ -160,14 +191,25 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, } } - msg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_send_msg_t)); + msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { return NXT_ERROR; } + msg->link.next = NULL; + msg->link.prev = NULL; + msg->buf = b; msg->fd = fd; msg->share = 0; + + msg->work.next = NULL; + msg->work.handler = nxt_port_release_send_msg; + msg->work.task = task; + msg->work.obj = msg; + msg->work.data = task->thread->engine; + + msg->engine = task->thread->engine; msg->mem_pool = port->mem_pool; msg->port_msg.stream = stream; @@ -292,7 +334,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } else { nxt_queue_remove(link); - nxt_mp_free(msg->mem_pool, msg); + nxt_port_release_send_msg(task, msg, msg->engine); } } else if (nxt_slow_path(n == NXT_ERROR)) { |