summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c508
1 files changed, 295 insertions, 213 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index ae4499d8..06ad1636 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -3,10 +3,9 @@
* Copyright (C) NGINX, Inc.
*/
-#include <stdlib.h>
-
#include "nxt_main.h"
#include "nxt_port_memory_int.h"
+#include "nxt_socket_msg.h"
#include "nxt_port_queue.h"
#include "nxt_app_queue.h"
@@ -25,6 +24,11 @@
#define NXT_UNIT_LOCAL_BUF_SIZE \
(NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
+enum {
+ NXT_QUIT_NORMAL = 0,
+ NXT_QUIT_GRACEFUL = 1,
+};
+
typedef struct nxt_unit_impl_s nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
@@ -51,7 +55,8 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
- int *log_fd, uint32_t *stream, uint32_t *shm_limit);
+ int *log_fd, uint32_t *stream, uint32_t *shm_limit,
+ uint32_t *request_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
int queue_fd);
static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
@@ -130,6 +135,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
@@ -150,20 +156,20 @@ static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, void *queue);
static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
nxt_queue_t *awaiting_req);
-static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
+static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id);
static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
nxt_unit_port_id_t *port_id);
static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
nxt_unit_process_t *process);
-static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
+static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, const void *buf, size_t buf_size,
- const void *oob, size_t oob_size);
+ const nxt_send_oob_t *oob);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
- const void *buf, size_t buf_size, const void *oob, size_t oob_size);
+ const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf);
nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
@@ -174,7 +180,7 @@ static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf);
-static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
+static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf);
nxt_inline int nxt_unit_close(int fd);
static int nxt_unit_fd_blocking(int fd);
@@ -271,8 +277,8 @@ struct nxt_unit_read_buf_s {
nxt_queue_link_t link;
nxt_unit_ctx_impl_t *ctx_impl;
ssize_t size;
+ nxt_recv_oob_t oob;
char buf[16384];
- char oob[256];
};
@@ -311,8 +317,9 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_read_buf_t */
nxt_queue_t free_rbuf;
- int online;
- int ready;
+ uint8_t online; /* 1 bit */
+ uint8_t ready; /* 1 bit */
+ uint8_t quit_param;
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_read_buf_t ctx_read_buf;
@@ -344,9 +351,11 @@ struct nxt_unit_impl_s {
nxt_unit_callbacks_t callbacks;
nxt_atomic_t use_count;
+ nxt_atomic_t request_count;
uint32_t request_data_size;
uint32_t shm_mmap_limit;
+ uint32_t request_limit;
pthread_mutex_t mutex;
@@ -409,16 +418,21 @@ typedef struct {
} nxt_unit_port_hash_id_t;
+static pid_t nxt_unit_pid;
+
+
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
int rc, queue_fd;
void *mem;
- uint32_t ready_stream, shm_limit;
+ uint32_t ready_stream, shm_limit, request_limit;
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
nxt_unit_port_t ready_port, router_port, read_port;
+ nxt_unit_pid = getpid();
+
lib = nxt_unit_create(init);
if (nxt_slow_path(lib == NULL)) {
return NULL;
@@ -446,13 +460,15 @@ nxt_unit_init(nxt_unit_init_t *init)
} else {
rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
- &lib->log_fd, &ready_stream, &shm_limit);
+ &lib->log_fd, &ready_stream, &shm_limit,
+ &request_limit);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
/ PORT_MMAP_DATA_SIZE;
+ lib->request_limit = request_limit;
}
if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
@@ -460,6 +476,7 @@ nxt_unit_init(nxt_unit_init_t *init)
}
lib->pid = read_port.id.pid;
+ nxt_unit_pid = lib->pid;
ctx = &lib->main_ctx.ctx;
@@ -564,6 +581,7 @@ nxt_unit_create(nxt_unit_init_t *init)
lib->request_data_size = init->request_data_size;
lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
/ PORT_MMAP_DATA_SIZE;
+ lib->request_limit = init->request_limit;
lib->processes.slot = NULL;
lib->ports.slot = NULL;
@@ -573,6 +591,7 @@ nxt_unit_create(nxt_unit_init_t *init)
nxt_queue_init(&lib->contexts);
lib->use_count = 0;
+ lib->request_count = 0;
lib->router_port = NULL;
lib->shared_port = NULL;
@@ -632,6 +651,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
ctx_impl->wait_items = 0;
ctx_impl->online = 1;
ctx_impl->ready = 0;
+ ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
@@ -780,7 +800,7 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
- uint32_t *shm_limit)
+ uint32_t *shm_limit, uint32_t *request_limit)
{
int rc;
int ready_fd, router_fd, read_in_fd, read_out_fd;
@@ -825,12 +845,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d,%d;"
- "%d,%"PRIu32,
+ "%d,%"PRIu32",%"PRIu32,
&ready_stream,
&ready_pid, &ready_id, &ready_fd,
&router_pid, &router_id, &router_fd,
&read_pid, &read_id, &read_in_fd, &read_out_fd,
- log_fd, shm_limit);
+ log_fd, shm_limit, request_limit);
if (nxt_slow_path(rc == EOF)) {
nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
@@ -839,9 +859,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
return NXT_UNIT_ERROR;
}
- if (nxt_slow_path(rc != 13)) {
+ if (nxt_slow_path(rc != 14)) {
nxt_unit_alert(NULL, "invalid number of variables in %s env: "
- "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars);
+ "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
return NXT_UNIT_ERROR;
}
@@ -876,13 +896,10 @@ static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
{
ssize_t res;
+ nxt_send_oob_t oob;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
-
- union {
- struct cmsghdr cm;
- char space[CMSG_SPACE(sizeof(int))];
- } cmsg;
+ int fds[2] = {queue_fd, -1};
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -896,25 +913,9 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
msg.mf = 0;
msg.tracking = 0;
- memset(&cmsg, 0, sizeof(cmsg));
-
- cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
- cmsg.cm.cmsg_level = SOL_SOCKET;
- cmsg.cm.cmsg_type = SCM_RIGHTS;
-
- /*
- * memcpy() is used instead of simple
- * *(int *) CMSG_DATA(&cmsg.cm) = fd;
- * because GCC 4.4 with -O2/3/s optimization may issue a warning:
- * dereferencing type-punned pointer will break strict-aliasing rules
- *
- * Fortunately, GCC with -O1 compiles this nxt_memcpy()
- * in the same simple assignment as in the code above.
- */
- memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
+ nxt_socket_msg_oob_init(&oob, fds);
- res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
- &cmsg, sizeof(cmsg));
+ res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
if (res != sizeof(msg)) {
return NXT_UNIT_ERROR;
}
@@ -929,7 +930,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
{
int rc;
pid_t pid;
- struct cmsghdr *cm;
+ uint8_t quit_param;
nxt_port_msg_t *port_msg;
nxt_unit_impl_t *lib;
nxt_unit_recv_msg_t recv_msg;
@@ -939,18 +940,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
recv_msg.fd[0] = -1;
recv_msg.fd[1] = -1;
port_msg = (nxt_port_msg_t *) rbuf->buf;
- cm = (struct cmsghdr *) rbuf->oob;
- if (cm->cmsg_level == SOL_SOCKET
- && cm->cmsg_type == SCM_RIGHTS)
- {
- if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
- memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
- }
-
- if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
- memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
- }
+ rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
+ rc = NXT_UNIT_ERROR;
+ goto done;
}
recv_msg.incoming_buf = NULL;
@@ -959,7 +954,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
if (nxt_slow_path(rbuf->size == 0)) {
nxt_unit_debug(ctx, "read port closed");
- nxt_unit_quit(ctx);
+ nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
rc = NXT_UNIT_OK;
goto done;
}
@@ -1018,9 +1013,18 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
break;
case _NXT_PORT_MSG_QUIT:
- nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
+ if (recv_msg.size == sizeof(quit_param)) {
+ memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
+
+ } else {
+ quit_param = NXT_QUIT_NORMAL;
+ }
+
+ nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
+ (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
+
+ nxt_unit_quit(ctx, quit_param);
- nxt_unit_quit(ctx);
rc = NXT_UNIT_OK;
break;
@@ -1220,15 +1224,36 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ if (nxt_slow_path(ctx_impl->ready)) {
+ return NXT_UNIT_OK;
+ }
+
ctx_impl->ready = 1;
- if (lib->callbacks.ready_handler) {
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ /* Call ready_handler() only for main context. */
+ if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
return lib->callbacks.ready_handler(ctx);
}
+ if (&lib->main_ctx != ctx_impl) {
+ /* Check if the main context is already stopped or quit. */
+ if (nxt_slow_path(!lib->main_ctx.ready)) {
+ ctx_impl->ready = 0;
+
+ nxt_unit_quit(ctx, lib->main_ctx.quit_param);
+
+ return NXT_UNIT_OK;
+ }
+
+ if (lib->callbacks.add_port != NULL) {
+ lib->callbacks.add_port(ctx, lib->shared_port);
+ }
+ }
+
return NXT_UNIT_OK;
}
@@ -1561,7 +1586,7 @@ nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
res = nxt_unit_port_send(req->ctx, req->response_port,
- &msg, sizeof(msg), NULL, 0);
+ &msg, sizeof(msg), NULL);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -1741,10 +1766,12 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
+ nxt_unit_ctx_t *ctx;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
- ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
+ ctx = req->ctx;
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
req->response = NULL;
@@ -1783,6 +1810,10 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
pthread_mutex_unlock(&ctx_impl->mutex);
+
+ if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
+ nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
+ }
}
@@ -2621,7 +2652,7 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
(int) m.mmap_msg.size);
res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
- NULL, 0);
+ NULL);
if (nxt_slow_path(res != sizeof(m))) {
goto free_buf;
}
@@ -2673,8 +2704,8 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
res = nxt_unit_port_send(req->ctx, req->response_port,
buf->start - sizeof(m.msg),
- m.mmap_msg.size + sizeof(m.msg),
- NULL, 0);
+ m.mmap_msg.size + sizeof(m.msg), NULL);
+
if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
goto free_buf;
}
@@ -2741,7 +2772,7 @@ nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
pthread_mutex_unlock(&ctx_impl->mutex);
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+ rbuf->oob.size = 0;
return rbuf;
}
@@ -3260,7 +3291,7 @@ skip_response_send:
msg.tracking = 0;
(void) nxt_unit_port_send(req->ctx, req->response_port,
- &msg, sizeof(msg), NULL, 0);
+ &msg, sizeof(msg), NULL);
nxt_unit_request_info_release(req);
}
@@ -3582,7 +3613,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
msg.mf = 0;
msg.tracking = 0;
- res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
+ res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -3851,12 +3882,10 @@ static int
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
{
ssize_t res;
+ nxt_send_oob_t oob;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
- union {
- struct cmsghdr cm;
- char space[CMSG_SPACE(sizeof(int))];
- } cmsg;
+ int fds[2] = {fd, -1};
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -3870,30 +3899,9 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
msg.mf = 0;
msg.tracking = 0;
- /*
- * Fill all padding fields with 0.
- * Code in Go 1.11 validate cmsghdr using padding field as part of len.
- * See Cmsghdr definition and socketControlMessageHeaderAndData function.
- */
- memset(&cmsg, 0, sizeof(cmsg));
-
- cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
- cmsg.cm.cmsg_level = SOL_SOCKET;
- cmsg.cm.cmsg_type = SCM_RIGHTS;
-
- /*
- * memcpy() is used instead of simple
- * *(int *) CMSG_DATA(&cmsg.cm) = fd;
- * because GCC 4.4 with -O2/3/s optimization may issue a warning:
- * dereferencing type-punned pointer will break strict-aliasing rules
- *
- * Fortunately, GCC with -O1 compiles this nxt_memcpy()
- * in the same simple assignment as in the code above.
- */
- memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
+ nxt_socket_msg_oob_init(&oob, fds);
- res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
- &cmsg, sizeof(cmsg));
+ res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &oob);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -4083,7 +4091,7 @@ nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
msg.type = _NXT_PORT_MSG_RPC_READY;
(void) nxt_unit_port_send(ctx, ctx_impl->read_port,
- &msg, sizeof(msg), NULL, 0);
+ &msg, sizeof(msg), NULL);
}
@@ -4306,7 +4314,7 @@ nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
- res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
if (nxt_slow_path(res != sizeof(m))) {
return NXT_UNIT_ERROR;
}
@@ -4376,7 +4384,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
msg.mf = 0;
msg.tracking = 0;
- res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
+ res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
@@ -4522,7 +4530,7 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
rc = nxt_unit_run_once_impl(ctx);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
- nxt_unit_quit(ctx);
+ nxt_unit_quit(ctx, NXT_QUIT_NORMAL);
break;
}
}
@@ -4586,6 +4594,7 @@ static int
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
int nevents, res, err;
+ nxt_uint_t nfds;
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_port_impl_t *port_impl;
@@ -4593,7 +4602,7 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
+ if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) {
return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
}
@@ -4626,20 +4635,28 @@ retry:
}
}
- res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
- if (res == NXT_UNIT_OK) {
- return NXT_UNIT_OK;
+ if (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
+ res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
+ if (res == NXT_UNIT_OK) {
+ return NXT_UNIT_OK;
+ }
+
+ fds[1].fd = lib->shared_port->in_fd;
+ fds[1].events = POLLIN;
+
+ nfds = 2;
+
+ } else {
+ nfds = 1;
}
fds[0].fd = ctx_impl->read_port->in_fd;
fds[0].events = POLLIN;
fds[0].revents = 0;
- fds[1].fd = lib->shared_port->in_fd;
- fds[1].events = POLLIN;
fds[1].revents = 0;
- nevents = poll(fds, 2, -1);
+ nevents = poll(fds, nfds, -1);
if (nxt_slow_path(nevents == -1)) {
err = errno;
@@ -4655,7 +4672,7 @@ retry:
return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
}
- nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
+ nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04X, %04X]",
fds[0].fd, fds[1].fd, nevents, fds[0].revents,
fds[1].revents);
@@ -4686,6 +4703,21 @@ retry:
static int
+nxt_unit_chk_ready(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ return (ctx_impl->ready
+ && (lib->request_limit == 0
+ || lib->request_count < lib->request_limit));
+}
+
+
+static int
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
{
int rc;
@@ -4723,6 +4755,10 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
} nxt_queue_loop;
+ if (!ctx_impl->ready) {
+ nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
+ }
+
return rc;
}
@@ -4903,16 +4939,14 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
rc = NXT_UNIT_OK;
- while (nxt_fast_path(ctx_impl->online)) {
+ while (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
rc = NXT_UNIT_ERROR;
@@ -4949,17 +4983,15 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
- nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_t *req;
nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
req = NULL;
- if (nxt_slow_path(!ctx_impl->online)) {
+ if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
goto done;
}
@@ -4968,7 +5000,7 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
goto done;
}
- rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
+ rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
if (rc != NXT_UNIT_OK) {
nxt_unit_read_buf_release(ctx, rbuf);
goto done;
@@ -4985,17 +5017,6 @@ done:
int
-nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
-{
- nxt_unit_impl_t *lib;
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- return (ctx == &lib->main_ctx.ctx);
-}
-
-
-int
nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
int rc;
@@ -5017,13 +5038,17 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) {
+ return NXT_UNIT_AGAIN;
+ }
+
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
return NXT_UNIT_ERROR;
}
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
if (port == lib->shared_port) {
rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
@@ -5194,7 +5219,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
pthread_mutex_unlock(&lib->mutex);
if (nxt_fast_path(ctx_impl->read_port != NULL)) {
- nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
+ nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id);
nxt_unit_port_release(ctx_impl->read_port);
}
@@ -5246,6 +5271,24 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx)
return NULL;
}
+#if (NXT_HAVE_SOCKOPT_SO_PASSCRED)
+ int enable_creds = 1;
+
+ if (nxt_slow_path(setsockopt(port_sockets[0], SOL_SOCKET, SO_PASSCRED,
+ &enable_creds, sizeof(enable_creds)) == -1))
+ {
+ nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
+ return NULL;
+ }
+
+ if (nxt_slow_path(setsockopt(port_sockets[1], SOL_SOCKET, SO_PASSCRED,
+ &enable_creds, sizeof(enable_creds)) == -1))
+ {
+ nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
+ return NULL;
+ }
+#endif
+
nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
port_sockets[0], port_sockets[1]);
@@ -5286,6 +5329,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
nxt_unit_port_t *port, int queue_fd)
{
ssize_t res;
+ nxt_send_oob_t oob;
nxt_unit_impl_t *lib;
int fds[2] = { port->out_fd, queue_fd };
@@ -5294,11 +5338,6 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
nxt_port_msg_new_port_t new_port;
} m;
- union {
- struct cmsghdr cm;
- char space[CMSG_SPACE(sizeof(int) * 2)];
- } cmsg;
-
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
m.msg.stream = 0;
@@ -5317,24 +5356,9 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
m.new_port.max_size = 16 * 1024;
m.new_port.max_share = 64 * 1024;
- memset(&cmsg, 0, sizeof(cmsg));
-
- cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
- cmsg.cm.cmsg_level = SOL_SOCKET;
- cmsg.cm.cmsg_type = SCM_RIGHTS;
-
- /*
- * memcpy() is used instead of simple
- * *(int *) CMSG_DATA(&cmsg.cm) = fd;
- * because GCC 4.4 with -O2/3/s optimization may issue a warning:
- * dereferencing type-punned pointer will break strict-aliasing rules
- *
- * Fortunately, GCC with -O1 compiles this nxt_memcpy()
- * in the same simple assignment as in the code above.
- */
- memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
+ nxt_socket_msg_oob_init(&oob, fds);
- res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
+ res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &oob);
return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
}
@@ -5605,7 +5629,8 @@ nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
static void
-nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
+nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
+ nxt_unit_port_id_t *port_id)
{
nxt_unit_port_t *port;
nxt_unit_port_impl_t *port_impl;
@@ -5623,7 +5648,7 @@ nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
pthread_mutex_unlock(&lib->mutex);
if (lib->callbacks.remove_port != NULL && port != NULL) {
- lib->callbacks.remove_port(&lib->unit, port);
+ lib->callbacks.remove_port(&lib->unit, ctx, port);
}
if (nxt_fast_path(port != NULL)) {
@@ -5700,7 +5725,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
nxt_queue_remove(&port->link);
if (lib->callbacks.remove_port != NULL) {
- lib->callbacks.remove_port(&lib->unit, &port->port);
+ lib->callbacks.remove_port(&lib->unit, NULL, &port->port);
}
nxt_unit_port_release(&port->port);
@@ -5712,56 +5737,96 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
static void
-nxt_unit_quit(nxt_unit_ctx_t *ctx)
+nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param)
{
- nxt_port_msg_t msg;
+ nxt_bool_t skip_graceful_broadcast, quit;
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_callbacks_t *cb;
nxt_unit_request_info_t *req;
nxt_unit_request_info_impl_t *req_impl;
+ struct {
+ nxt_port_msg_t msg;
+ uint8_t quit_param;
+ } nxt_packed m;
+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- if (!ctx_impl->online) {
+ nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready,
+ ctx_impl->online);
+
+ if (nxt_slow_path(!ctx_impl->online)) {
return;
}
- ctx_impl->online = 0;
+ skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL
+ && !ctx_impl->ready;
cb = &lib->callbacks;
- if (cb->quit != NULL) {
- cb->quit(ctx);
+ if (nxt_fast_path(ctx_impl->ready)) {
+ ctx_impl->ready = 0;
+
+ if (cb->remove_port != NULL) {
+ cb->remove_port(&lib->unit, ctx, lib->shared_port);
+ }
}
- nxt_queue_each(req_impl, &ctx_impl->active_req,
- nxt_unit_request_info_impl_t, link)
- {
- req = &req_impl->req;
+ if (quit_param == NXT_QUIT_GRACEFUL) {
+ pthread_mutex_lock(&ctx_impl->mutex);
- nxt_unit_req_warn(req, "active request on ctx quit");
+ quit = nxt_queue_is_empty(&ctx_impl->active_req)
+ && nxt_queue_is_empty(&ctx_impl->pending_rbuf)
+ && ctx_impl->wait_items == 0;
- if (cb->close_handler) {
- nxt_unit_req_debug(req, "close_handler");
+ pthread_mutex_unlock(&ctx_impl->mutex);
- cb->close_handler(req);
+ } else {
+ quit = 1;
+ ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
+ }
- } else {
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ if (quit) {
+ ctx_impl->online = 0;
+
+ if (cb->quit != NULL) {
+ cb->quit(ctx);
}
- } nxt_queue_loop;
+ nxt_queue_each(req_impl, &ctx_impl->active_req,
+ nxt_unit_request_info_impl_t, link)
+ {
+ req = &req_impl->req;
- if (ctx != &lib->main_ctx.ctx) {
+ nxt_unit_req_warn(req, "active request on ctx quit");
+
+ if (cb->close_handler) {
+ nxt_unit_req_debug(req, "close_handler");
+
+ cb->close_handler(req);
+
+ } else {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ }
+
+ } nxt_queue_loop;
+
+ if (nxt_fast_path(ctx_impl->read_port != NULL)) {
+ nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id);
+ }
+ }
+
+ if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) {
return;
}
- memset(&msg, 0, sizeof(nxt_port_msg_t));
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
- msg.pid = lib->pid;
- msg.type = _NXT_PORT_MSG_QUIT;
+ m.msg.pid = lib->pid;
+ m.msg.type = _NXT_PORT_MSG_QUIT;
+ m.quit_param = quit_param;
pthread_mutex_lock(&lib->mutex);
@@ -5775,7 +5840,7 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
}
(void) nxt_unit_port_send(ctx, ctx_impl->read_port,
- &msg, sizeof(msg), NULL, 0);
+ &m, sizeof(m), NULL);
} nxt_queue_loop;
@@ -5810,7 +5875,7 @@ nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
(int) port_id->id);
- res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
if (nxt_slow_path(res != sizeof(m))) {
return NXT_UNIT_ERROR;
}
@@ -5821,7 +5886,7 @@ nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
- const void *buf, size_t buf_size, const void *oob, size_t oob_size)
+ const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
{
int notify;
ssize_t ret;
@@ -5833,7 +5898,7 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
- if (port_impl->queue != NULL && oob_size == 0
+ if (port_impl->queue != NULL && (oob == NULL || oob->size == 0)
&& buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
{
rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
@@ -5855,7 +5920,7 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
if (lib->callbacks.port_send == NULL) {
ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
- sizeof(nxt_port_msg_t), NULL, 0);
+ sizeof(nxt_port_msg_t), NULL);
nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
(int) port->id.pid, (int) port->id.id,
@@ -5892,15 +5957,15 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
if (lib->callbacks.port_send != NULL) {
ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
- oob, oob_size);
+ oob != NULL ? oob->buf : NULL,
+ oob != NULL ? oob->size : 0);
nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
(int) port->id.pid, (int) port->id.id,
(int) ret);
} else {
- ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
- oob, oob_size);
+ ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, oob);
nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
(int) port->id.pid, (int) port->id.id,
@@ -5913,29 +5978,20 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
- const void *buf, size_t buf_size, const void *oob, size_t oob_size)
+ const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
{
int err;
- ssize_t res;
+ ssize_t n;
struct iovec iov[1];
- struct msghdr msg;
iov[0].iov_base = (void *) buf;
iov[0].iov_len = buf_size;
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_iov = iov;
- msg.msg_iovlen = 1;
- msg.msg_flags = 0;
- msg.msg_control = (void *) oob;
- msg.msg_controllen = oob_size;
-
retry:
- res = sendmsg(fd, &msg, 0);
+ n = nxt_sendmsg(fd, iov, 1, oob);
- if (nxt_slow_path(res == -1)) {
+ if (nxt_slow_path(n == -1)) {
err = errno;
if (err == EINTR) {
@@ -5950,11 +6006,11 @@ retry:
fd, (int) buf_size, strerror(err), err);
} else {
- nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
- (int) res);
+ nxt_unit_debug(ctx, "sendmsg(%d, %d, %d): %d", fd, (int) buf_size,
+ (oob != NULL ? (int) oob->size : 0), (int) n);
}
- return res;
+ return n;
}
@@ -6063,7 +6119,7 @@ retry:
nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+ rbuf->oob.size = 0;
goto retry;
}
@@ -6074,7 +6130,8 @@ nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
{
memcpy(dst->buf, src->buf, src->size);
dst->size = src->size;
- memcpy(dst->oob, src->oob, sizeof(src->oob));
+ dst->oob.size = src->oob.size;
+ memcpy(dst->oob.buf, src->oob.buf, src->oob.size);
}
@@ -6089,7 +6146,11 @@ nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
retry:
- res = nxt_unit_app_queue_recv(port, rbuf);
+ res = nxt_unit_app_queue_recv(ctx, port, rbuf);
+
+ if (res == NXT_UNIT_OK) {
+ return NXT_UNIT_OK;
+ }
if (res == NXT_UNIT_AGAIN) {
res = nxt_unit_port_recv(ctx, port, rbuf);
@@ -6116,16 +6177,18 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf)
{
int fd, err;
+ size_t oob_size;
struct iovec iov[1];
- struct msghdr msg;
nxt_unit_impl_t *lib;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (lib->callbacks.port_recv != NULL) {
+ oob_size = sizeof(rbuf->oob.buf);
+
rbuf->size = lib->callbacks.port_recv(ctx, port,
rbuf->buf, sizeof(rbuf->buf),
- rbuf->oob, sizeof(rbuf->oob));
+ rbuf->oob.buf, &oob_size);
nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
(int) port->id.pid, (int) port->id.id, (int) rbuf->size);
@@ -6134,25 +6197,18 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
return NXT_UNIT_ERROR;
}
+ rbuf->oob.size = oob_size;
return NXT_UNIT_OK;
}
iov[0].iov_base = rbuf->buf;
iov[0].iov_len = sizeof(rbuf->buf);
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_iov = iov;
- msg.msg_iovlen = 1;
- msg.msg_flags = 0;
- msg.msg_control = rbuf->oob;
- msg.msg_controllen = sizeof(rbuf->oob);
-
fd = port->in_fd;
retry:
- rbuf->size = recvmsg(fd, &msg, 0);
+ rbuf->size = nxt_recvmsg(fd, iov, 1, &rbuf->oob);
if (nxt_slow_path(rbuf->size == -1)) {
err = errno;
@@ -6194,13 +6250,20 @@ nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
static int
-nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
+nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf)
{
uint32_t cookie;
nxt_port_msg_t *port_msg;
nxt_app_queue_t *queue;
+ nxt_unit_impl_t *lib;
nxt_unit_port_impl_t *port_impl;
+ struct {
+ nxt_port_msg_t msg;
+ uint8_t quit_param;
+ } nxt_packed m;
+
port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
queue = port_impl->queue;
@@ -6214,6 +6277,25 @@ retry:
port_msg = (nxt_port_msg_t *) rbuf->buf;
if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (lib->request_limit != 0) {
+ nxt_atomic_fetch_add(&lib->request_count, 1);
+
+ if (nxt_slow_path(lib->request_count >= lib->request_limit)) {
+ nxt_unit_debug(ctx, "request limit reached");
+
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
+
+ m.msg.pid = lib->pid;
+ m.msg.type = _NXT_PORT_MSG_QUIT;
+ m.quit_param = NXT_QUIT_GRACEFUL;
+
+ (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port,
+ &m, sizeof(m), NULL);
+ }
+ }
+
return NXT_UNIT_OK;
}
@@ -6495,7 +6577,7 @@ nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
log_fd = lib->log_fd;
} else {
- pid = getpid();
+ pid = nxt_unit_pid;
log_fd = STDERR_FILENO;
}
@@ -6539,7 +6621,7 @@ nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
log_fd = lib->log_fd;
} else {
- pid = getpid();
+ pid = nxt_unit_pid;
log_fd = STDERR_FILENO;
}