summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c68
1 files changed, 41 insertions, 27 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 097f50d6..2fef17c5 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -784,8 +784,8 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
{
int rc;
int ready_fd, router_fd, read_in_fd, read_out_fd;
- char *unit_init, *version_end;
- long version_length;
+ char *unit_init, *version_end, *vars;
+ size_t version_length;
int64_t ready_pid, router_pid, read_pid;
uint32_t ready_stream, router_id, ready_id, read_id;
@@ -797,21 +797,30 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
return NXT_UNIT_ERROR;
}
- nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
+ version_end = strchr(unit_init, ';');
+ if (nxt_slow_path(version_end == NULL)) {
+ nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
+ NXT_UNIT_INIT_ENV, unit_init);
- version_length = nxt_length(NXT_VERSION);
+ return NXT_UNIT_ERROR;
+ }
- version_end = strchr(unit_init, ';');
- if (version_end == NULL
- || version_end - unit_init != version_length
- || memcmp(unit_init, NXT_VERSION, version_length) != 0)
- {
- nxt_unit_alert(NULL, "version check error");
+ version_length = version_end - unit_init;
+
+ rc = version_length != nxt_length(NXT_VERSION)
+ || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
+
+ if (nxt_slow_path(rc != 0)) {
+ nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
+ "%.*s, while the app was compiled with libunit %s",
+ (int) version_length, unit_init, NXT_VERSION);
return NXT_UNIT_ERROR;
}
- rc = sscanf(version_end + 1,
+ vars = version_end + 1;
+
+ rc = sscanf(vars,
"%"PRIu32";"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
@@ -823,12 +832,22 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
&read_pid, &read_id, &read_in_fd, &read_out_fd,
log_fd, shm_limit);
+ if (nxt_slow_path(rc == EOF)) {
+ nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
+ vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
+
+ return NXT_UNIT_ERROR;
+ }
+
if (nxt_slow_path(rc != 13)) {
- nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
+ nxt_unit_alert(NULL, "invalid number of variables in %s env: "
+ "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars);
return NXT_UNIT_ERROR;
}
+ nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
+
nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
ready_port->in_fd = -1;
@@ -3587,7 +3606,10 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
return NXT_UNIT_ERROR;
}
- res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ do {
+ res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
+ } while (res == NXT_UNIT_AGAIN);
+
if (res == NXT_UNIT_ERROR) {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -4994,7 +5016,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
- nxt_unit_ctx_impl_t *ctx_impl;
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
@@ -5002,9 +5023,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
-
-retry:
if (port == lib->shared_port) {
rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
@@ -5030,15 +5048,6 @@ retry:
nxt_unit_process_ready_req(ctx);
- if (ctx_impl->online) {
- rbuf = nxt_unit_read_buf_get(ctx);
- if (nxt_slow_path(rbuf == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
- goto retry;
- }
-
return rc;
}
@@ -6073,7 +6082,10 @@ static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf)
{
- int res;
+ int res;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
retry:
@@ -6086,6 +6098,8 @@ retry:
}
if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_app_queue_notification_received(port_impl->queue);
+
nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
(int) port->id.pid, (int) port->id.id, (int) rbuf->size);