summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c38
1 files changed, 37 insertions, 1 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index d35a3307..7e97c050 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -107,6 +107,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
+static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
+ nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
@@ -988,6 +990,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
switch (port_msg->type) {
+ case _NXT_PORT_MSG_RPC_READY:
+ rc = NXT_UNIT_OK;
+ break;
+
case _NXT_PORT_MSG_QUIT:
nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
@@ -1068,7 +1074,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
break;
default:
- nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
+ nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
port_msg->stream, (int) port_msg->type);
rc = NXT_UNIT_ERROR;
@@ -4012,6 +4018,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+ nxt_unit_awake_ctx(ctx, ctx_impl);
+
} nxt_queue_loop;
return rc;
@@ -4019,6 +4027,32 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
static void
+nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
+{
+ nxt_port_msg_t msg;
+
+ if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
+ return;
+ }
+
+ if (nxt_slow_path(ctx_impl->read_port == NULL
+ || ctx_impl->read_port->out_fd == -1))
+ {
+ nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
+
+ return;
+ }
+
+ memset(&msg, 0, sizeof(nxt_port_msg_t));
+
+ msg.type = _NXT_PORT_MSG_RPC_READY;
+
+ (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
+ &msg, sizeof(msg), NULL, 0);
+}
+
+
+static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
pthread_mutex_init(&mmaps->mutex, NULL);
@@ -5390,6 +5424,8 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+ nxt_unit_awake_ctx(ctx, ctx_impl);
+
} nxt_queue_loop;
return old_port;