diff options
-rw-r--r-- | src/nxt_unit.c | 38 |
1 files changed, 37 insertions, 1 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index d35a3307..7e97c050 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -107,6 +107,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); +static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, + nxt_unit_ctx_impl_t *ctx_impl); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); @@ -988,6 +990,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) switch (port_msg->type) { + case _NXT_PORT_MSG_RPC_READY: + rc = NXT_UNIT_OK; + break; + case _NXT_PORT_MSG_QUIT: nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); @@ -1068,7 +1074,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) break; default: - nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", + nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d", port_msg->stream, (int) port_msg->type); rc = NXT_UNIT_ERROR; @@ -4012,6 +4018,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + nxt_unit_awake_ctx(ctx, ctx_impl); + } nxt_queue_loop; return rc; @@ -4019,6 +4027,32 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) static void +nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_port_msg_t msg; + + if (nxt_fast_path(ctx == &ctx_impl->ctx)) { + return; + } + + if (nxt_slow_path(ctx_impl->read_port == NULL + || ctx_impl->read_port->out_fd == -1)) + { + nxt_unit_alert(ctx, "target context read_port is NULL or not writable"); + + return; + } + + memset(&msg, 0, sizeof(nxt_port_msg_t)); + + msg.type = _NXT_PORT_MSG_RPC_READY; + + (void) nxt_unit_port_send(ctx, ctx_impl->read_port, + &msg, sizeof(msg), NULL, 0); +} + + +static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) { pthread_mutex_init(&mmaps->mutex, NULL); @@ -5390,6 +5424,8 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + nxt_unit_awake_ctx(ctx, ctx_impl); + } nxt_queue_loop; return old_port; |