diff options
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r-- | src/nxt_unit.c | 508 |
1 files changed, 295 insertions, 213 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index ae4499d8..06ad1636 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -3,10 +3,9 @@ * Copyright (C) NGINX, Inc. */ -#include <stdlib.h> - #include "nxt_main.h" #include "nxt_port_memory_int.h" +#include "nxt_socket_msg.h" #include "nxt_port_queue.h" #include "nxt_app_queue.h" @@ -25,6 +24,11 @@ #define NXT_UNIT_LOCAL_BUF_SIZE \ (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) +enum { + NXT_QUIT_NORMAL = 0, + NXT_QUIT_GRACEFUL = 1, +}; + typedef struct nxt_unit_impl_s nxt_unit_impl_t; typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; @@ -51,7 +55,8 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream, uint32_t *shm_limit); + int *log_fd, uint32_t *stream, uint32_t *shm_limit, + uint32_t *request_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd); static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, @@ -130,6 +135,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx); static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); @@ -150,20 +156,20 @@ static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue); static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req); -static void nxt_unit_remove_port(nxt_unit_impl_t *lib, +static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); -static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param); static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, - const void *oob, size_t oob_size); + const nxt_send_oob_t *oob); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, - const void *buf, size_t buf_size, const void *oob, size_t oob_size); + const void *buf, size_t buf_size, const nxt_send_oob_t *oob); static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, @@ -174,7 +180,7 @@ static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); -static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, +static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); nxt_inline int nxt_unit_close(int fd); static int nxt_unit_fd_blocking(int fd); @@ -271,8 +277,8 @@ struct nxt_unit_read_buf_s { nxt_queue_link_t link; nxt_unit_ctx_impl_t *ctx_impl; ssize_t size; + nxt_recv_oob_t oob; char buf[16384]; - char oob[256]; }; @@ -311,8 +317,9 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_read_buf_t */ nxt_queue_t free_rbuf; - int online; - int ready; + uint8_t online; /* 1 bit */ + uint8_t ready; /* 1 bit */ + uint8_t quit_param; nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -344,9 +351,11 @@ struct nxt_unit_impl_s { nxt_unit_callbacks_t callbacks; nxt_atomic_t use_count; + nxt_atomic_t request_count; uint32_t request_data_size; uint32_t shm_mmap_limit; + uint32_t request_limit; pthread_mutex_t mutex; @@ -409,16 +418,21 @@ typedef struct { } nxt_unit_port_hash_id_t; +static pid_t nxt_unit_pid; + + nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { int rc, queue_fd; void *mem; - uint32_t ready_stream, shm_limit; + uint32_t ready_stream, shm_limit, request_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; nxt_unit_port_t ready_port, router_port, read_port; + nxt_unit_pid = getpid(); + lib = nxt_unit_create(init); if (nxt_slow_path(lib == NULL)) { return NULL; @@ -446,13 +460,15 @@ nxt_unit_init(nxt_unit_init_t *init) } else { rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, - &lib->log_fd, &ready_stream, &shm_limit); + &lib->log_fd, &ready_stream, &shm_limit, + &request_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) / PORT_MMAP_DATA_SIZE; + lib->request_limit = request_limit; } if (nxt_slow_path(lib->shm_mmap_limit < 1)) { @@ -460,6 +476,7 @@ nxt_unit_init(nxt_unit_init_t *init) } lib->pid = read_port.id.pid; + nxt_unit_pid = lib->pid; ctx = &lib->main_ctx.ctx; @@ -564,6 +581,7 @@ nxt_unit_create(nxt_unit_init_t *init) lib->request_data_size = init->request_data_size; lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) / PORT_MMAP_DATA_SIZE; + lib->request_limit = init->request_limit; lib->processes.slot = NULL; lib->ports.slot = NULL; @@ -573,6 +591,7 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); lib->use_count = 0; + lib->request_count = 0; lib->router_port = NULL; lib->shared_port = NULL; @@ -632,6 +651,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->wait_items = 0; ctx_impl->online = 1; ctx_impl->ready = 0; + ctx_impl->quit_param = NXT_QUIT_GRACEFUL; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); @@ -780,7 +800,7 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, - uint32_t *shm_limit) + uint32_t *shm_limit, uint32_t *request_limit) { int rc; int ready_fd, router_fd, read_in_fd, read_out_fd; @@ -825,12 +845,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d,%d;" - "%d,%"PRIu32, + "%d,%"PRIu32",%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &router_pid, &router_id, &router_fd, &read_pid, &read_id, &read_in_fd, &read_out_fd, - log_fd, shm_limit); + log_fd, shm_limit, request_limit); if (nxt_slow_path(rc == EOF)) { nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env", @@ -839,9 +859,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, return NXT_UNIT_ERROR; } - if (nxt_slow_path(rc != 13)) { + if (nxt_slow_path(rc != 14)) { nxt_unit_alert(NULL, "invalid number of variables in %s env: " - "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars); + "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars); return NXT_UNIT_ERROR; } @@ -876,13 +896,10 @@ static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) { ssize_t res; + nxt_send_oob_t oob; nxt_port_msg_t msg; nxt_unit_impl_t *lib; - - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; - } cmsg; + int fds[2] = {queue_fd, -1}; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -896,25 +913,9 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) msg.mf = 0; msg.tracking = 0; - memset(&cmsg, 0, sizeof(cmsg)); - - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); - cmsg.cm.cmsg_level = SOL_SOCKET; - cmsg.cm.cmsg_type = SCM_RIGHTS; - - /* - * memcpy() is used instead of simple - * *(int *) CMSG_DATA(&cmsg.cm) = fd; - * because GCC 4.4 with -O2/3/s optimization may issue a warning: - * dereferencing type-punned pointer will break strict-aliasing rules - * - * Fortunately, GCC with -O1 compiles this nxt_memcpy() - * in the same simple assignment as in the code above. - */ - memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int)); + nxt_socket_msg_oob_init(&oob, fds); - res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), - &cmsg, sizeof(cmsg)); + res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob); if (res != sizeof(msg)) { return NXT_UNIT_ERROR; } @@ -929,7 +930,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, { int rc; pid_t pid; - struct cmsghdr *cm; + uint8_t quit_param; nxt_port_msg_t *port_msg; nxt_unit_impl_t *lib; nxt_unit_recv_msg_t recv_msg; @@ -939,18 +940,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, recv_msg.fd[0] = -1; recv_msg.fd[1] = -1; port_msg = (nxt_port_msg_t *) rbuf->buf; - cm = (struct cmsghdr *) rbuf->oob; - if (cm->cmsg_level == SOL_SOCKET - && cm->cmsg_type == SCM_RIGHTS) - { - if (cm->cmsg_len == CMSG_LEN(sizeof(int))) { - memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int)); - } - - if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) { - memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); - } + rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg"); + rc = NXT_UNIT_ERROR; + goto done; } recv_msg.incoming_buf = NULL; @@ -959,7 +954,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, if (nxt_slow_path(rbuf->size == 0)) { nxt_unit_debug(ctx, "read port closed"); - nxt_unit_quit(ctx); + nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); rc = NXT_UNIT_OK; goto done; } @@ -1018,9 +1013,18 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, break; case _NXT_PORT_MSG_QUIT: - nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); + if (recv_msg.size == sizeof(quit_param)) { + memcpy(&quit_param, recv_msg.start, sizeof(quit_param)); + + } else { + quit_param = NXT_QUIT_NORMAL; + } + + nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream, + (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : "")); + + nxt_unit_quit(ctx, quit_param); - nxt_unit_quit(ctx); rc = NXT_UNIT_OK; break; @@ -1220,15 +1224,36 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx) nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + if (nxt_slow_path(ctx_impl->ready)) { + return NXT_UNIT_OK; + } + ctx_impl->ready = 1; - if (lib->callbacks.ready_handler) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + /* Call ready_handler() only for main context. */ + if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) { return lib->callbacks.ready_handler(ctx); } + if (&lib->main_ctx != ctx_impl) { + /* Check if the main context is already stopped or quit. */ + if (nxt_slow_path(!lib->main_ctx.ready)) { + ctx_impl->ready = 0; + + nxt_unit_quit(ctx, lib->main_ctx.quit_param); + + return NXT_UNIT_OK; + } + + if (lib->callbacks.add_port != NULL) { + lib->callbacks.add_port(ctx, lib->shared_port); + } + } + return NXT_UNIT_OK; } @@ -1561,7 +1586,7 @@ nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req) msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK; res = nxt_unit_port_send(req->ctx, req->response_port, - &msg, sizeof(msg), NULL, 0); + &msg, sizeof(msg), NULL); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -1741,10 +1766,12 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) static void nxt_unit_request_info_release(nxt_unit_request_info_t *req) { + nxt_unit_ctx_t *ctx; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; - ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); + ctx = req->ctx; + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); req->response = NULL; @@ -1783,6 +1810,10 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); pthread_mutex_unlock(&ctx_impl->mutex); + + if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) { + nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); + } } @@ -2621,7 +2652,7 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, (int) m.mmap_msg.size); res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), - NULL, 0); + NULL); if (nxt_slow_path(res != sizeof(m))) { goto free_buf; } @@ -2673,8 +2704,8 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, res = nxt_unit_port_send(req->ctx, req->response_port, buf->start - sizeof(m.msg), - m.mmap_msg.size + sizeof(m.msg), - NULL, 0); + m.mmap_msg.size + sizeof(m.msg), NULL); + if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { goto free_buf; } @@ -2741,7 +2772,7 @@ nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) pthread_mutex_unlock(&ctx_impl->mutex); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + rbuf->oob.size = 0; return rbuf; } @@ -3260,7 +3291,7 @@ skip_response_send: msg.tracking = 0; (void) nxt_unit_port_send(req->ctx, req->response_port, - &msg, sizeof(msg), NULL, 0); + &msg, sizeof(msg), NULL); nxt_unit_request_info_release(req); } @@ -3582,7 +3613,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3851,12 +3882,10 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) { ssize_t res; + nxt_send_oob_t oob; nxt_port_msg_t msg; nxt_unit_impl_t *lib; - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; - } cmsg; + int fds[2] = {fd, -1}; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -3870,30 +3899,9 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) msg.mf = 0; msg.tracking = 0; - /* - * Fill all padding fields with 0. - * Code in Go 1.11 validate cmsghdr using padding field as part of len. - * See Cmsghdr definition and socketControlMessageHeaderAndData function. - */ - memset(&cmsg, 0, sizeof(cmsg)); - - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); - cmsg.cm.cmsg_level = SOL_SOCKET; - cmsg.cm.cmsg_type = SCM_RIGHTS; - - /* - * memcpy() is used instead of simple - * *(int *) CMSG_DATA(&cmsg.cm) = fd; - * because GCC 4.4 with -O2/3/s optimization may issue a warning: - * dereferencing type-punned pointer will break strict-aliasing rules - * - * Fortunately, GCC with -O1 compiles this nxt_memcpy() - * in the same simple assignment as in the code above. - */ - memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); + nxt_socket_msg_oob_init(&oob, fds); - res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &oob); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -4083,7 +4091,7 @@ nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) msg.type = _NXT_PORT_MSG_RPC_READY; (void) nxt_unit_port_send(ctx, ctx_impl->read_port, - &msg, sizeof(msg), NULL, 0); + &msg, sizeof(msg), NULL); } @@ -4306,7 +4314,7 @@ nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); - res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); if (nxt_slow_path(res != sizeof(m))) { return NXT_UNIT_ERROR; } @@ -4376,7 +4384,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -4522,7 +4530,7 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) rc = nxt_unit_run_once_impl(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { - nxt_unit_quit(ctx); + nxt_unit_quit(ctx, NXT_QUIT_NORMAL); break; } } @@ -4586,6 +4594,7 @@ static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { int nevents, res, err; + nxt_uint_t nfds; nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_port_impl_t *port_impl; @@ -4593,7 +4602,7 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) { + if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) { return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); } @@ -4626,20 +4635,28 @@ retry: } } - res = nxt_unit_app_queue_recv(lib->shared_port, rbuf); - if (res == NXT_UNIT_OK) { - return NXT_UNIT_OK; + if (nxt_fast_path(nxt_unit_chk_ready(ctx))) { + res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } + + fds[1].fd = lib->shared_port->in_fd; + fds[1].events = POLLIN; + + nfds = 2; + + } else { + nfds = 1; } fds[0].fd = ctx_impl->read_port->in_fd; fds[0].events = POLLIN; fds[0].revents = 0; - fds[1].fd = lib->shared_port->in_fd; - fds[1].events = POLLIN; fds[1].revents = 0; - nevents = poll(fds, 2, -1); + nevents = poll(fds, nfds, -1); if (nxt_slow_path(nevents == -1)) { err = errno; @@ -4655,7 +4672,7 @@ retry: return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; } - nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]", + nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04X, %04X]", fds[0].fd, fds[1].fd, nevents, fds[0].revents, fds[1].revents); @@ -4686,6 +4703,21 @@ retry: static int +nxt_unit_chk_ready(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + return (ctx_impl->ready + && (lib->request_limit == 0 + || lib->request_count < lib->request_limit)); +} + + +static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) { int rc; @@ -4723,6 +4755,10 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) } nxt_queue_loop; + if (!ctx_impl->ready) { + nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); + } + return rc; } @@ -4903,16 +4939,14 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); rc = NXT_UNIT_OK; - while (nxt_fast_path(ctx_impl->online)) { + while (nxt_fast_path(nxt_unit_chk_ready(ctx))) { rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { rc = NXT_UNIT_ERROR; @@ -4949,17 +4983,15 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_t *req; nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); req = NULL; - if (nxt_slow_path(!ctx_impl->online)) { + if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) { goto done; } @@ -4968,7 +5000,7 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) goto done; } - rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); if (rc != NXT_UNIT_OK) { nxt_unit_read_buf_release(ctx, rbuf); goto done; @@ -4985,17 +5017,6 @@ done: int -nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx) -{ - nxt_unit_impl_t *lib; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - return (ctx == &lib->main_ctx.ctx); -} - - -int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; @@ -5017,13 +5038,17 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) { + return NXT_UNIT_AGAIN; + } + rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { return NXT_UNIT_ERROR; } - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - if (port == lib->shared_port) { rc = nxt_unit_shared_port_recv(ctx, port, rbuf); @@ -5194,7 +5219,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) pthread_mutex_unlock(&lib->mutex); if (nxt_fast_path(ctx_impl->read_port != NULL)) { - nxt_unit_remove_port(lib, &ctx_impl->read_port->id); + nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id); nxt_unit_port_release(ctx_impl->read_port); } @@ -5246,6 +5271,24 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) return NULL; } +#if (NXT_HAVE_SOCKOPT_SO_PASSCRED) + int enable_creds = 1; + + if (nxt_slow_path(setsockopt(port_sockets[0], SOL_SOCKET, SO_PASSCRED, + &enable_creds, sizeof(enable_creds)) == -1)) + { + nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno)); + return NULL; + } + + if (nxt_slow_path(setsockopt(port_sockets[1], SOL_SOCKET, SO_PASSCRED, + &enable_creds, sizeof(enable_creds)) == -1)) + { + nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno)); + return NULL; + } +#endif + nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", port_sockets[0], port_sockets[1]); @@ -5286,6 +5329,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, nxt_unit_port_t *port, int queue_fd) { ssize_t res; + nxt_send_oob_t oob; nxt_unit_impl_t *lib; int fds[2] = { port->out_fd, queue_fd }; @@ -5294,11 +5338,6 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, nxt_port_msg_new_port_t new_port; } m; - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int) * 2)]; - } cmsg; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); m.msg.stream = 0; @@ -5317,24 +5356,9 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, m.new_port.max_size = 16 * 1024; m.new_port.max_share = 64 * 1024; - memset(&cmsg, 0, sizeof(cmsg)); - - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2); - cmsg.cm.cmsg_level = SOL_SOCKET; - cmsg.cm.cmsg_type = SCM_RIGHTS; - - /* - * memcpy() is used instead of simple - * *(int *) CMSG_DATA(&cmsg.cm) = fd; - * because GCC 4.4 with -O2/3/s optimization may issue a warning: - * dereferencing type-punned pointer will break strict-aliasing rules - * - * Fortunately, GCC with -O1 compiles this nxt_memcpy() - * in the same simple assignment as in the code above. - */ - memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2); + nxt_socket_msg_oob_init(&oob, fds); - res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &oob); return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR; } @@ -5605,7 +5629,8 @@ nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req) static void -nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) +nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, + nxt_unit_port_id_t *port_id) { nxt_unit_port_t *port; nxt_unit_port_impl_t *port_impl; @@ -5623,7 +5648,7 @@ nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) pthread_mutex_unlock(&lib->mutex); if (lib->callbacks.remove_port != NULL && port != NULL) { - lib->callbacks.remove_port(&lib->unit, port); + lib->callbacks.remove_port(&lib->unit, ctx, port); } if (nxt_fast_path(port != NULL)) { @@ -5700,7 +5725,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) nxt_queue_remove(&port->link); if (lib->callbacks.remove_port != NULL) { - lib->callbacks.remove_port(&lib->unit, &port->port); + lib->callbacks.remove_port(&lib->unit, NULL, &port->port); } nxt_unit_port_release(&port->port); @@ -5712,56 +5737,96 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) static void -nxt_unit_quit(nxt_unit_ctx_t *ctx) +nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param) { - nxt_port_msg_t msg; + nxt_bool_t skip_graceful_broadcast, quit; nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_callbacks_t *cb; nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; + struct { + nxt_port_msg_t msg; + uint8_t quit_param; + } nxt_packed m; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (!ctx_impl->online) { + nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready, + ctx_impl->online); + + if (nxt_slow_path(!ctx_impl->online)) { return; } - ctx_impl->online = 0; + skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL + && !ctx_impl->ready; cb = &lib->callbacks; - if (cb->quit != NULL) { - cb->quit(ctx); + if (nxt_fast_path(ctx_impl->ready)) { + ctx_impl->ready = 0; + + if (cb->remove_port != NULL) { + cb->remove_port(&lib->unit, ctx, lib->shared_port); + } } - nxt_queue_each(req_impl, &ctx_impl->active_req, - nxt_unit_request_info_impl_t, link) - { - req = &req_impl->req; + if (quit_param == NXT_QUIT_GRACEFUL) { + pthread_mutex_lock(&ctx_impl->mutex); - nxt_unit_req_warn(req, "active request on ctx quit"); + quit = nxt_queue_is_empty(&ctx_impl->active_req) + && nxt_queue_is_empty(&ctx_impl->pending_rbuf) + && ctx_impl->wait_items == 0; - if (cb->close_handler) { - nxt_unit_req_debug(req, "close_handler"); + pthread_mutex_unlock(&ctx_impl->mutex); - cb->close_handler(req); + } else { + quit = 1; + ctx_impl->quit_param = NXT_QUIT_GRACEFUL; + } - } else { - nxt_unit_request_done(req, NXT_UNIT_ERROR); + if (quit) { + ctx_impl->online = 0; + + if (cb->quit != NULL) { + cb->quit(ctx); } - } nxt_queue_loop; + nxt_queue_each(req_impl, &ctx_impl->active_req, + nxt_unit_request_info_impl_t, link) + { + req = &req_impl->req; - if (ctx != &lib->main_ctx.ctx) { + nxt_unit_req_warn(req, "active request on ctx quit"); + + if (cb->close_handler) { + nxt_unit_req_debug(req, "close_handler"); + + cb->close_handler(req); + + } else { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + } + + } nxt_queue_loop; + + if (nxt_fast_path(ctx_impl->read_port != NULL)) { + nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id); + } + } + + if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) { return; } - memset(&msg, 0, sizeof(nxt_port_msg_t)); + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); - msg.pid = lib->pid; - msg.type = _NXT_PORT_MSG_QUIT; + m.msg.pid = lib->pid; + m.msg.type = _NXT_PORT_MSG_QUIT; + m.quit_param = quit_param; pthread_mutex_lock(&lib->mutex); @@ -5775,7 +5840,7 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) } (void) nxt_unit_port_send(ctx, ctx_impl->read_port, - &msg, sizeof(msg), NULL, 0); + &m, sizeof(m), NULL); } nxt_queue_loop; @@ -5810,7 +5875,7 @@ nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, (int) port_id->id); - res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); if (nxt_slow_path(res != sizeof(m))) { return NXT_UNIT_ERROR; } @@ -5821,7 +5886,7 @@ nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, - const void *buf, size_t buf_size, const void *oob, size_t oob_size) + const void *buf, size_t buf_size, const nxt_send_oob_t *oob) { int notify; ssize_t ret; @@ -5833,7 +5898,7 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); - if (port_impl->queue != NULL && oob_size == 0 + if (port_impl->queue != NULL && (oob == NULL || oob->size == 0) && buf_size <= NXT_PORT_QUEUE_MSG_SIZE) { rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify); @@ -5855,7 +5920,7 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, if (lib->callbacks.port_send == NULL) { ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, - sizeof(nxt_port_msg_t), NULL, 0); + sizeof(nxt_port_msg_t), NULL); nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", (int) port->id.pid, (int) port->id.id, @@ -5892,15 +5957,15 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, if (lib->callbacks.port_send != NULL) { ret = lib->callbacks.port_send(ctx, port, buf, buf_size, - oob, oob_size); + oob != NULL ? oob->buf : NULL, + oob != NULL ? oob->size : 0); nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", (int) port->id.pid, (int) port->id.id, (int) ret); } else { - ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, - oob, oob_size); + ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, oob); nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", (int) port->id.pid, (int) port->id.id, @@ -5913,29 +5978,20 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, - const void *buf, size_t buf_size, const void *oob, size_t oob_size) + const void *buf, size_t buf_size, const nxt_send_oob_t *oob) { int err; - ssize_t res; + ssize_t n; struct iovec iov[1]; - struct msghdr msg; iov[0].iov_base = (void *) buf; iov[0].iov_len = buf_size; - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = 1; - msg.msg_flags = 0; - msg.msg_control = (void *) oob; - msg.msg_controllen = oob_size; - retry: - res = sendmsg(fd, &msg, 0); + n = nxt_sendmsg(fd, iov, 1, oob); - if (nxt_slow_path(res == -1)) { + if (nxt_slow_path(n == -1)) { err = errno; if (err == EINTR) { @@ -5950,11 +6006,11 @@ retry: fd, (int) buf_size, strerror(err), err); } else { - nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, - (int) res); + nxt_unit_debug(ctx, "sendmsg(%d, %d, %d): %d", fd, (int) buf_size, + (oob != NULL ? (int) oob->size : 0), (int) n); } - return res; + return n; } @@ -6063,7 +6119,7 @@ retry: nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + rbuf->oob.size = 0; goto retry; } @@ -6074,7 +6130,8 @@ nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src) { memcpy(dst->buf, src->buf, src->size); dst->size = src->size; - memcpy(dst->oob, src->oob, sizeof(src->oob)); + dst->oob.size = src->oob.size; + memcpy(dst->oob.buf, src->oob.buf, src->oob.size); } @@ -6089,7 +6146,11 @@ nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, retry: - res = nxt_unit_app_queue_recv(port, rbuf); + res = nxt_unit_app_queue_recv(ctx, port, rbuf); + + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } if (res == NXT_UNIT_AGAIN) { res = nxt_unit_port_recv(ctx, port, rbuf); @@ -6116,16 +6177,18 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) { int fd, err; + size_t oob_size; struct iovec iov[1]; - struct msghdr msg; nxt_unit_impl_t *lib; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); if (lib->callbacks.port_recv != NULL) { + oob_size = sizeof(rbuf->oob.buf); + rbuf->size = lib->callbacks.port_recv(ctx, port, rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); + rbuf->oob.buf, &oob_size); nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", (int) port->id.pid, (int) port->id.id, (int) rbuf->size); @@ -6134,25 +6197,18 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, return NXT_UNIT_ERROR; } + rbuf->oob.size = oob_size; return NXT_UNIT_OK; } iov[0].iov_base = rbuf->buf; iov[0].iov_len = sizeof(rbuf->buf); - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = 1; - msg.msg_flags = 0; - msg.msg_control = rbuf->oob; - msg.msg_controllen = sizeof(rbuf->oob); - fd = port->in_fd; retry: - rbuf->size = recvmsg(fd, &msg, 0); + rbuf->size = nxt_recvmsg(fd, iov, 1, &rbuf->oob); if (nxt_slow_path(rbuf->size == -1)) { err = errno; @@ -6194,13 +6250,20 @@ nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) static int -nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) { uint32_t cookie; nxt_port_msg_t *port_msg; nxt_app_queue_t *queue; + nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port_impl; + struct { + nxt_port_msg_t msg; + uint8_t quit_param; + } nxt_packed m; + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); queue = port_impl->queue; @@ -6214,6 +6277,25 @@ retry: port_msg = (nxt_port_msg_t *) rbuf->buf; if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (lib->request_limit != 0) { + nxt_atomic_fetch_add(&lib->request_count, 1); + + if (nxt_slow_path(lib->request_count >= lib->request_limit)) { + nxt_unit_debug(ctx, "request limit reached"); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.type = _NXT_PORT_MSG_QUIT; + m.quit_param = NXT_QUIT_GRACEFUL; + + (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port, + &m, sizeof(m), NULL); + } + } + return NXT_UNIT_OK; } @@ -6495,7 +6577,7 @@ nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) log_fd = lib->log_fd; } else { - pid = getpid(); + pid = nxt_unit_pid; log_fd = STDERR_FILENO; } @@ -6539,7 +6621,7 @@ nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...) log_fd = lib->log_fd; } else { - pid = getpid(); + pid = nxt_unit_pid; log_fd = STDERR_FILENO; } |