summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c84
1 files changed, 46 insertions, 38 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 135c06ed..32bb07ab 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -55,6 +55,7 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
+ int *shared_port_fd, int *shared_queue_fd,
int *log_fd, uint32_t *stream, uint32_t *shm_limit,
uint32_t *request_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
@@ -424,12 +425,12 @@ static pid_t nxt_unit_pid;
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
- int rc, queue_fd;
+ int rc, queue_fd, shared_queue_fd;
void *mem;
uint32_t ready_stream, shm_limit, request_limit;
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
- nxt_unit_port_t ready_port, router_port, read_port;
+ nxt_unit_port_t ready_port, router_port, read_port, shared_port;
nxt_unit_pid = getpid();
@@ -440,6 +441,8 @@ nxt_unit_init(nxt_unit_init_t *init)
queue_fd = -1;
mem = MAP_FAILED;
+ shared_port.out_fd = -1;
+ shared_port.data = NULL;
if (init->ready_port.id.pid != 0
&& init->ready_stream != 0
@@ -458,8 +461,12 @@ nxt_unit_init(nxt_unit_init_t *init)
nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
read_port.id.id);
+ shared_port.in_fd = init->shared_port_fd;
+ shared_queue_fd = init->shared_queue_fd;
+
} else {
rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
+ &shared_port.in_fd, &shared_queue_fd,
&lib->log_fd, &ready_stream, &shm_limit,
&request_limit);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -525,6 +532,27 @@ nxt_unit_init(nxt_unit_init_t *init)
goto fail;
}
+ nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
+ NXT_UNIT_SHARED_PORT_ID);
+
+ mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, shared_queue_fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
+ strerror(errno), errno);
+
+ goto fail;
+ }
+
+ nxt_unit_close(shared_queue_fd);
+
+ lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
+ if (nxt_slow_path(lib->shared_port == NULL)) {
+ nxt_unit_alert(NULL, "failed to add shared_port");
+
+ goto fail;
+ }
+
rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_alert(NULL, "failed to send READY message");
@@ -799,7 +827,8 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
- nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
+ nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
+ int *log_fd, uint32_t *stream,
uint32_t *shm_limit, uint32_t *request_limit)
{
int rc;
@@ -845,11 +874,13 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d,%d;"
+ "%d,%d;"
"%d,%"PRIu32",%"PRIu32,
&ready_stream,
&ready_pid, &ready_id, &ready_fd,
&router_pid, &router_id, &router_fd,
&read_pid, &read_id, &read_in_fd, &read_out_fd,
+ shared_port_fd, shared_queue_fd,
log_fd, shm_limit, request_limit);
if (nxt_slow_path(rc == EOF)) {
@@ -859,9 +890,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
return NXT_UNIT_ERROR;
}
- if (nxt_slow_path(rc != 14)) {
+ if (nxt_slow_path(rc != 16)) {
nxt_unit_alert(NULL, "invalid number of variables in %s env: "
- "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
+ "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
return NXT_UNIT_ERROR;
}
@@ -1137,7 +1168,6 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
void *mem;
- nxt_unit_impl_t *lib;
nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg;
@@ -1162,33 +1192,17 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->stream, (int) new_port_msg->pid,
(int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
- nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
-
- new_port.in_fd = recv_msg->fd[0];
- new_port.out_fd = -1;
-
- mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
- MAP_SHARED, recv_msg->fd[1], 0);
-
- } else {
- if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
- != NXT_UNIT_OK))
- {
- return NXT_UNIT_ERROR;
- }
+ if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
+ return NXT_UNIT_ERROR;
+ }
- nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
- new_port_msg->id);
+ nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
- new_port.in_fd = -1;
- new_port.out_fd = recv_msg->fd[0];
+ new_port.in_fd = -1;
+ new_port.out_fd = recv_msg->fd[0];
- mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
- MAP_SHARED, recv_msg->fd[1], 0);
- }
+ mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
+ MAP_SHARED, recv_msg->fd[1], 0);
if (nxt_slow_path(mem == MAP_FAILED)) {
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
@@ -1206,12 +1220,6 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
- lib->shared_port = port;
-
- return nxt_unit_ctx_ready(ctx);
- }
-
nxt_unit_port_release(port);
return NXT_UNIT_OK;
@@ -5106,9 +5114,9 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
rc = nxt_unit_ctx_init(lib, new_ctx, data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_free(ctx, new_ctx);
+ nxt_unit_free(ctx, new_ctx);
- return NULL;
+ return NULL;
}
queue_fd = -1;