From 82b899b1365431951afc1da9b2b30065ac98fc94 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Mon, 30 Mar 2020 14:08:20 +0300 Subject: Attributing libunit logging function for arguments validation. --- src/nxt_unit.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 7a4124fb..77e36771 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -971,8 +971,10 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req_impl->websocket = 0; nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, - (int) r->method_length, nxt_unit_sptr_get(&r->method), - (int) r->target_length, nxt_unit_sptr_get(&r->target), + (int) r->method_length, + (char *) nxt_unit_sptr_get(&r->method), + (int) r->target_length, + (char *) nxt_unit_sptr_get(&r->target), (int) r->content_length); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -2084,7 +2086,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_unit_debug(ctx, "process %d allocated_chunks %d", mmap_buf->process->pid, - mmap_buf->process->outgoing.allocated_chunks); + (int) mmap_buf->process->outgoing.allocated_chunks); } else { if (nxt_slow_path(mmap_buf->plain_ptr == NULL @@ -2972,7 +2974,7 @@ unlock: nxt_unit_debug(ctx, "process %d allocated_chunks %d", process->pid, - process->outgoing.allocated_chunks); + (int) process->outgoing.allocated_chunks); pthread_mutex_unlock(&process->outgoing.mutex); @@ -3691,7 +3693,7 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_unit_debug(ctx, "process %d allocated_chunks %d", process->pid, - process->outgoing.allocated_chunks); + (int) process->outgoing.allocated_chunks); } if (hdr->dst_pid == lib->pid -- cgit From ab7b42a072e741b226749c416440f89fcaff3d2c Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Mon, 30 Mar 2020 14:18:41 +0300 Subject: Handling change file message in libunit. This is required for proper log file rotation action. --- src/nxt_unit.c | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 77e36771..55926431 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -767,6 +767,16 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, case _NXT_PORT_MSG_CHANGE_FILE: nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", port_msg->stream, recv_msg.fd); + + if (dup2(recv_msg.fd, lib->log_fd) == -1) { + nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", + port_msg->stream, recv_msg.fd, lib->log_fd, + strerror(errno), errno); + + goto fail; + } + + rc = NXT_UNIT_OK; break; case _NXT_PORT_MSG_MMAP: -- cgit From 0935630cba069d6619e967404bb6c7c2a93fbe7e Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Mon, 30 Mar 2020 14:18:51 +0300 Subject: Fixing application process infinite loop. Main process exiting before app process init may have caused hanging. --- src/nxt_unit.c | 49 ++++++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 21 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 55926431..160b849a 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -2635,7 +2635,6 @@ nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size) void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc) { - ssize_t res; uint32_t size; nxt_port_msg_t msg; nxt_unit_impl_t *lib; @@ -2690,12 +2689,8 @@ skip_response_send: msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(req->ctx, &req->response_port, - &msg, sizeof(msg), NULL, 0); - if (nxt_slow_path(res != sizeof(msg))) { - nxt_unit_req_alert(req, "last message send failed: %s (%d)", - strerror(errno), errno); - } + (void) lib->callbacks.port_send(req->ctx, &req->response_port, + &msg, sizeof(msg), NULL, 0); nxt_unit_request_info_release(req); } @@ -3013,9 +3008,6 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { - nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)", - (int) port_id->pid, strerror(errno), errno); - return NXT_UNIT_ERROR; } @@ -3039,6 +3031,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) } nxt_unit_read_buf(ctx, rbuf); + if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { nxt_unit_read_buf_release(ctx, rbuf); @@ -3294,9 +3287,6 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), &cmsg, sizeof(cmsg)); if (nxt_slow_path(res != sizeof(msg))) { - nxt_unit_warn(ctx, "failed to send shm to %d: %s (%d)", - (int) port_id->pid, strerror(errno), errno); - return NXT_UNIT_ERROR; } @@ -3739,9 +3729,6 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { - nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)", - (int) port_id.pid, strerror(errno), errno); - return NXT_UNIT_ERROR; } @@ -3894,6 +3881,10 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) while (nxt_fast_path(lib->online)) { rc = nxt_unit_run_once(ctx); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + break; + } } return rc; @@ -4552,14 +4543,24 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd, msg.msg_control = (void *) oob; msg.msg_controllen = oob_size; +retry: + res = sendmsg(fd, &msg, 0); if (nxt_slow_path(res == -1)) { - nxt_unit_warn(ctx, "port_send(%d, %d) failed: %s (%d)", + if (errno == EINTR) { + goto retry; + } + + /* + * FIXME: This should be "alert" after router graceful shutdown + * implementation. + */ + nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", fd, (int) buf_size, strerror(errno), errno); } else { - nxt_unit_debug(ctx, "port_send(%d, %d): %d", fd, (int) buf_size, + nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, (int) res); } @@ -4629,14 +4630,20 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size, msg.msg_control = oob; msg.msg_controllen = oob_size; +retry: + res = recvmsg(fd, &msg, 0); if (nxt_slow_path(res == -1)) { - nxt_unit_warn(ctx, "port_recv(%d) failed: %s (%d)", - fd, strerror(errno), errno); + if (errno == EINTR) { + goto retry; + } + + nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", + fd, strerror(errno), errno); } else { - nxt_unit_debug(ctx, "port_recv(%d): %d", fd, (int) res); + nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res); } return res; -- cgit From 792ef9d3c71c6843dbbde450a2d6d1ade538f1f3 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Mon, 6 Apr 2020 16:52:11 +0300 Subject: Fixing 'find & add' racing condition in connected ports hash. Missing error log messages added. --- src/nxt_unit.c | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 160b849a..c2e7f198 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -4312,6 +4312,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port); if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", + port->id.pid, port->id.id); + goto unlock; } -- cgit From 58cc13ab291cac5b13462006e3feb780178ef5f3 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Fri, 10 Apr 2020 16:21:58 +0300 Subject: Resolving a racing condition while adding ports on the app's side. An earlier attempt (ad6265786871) to resolve this condition on the router's side added a new issue: the app could get a request before acquiring a port. --- src/nxt_unit.c | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) (limited to 'src/nxt_unit.c') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index c2e7f198..67244420 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -4282,16 +4282,38 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) int rc; nxt_unit_impl_t *lib; nxt_unit_process_t *process; - nxt_unit_port_impl_t *new_port; + nxt_unit_port_impl_t *new_port, *old_port; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + pthread_mutex_lock(&lib->mutex); + + old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); + + if (nxt_slow_path(old_port != NULL)) { + nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d", + port->id.pid, port->id.id, + port->in_fd, port->out_fd); + + if (port->in_fd != -1) { + close(port->in_fd); + port->in_fd = -1; + } + + if (port->out_fd != -1) { + close(port->out_fd); + port->out_fd = -1; + } + + pthread_mutex_unlock(&lib->mutex); + + return NXT_UNIT_OK; + } + nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", port->id.pid, port->id.id, port->in_fd, port->out_fd); - pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_get(ctx, port->id.pid); if (nxt_slow_path(process == NULL)) { rc = NXT_UNIT_ERROR; -- cgit