diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_unit.c | 80 |
1 files changed, 44 insertions, 36 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 135c06ed..dac612b8 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -55,6 +55,7 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, + int *shared_port_fd, int *shared_queue_fd, int *log_fd, uint32_t *stream, uint32_t *shm_limit, uint32_t *request_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, @@ -424,12 +425,12 @@ static pid_t nxt_unit_pid; nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { - int rc, queue_fd; + int rc, queue_fd, shared_queue_fd; void *mem; uint32_t ready_stream, shm_limit, request_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; - nxt_unit_port_t ready_port, router_port, read_port; + nxt_unit_port_t ready_port, router_port, read_port, shared_port; nxt_unit_pid = getpid(); @@ -440,6 +441,8 @@ nxt_unit_init(nxt_unit_init_t *init) queue_fd = -1; mem = MAP_FAILED; + shared_port.out_fd = -1; + shared_port.data = NULL; if (init->ready_port.id.pid != 0 && init->ready_stream != 0 @@ -458,8 +461,12 @@ nxt_unit_init(nxt_unit_init_t *init) nxt_unit_port_id_init(&read_port.id, read_port.id.pid, read_port.id.id); + shared_port.in_fd = init->shared_port_fd; + shared_queue_fd = init->shared_queue_fd; + } else { rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, + &shared_port.in_fd, &shared_queue_fd, &lib->log_fd, &ready_stream, &shm_limit, &request_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -525,6 +532,27 @@ nxt_unit_init(nxt_unit_init_t *init) goto fail; } + nxt_unit_port_id_init(&shared_port.id, read_port.id.pid, + NXT_UNIT_SHARED_PORT_ID); + + mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, shared_queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd, + strerror(errno), errno); + + goto fail; + } + + nxt_unit_close(shared_queue_fd); + + lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem); + if (nxt_slow_path(lib->shared_port == NULL)) { + nxt_unit_alert(NULL, "failed to add shared_port"); + + goto fail; + } + rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); @@ -799,7 +827,8 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, - nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, + nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd, + int *log_fd, uint32_t *stream, uint32_t *shm_limit, uint32_t *request_limit) { int rc; @@ -845,11 +874,13 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d,%d;" + "%d,%d;" "%d,%"PRIu32",%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &router_pid, &router_id, &router_fd, &read_pid, &read_id, &read_in_fd, &read_out_fd, + shared_port_fd, shared_queue_fd, log_fd, shm_limit, request_limit); if (nxt_slow_path(rc == EOF)) { @@ -859,9 +890,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, return NXT_UNIT_ERROR; } - if (nxt_slow_path(rc != 14)) { + if (nxt_slow_path(rc != 16)) { nxt_unit_alert(NULL, "invalid number of variables in %s env: " - "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars); + "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars); return NXT_UNIT_ERROR; } @@ -1137,7 +1168,6 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { void *mem; - nxt_unit_impl_t *lib; nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; @@ -1162,33 +1192,17 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->stream, (int) new_port_msg->pid, (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) { - nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); - - new_port.in_fd = recv_msg->fd[0]; - new_port.out_fd = -1; - - mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, - MAP_SHARED, recv_msg->fd[1], 0); - - } else { - if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) - != NXT_UNIT_OK)) - { - return NXT_UNIT_ERROR; - } + if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) { + return NXT_UNIT_ERROR; + } - nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, - new_port_msg->id); + nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id); - new_port.in_fd = -1; - new_port.out_fd = recv_msg->fd[0]; + new_port.in_fd = -1; + new_port.out_fd = recv_msg->fd[0]; - mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, - MAP_SHARED, recv_msg->fd[1], 0); - } + mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd[1], 0); if (nxt_slow_path(mem == MAP_FAILED)) { nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1], @@ -1206,12 +1220,6 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) { - lib->shared_port = port; - - return nxt_unit_ctx_ready(ctx); - } - nxt_unit_port_release(port); return NXT_UNIT_OK; |