summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c102
1 files changed, 73 insertions, 29 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 7a4124fb..67244420 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -767,6 +767,16 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
case _NXT_PORT_MSG_CHANGE_FILE:
nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
port_msg->stream, recv_msg.fd);
+
+ if (dup2(recv_msg.fd, lib->log_fd) == -1) {
+ nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
+ port_msg->stream, recv_msg.fd, lib->log_fd,
+ strerror(errno), errno);
+
+ goto fail;
+ }
+
+ rc = NXT_UNIT_OK;
break;
case _NXT_PORT_MSG_MMAP:
@@ -971,8 +981,10 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req_impl->websocket = 0;
nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
- (int) r->method_length, nxt_unit_sptr_get(&r->method),
- (int) r->target_length, nxt_unit_sptr_get(&r->target),
+ (int) r->method_length,
+ (char *) nxt_unit_sptr_get(&r->method),
+ (int) r->target_length,
+ (char *) nxt_unit_sptr_get(&r->target),
(int) r->content_length);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -2084,7 +2096,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_debug(ctx, "process %d allocated_chunks %d",
mmap_buf->process->pid,
- mmap_buf->process->outgoing.allocated_chunks);
+ (int) mmap_buf->process->outgoing.allocated_chunks);
} else {
if (nxt_slow_path(mmap_buf->plain_ptr == NULL
@@ -2623,7 +2635,6 @@ nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
- ssize_t res;
uint32_t size;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
@@ -2678,12 +2689,8 @@ skip_response_send:
msg.mf = 0;
msg.tracking = 0;
- res = lib->callbacks.port_send(req->ctx, &req->response_port,
- &msg, sizeof(msg), NULL, 0);
- if (nxt_slow_path(res != sizeof(msg))) {
- nxt_unit_req_alert(req, "last message send failed: %s (%d)",
- strerror(errno), errno);
- }
+ (void) lib->callbacks.port_send(req->ctx, &req->response_port,
+ &msg, sizeof(msg), NULL, 0);
nxt_unit_request_info_release(req);
}
@@ -2972,7 +2979,7 @@ unlock:
nxt_unit_debug(ctx, "process %d allocated_chunks %d",
process->pid,
- process->outgoing.allocated_chunks);
+ (int) process->outgoing.allocated_chunks);
pthread_mutex_unlock(&process->outgoing.mutex);
@@ -3001,9 +3008,6 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
- nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)",
- (int) port_id->pid, strerror(errno), errno);
-
return NXT_UNIT_ERROR;
}
@@ -3027,6 +3031,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
}
nxt_unit_read_buf(ctx, rbuf);
+
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -3282,9 +3287,6 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg),
&cmsg, sizeof(cmsg));
if (nxt_slow_path(res != sizeof(msg))) {
- nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)",
- (int) port_id->pid, strerror(errno), errno);
-
return NXT_UNIT_ERROR;
}
@@ -3691,7 +3693,7 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_unit_debug(ctx, "process %d allocated_chunks %d",
process->pid,
- process->outgoing.allocated_chunks);
+ (int) process->outgoing.allocated_chunks);
}
if (hdr->dst_pid == lib->pid
@@ -3727,9 +3729,6 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
- nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)",
- (int) port_id.pid, strerror(errno), errno);
-
return NXT_UNIT_ERROR;
}
@@ -3882,6 +3881,10 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
while (nxt_fast_path(lib->online)) {
rc = nxt_unit_run_once(ctx);
+
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ break;
+ }
}
return rc;
@@ -4279,16 +4282,38 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
- nxt_unit_port_impl_t *new_port;
+ nxt_unit_port_impl_t *new_port, *old_port;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ pthread_mutex_lock(&lib->mutex);
+
+ old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
+
+ if (nxt_slow_path(old_port != NULL)) {
+ nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
+ port->id.pid, port->id.id,
+ port->in_fd, port->out_fd);
+
+ if (port->in_fd != -1) {
+ close(port->in_fd);
+ port->in_fd = -1;
+ }
+
+ if (port->out_fd != -1) {
+ close(port->out_fd);
+ port->out_fd = -1;
+ }
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ return NXT_UNIT_OK;
+ }
+
nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
port->id.pid, port->id.id,
port->in_fd, port->out_fd);
- pthread_mutex_lock(&lib->mutex);
-
process = nxt_unit_process_get(ctx, port->id.pid);
if (nxt_slow_path(process == NULL)) {
rc = NXT_UNIT_ERROR;
@@ -4309,6 +4334,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
+ port->id.pid, port->id.id);
+
goto unlock;
}
@@ -4540,14 +4568,24 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd,
msg.msg_control = (void *) oob;
msg.msg_controllen = oob_size;
+retry:
+
res = sendmsg(fd, &msg, 0);
if (nxt_slow_path(res == -1)) {
- nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)",
+ if (errno == EINTR) {
+ goto retry;
+ }
+
+ /*
+ * FIXME: This should be "alert" after router graceful shutdown
+ * implementation.
+ */
+ nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
fd, (int) buf_size, strerror(errno), errno);
} else {
- nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size,
+ nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
(int) res);
}
@@ -4617,14 +4655,20 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size,
msg.msg_control = oob;
msg.msg_controllen = oob_size;
+retry:
+
res = recvmsg(fd, &msg, 0);
if (nxt_slow_path(res == -1)) {
- nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)",
- fd, strerror(errno), errno);
+ if (errno == EINTR) {
+ goto retry;
+ }
+
+ nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
+ fd, strerror(errno), errno);
} else {
- nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res);
+ nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res);
}
return res;