diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:34 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:34 +0300 |
commit | e227fc9e6281c280c46139a81646ecd7b0510e2b (patch) | |
tree | 8f6e631790e7a83bb46c28bd45ebaa5e737a3424 /src/nxt_unit.c | |
parent | a82cf4ffb68126f2831ab9877a7ef283dd517690 (diff) | |
download | unit-e227fc9e6281c280c46139a81646ecd7b0510e2b.tar.gz unit-e227fc9e6281c280c46139a81646ecd7b0510e2b.tar.bz2 |
Introducing application and port shared memory queues.
The goal is to minimize the number of syscalls needed to deliver a message.
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r-- | src/nxt_unit.c | 1171 |
1 files changed, 921 insertions, 250 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 66aadd98..1008a9d6 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -7,6 +7,8 @@ #include "nxt_main.h" #include "nxt_port_memory_int.h" +#include "nxt_port_queue.h" +#include "nxt_app_queue.h" #include "nxt_unit.h" #include "nxt_unit_request.h" @@ -50,12 +52,15 @@ nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); -static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); +static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, + int queue_fd); static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_unit_port_id_t *port_id); static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); @@ -92,6 +97,7 @@ static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n); +static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, @@ -103,8 +109,6 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void 
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); -static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); @@ -124,18 +128,22 @@ static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); +nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, - nxt_unit_port_t *port); + nxt_unit_port_t *port, int queue_fd); nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, - nxt_unit_port_t *port); + nxt_unit_port_t *port, void *queue); static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, @@ -150,18 +158,28 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, const void *oob, size_t oob_size); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size); +static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +nxt_inline void 
nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, + nxt_unit_read_buf_t *src); +static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove); -static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, - nxt_unit_request_info_impl_t *req_impl); -static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( - nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); +static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, + nxt_unit_request_info_t *req); +static nxt_unit_request_info_t *nxt_unit_request_hash_find( + nxt_unit_ctx_t *ctx, uint32_t stream, int remove); static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); @@ -217,6 +235,7 @@ struct nxt_unit_request_info_impl_s { nxt_unit_req_state_t state; uint8_t websocket; + uint8_t in_hash; /* for nxt_unit_ctx_impl_t.free_req or active_req */ nxt_queue_link_t link; @@ -349,6 +368,11 @@ struct nxt_unit_port_impl_s { nxt_queue_t awaiting_req; int ready; + + void *queue; + + int from_socket; + nxt_unit_read_buf_t *socket_rbuf; }; @@ -375,7 +399,8 @@ typedef struct { nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { - int rc; + int rc, queue_fd; + void *mem; uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; @@ -386,6 +411,8 @@ nxt_unit_init(nxt_unit_init_t *init) return NULL; } + queue_fd = -1; + if (init->ready_port.id.pid != 0 && init->ready_stream != 0 && init->read_port.id.pid != 0) @@ -422,33 +449,58 @@ nxt_unit_init(nxt_unit_init_t 
*init) ctx = &lib->main_ctx.ctx; - lib->router_port = nxt_unit_add_port(ctx, &router_port); + lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); if (nxt_slow_path(lib->router_port == NULL)) { nxt_unit_alert(NULL, "failed to add router_port"); goto fail; } - lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port); + queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(queue_fd == -1)) { + goto fail; + } + + mem = mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, + strerror(errno), errno); + + goto fail; + } + + nxt_port_queue_init(mem); + + lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { nxt_unit_alert(NULL, "failed to add read_port"); + munmap(mem, sizeof(nxt_port_queue_t)); + goto fail; } - rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream); + rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); + munmap(mem, sizeof(nxt_port_queue_t)); + goto fail; } close(ready_port.out_fd); + close(queue_fd); return ctx; fail: + if (queue_fd != -1) { + close(queue_fd); + } + nxt_unit_ctx_release(&lib->main_ctx.ctx); return NULL; @@ -497,6 +549,7 @@ nxt_unit_create(nxt_unit_init_t *init) rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { + pthread_mutex_destroy(&lib->mutex); goto fail; } @@ -505,6 +558,7 @@ nxt_unit_create(nxt_unit_init_t *init) if (cb->request_handler == NULL) { nxt_unit_alert(NULL, "request_handler is NULL"); + pthread_mutex_destroy(&lib->mutex); goto fail; } @@ -765,12 +819,17 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, static int -nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) 
+nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) { ssize_t res; nxt_port_msg_t msg; nxt_unit_impl_t *lib; + union { + struct cmsghdr cm; + char space[CMSG_SPACE(sizeof(int))]; + } cmsg; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); msg.stream = stream; @@ -783,7 +842,25 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0); + memset(&cmsg, 0, sizeof(cmsg)); + + cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); + cmsg.cm.cmsg_level = SOL_SOCKET; + cmsg.cm.cmsg_type = SCM_RIGHTS; + + /* + * memcpy() is used instead of simple + * *(int *) CMSG_DATA(&cmsg.cm) = fd; + * because GCC 4.4 with -O2/3/s optimization may issue a warning: + * dereferencing type-punned pointer will break strict-aliasing rules + * + * Fortunately, GCC with -O1 compiles this nxt_memcpy() + * in the same simple assignment as in the code above. + */ + memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int)); + + res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), + &cmsg, sizeof(cmsg)); if (res != sizeof(msg)) { return NXT_UNIT_ERROR; } @@ -838,6 +915,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) goto fail; } + nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d", + port_msg->stream, (int) port_msg->type, + recv_msg.fd, recv_msg.fd2); + recv_msg.stream = port_msg->stream; recv_msg.pid = port_msg->pid; recv_msg.reply_port = port_msg->reply_port; @@ -853,19 +934,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) goto fail; } - if (port_msg->tracking) { - rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf); - - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - if (rc == NXT_UNIT_AGAIN) { - recv_msg.fd = -1; - recv_msg.fd2 = -1; - } - - goto fail; - } - } - /* Fragmentation is unsupported. 
*/ if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", @@ -929,6 +997,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) rc = nxt_unit_process_req_headers(ctx, &recv_msg); break; + case _NXT_PORT_MSG_REQ_BODY: + rc = nxt_unit_process_req_body(ctx, &recv_msg); + break; + case _NXT_PORT_MSG_WEBSOCKET: rc = nxt_unit_process_websocket(ctx, &recv_msg); break; @@ -992,6 +1064,7 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; + void *mem; nxt_unit_impl_t *lib; nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; @@ -1013,9 +1086,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port_msg = recv_msg->start; - nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", + nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d", recv_msg->stream, (int) new_port_msg->pid, - (int) new_port_msg->id, recv_msg->fd); + (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -1025,6 +1098,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port.in_fd = recv_msg->fd; new_port.out_fd = -1; + mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd2, 0); + } else { nb = 0; @@ -1041,14 +1117,23 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) new_port.in_fd = -1; new_port.out_fd = recv_msg->fd; + + mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd2, 0); } + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2, + strerror(errno), errno); + + return NXT_UNIT_ERROR; + } new_port.data = NULL; recv_msg->fd = -1; - port = nxt_unit_add_port(ctx, &new_port); + port = nxt_unit_add_port(ctx, &new_port, mem); if (nxt_slow_path(port == 
NULL)) { return NXT_UNIT_ERROR; } @@ -1134,6 +1219,7 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req->response_max_fields = 0; req_impl->state = NXT_UNIT_RS_START; req_impl->websocket = 0; + req_impl->in_hash = 0; nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, (int) r->method_length, @@ -1151,12 +1237,82 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) if (nxt_fast_path(res == NXT_UNIT_OK)) { res = nxt_unit_send_req_headers_ack(req); - if (nxt_slow_path(res != NXT_UNIT_OK)) { - return res; + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return NXT_UNIT_ERROR; } lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + if (req->content_length + > (uint64_t) (req->content_buf->end - req->content_buf->free)) + { + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return NXT_UNIT_ERROR; + } + + /* + * If application have separate data handler, we may start + * request processing and process data when it is arrived. 
+ */ + if (lib->callbacks.data_handler == NULL) { + return NXT_UNIT_OK; + } + } + + lib->callbacks.request_handler(req); + } + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + uint64_t l; + nxt_unit_impl_t *lib; + nxt_unit_mmap_buf_t *b; + nxt_unit_request_info_t *req; + + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + if (req == NULL) { + return NXT_UNIT_OK; + } + + l = req->content_buf->end - req->content_buf->free; + + for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { + b->req = req; + l += b->buf.end - b->buf.free; + } + + if (recv_msg->incoming_buf != NULL) { + b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); + + /* "Move" incoming buffer list to req_impl. */ + nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf); + recv_msg->incoming_buf = NULL; + } + + req->content_fd = recv_msg->fd; + recv_msg->fd = -1; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (lib->callbacks.data_handler != NULL) { + lib->callbacks.data_handler(req); + + return NXT_UNIT_OK; + } + + if (req->content_fd != -1 || l == req->content_length) { lib->callbacks.request_handler(req); } @@ -1260,6 +1416,9 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_queue_insert_tail(&process->ports, &port_impl->link); port_impl->process = process; + port_impl->queue = NULL; + port_impl->from_socket = 0; + port_impl->socket_rbuf = NULL; nxt_queue_init(&port_impl->awaiting_req); @@ -1321,21 +1480,17 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) size_t hsize; nxt_unit_impl_t *lib; nxt_unit_mmap_buf_t *b; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_callbacks_t *cb; nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - - req_impl = 
nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, - recv_msg->last); - if (req_impl == NULL) { + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + if (nxt_slow_path(req == NULL)) { return NXT_UNIT_OK; } - req = &req_impl->req; + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); cb = &lib->callbacks; @@ -1501,12 +1656,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response = NULL; req->response_buf = NULL; - if (req_impl->websocket) { - nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); - - req_impl->websocket = 0; + if (req_impl->in_hash) { + nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); } + req_impl->websocket = 0; + while (req_impl->outgoing_buf != NULL) { nxt_unit_mmap_buf_free(req_impl->outgoing_buf); } @@ -2170,7 +2325,6 @@ int nxt_unit_response_upgrade(nxt_unit_request_info_t *req) { int rc; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); @@ -2193,9 +2347,7 @@ nxt_unit_response_upgrade(nxt_unit_request_info_t *req) return NXT_UNIT_ERROR; } - ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); - - rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); + rc = nxt_unit_request_hash_add(req->ctx, req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); @@ -2466,6 +2618,8 @@ nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) pthread_mutex_unlock(&ctx_impl->mutex); + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + return rbuf; } @@ -2564,6 +2718,8 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, nxt_unit_request_info_impl_t *req_impl; char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + nxt_unit_req_debug(req, "write: %d", (int) size); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); part_start 
= start; @@ -2743,9 +2899,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, dst, size); + nxt_unit_req_debug(req, "read: %d", (int) buf_res); + if (buf_res < (ssize_t) size && req->content_fd != -1) { res = read(req->content_fd, dst, size); - if (res < 0) { + if (nxt_slow_path(res < 0)) { nxt_unit_req_alert(req, "failed to read content: %s (%d)", strerror(errno), errno); @@ -3301,7 +3459,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) { - nxt_port_msg_t *port_msg; + int res; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; @@ -3313,21 +3471,15 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); - - nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); - - if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_ERROR) { nxt_unit_read_buf_release(ctx, rbuf); return NXT_UNIT_ERROR; } - port_msg = (nxt_port_msg_t *) rbuf->buf; - - if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { + if (nxt_unit_is_shm_ack(rbuf)) { nxt_unit_read_buf_release(ctx, rbuf); - break; } @@ -3337,7 +3489,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) pthread_mutex_unlock(&ctx_impl->mutex); - if (port_msg->type == _NXT_PORT_MSG_QUIT) { + if (nxt_unit_is_quit(rbuf)) { nxt_unit_debug(ctx, "oosm: quit received"); return NXT_UNIT_ERROR; @@ -3406,7 +3558,6 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) { int i, fd, rc; void *mem; - char name[64]; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; nxt_port_mmap_header_t *hdr; @@ -3420,59 +3571,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) return NULL; } - snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", - lib->pid, (void *) pthread_self()); - -#if 
(NXT_HAVE_MEMFD_CREATE) - - fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE); if (nxt_slow_path(fd == -1)) { - nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, - strerror(errno), errno); - - goto remove_fail; - } - - nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); - -#elif (NXT_HAVE_SHM_OPEN_ANON) - - fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); - if (nxt_slow_path(fd == -1)) { - nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", - strerror(errno), errno); - - goto remove_fail; - } - -#elif (NXT_HAVE_SHM_OPEN) - - /* Just in case. */ - shm_unlink(name); - - fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); - if (nxt_slow_path(fd == -1)) { - nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, - strerror(errno), errno); - - goto remove_fail; - } - - if (nxt_slow_path(shm_unlink(name) == -1)) { - nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, - strerror(errno), errno); - } - -#else - -#error No working shared memory implementation. 
- -#endif - - if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { - nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, - strerror(errno), errno); - goto remove_fail; } @@ -3481,6 +3581,8 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, strerror(errno), errno); + close(fd); + goto remove_fail; } @@ -3533,6 +3635,80 @@ remove_fail: static int +nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size) +{ + int fd; + nxt_unit_impl_t *lib; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + +#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) + char name[64]; + + snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", + lib->pid, (void *) pthread_self()); +#endif + +#if (NXT_HAVE_MEMFD_CREATE) + + fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + if (nxt_slow_path(fd == -1)) { + nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, + strerror(errno), errno); + + return -1; + } + + nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); + +#elif (NXT_HAVE_SHM_OPEN_ANON) + + fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); + if (nxt_slow_path(fd == -1)) { + nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", + strerror(errno), errno); + + return -1; + } + +#elif (NXT_HAVE_SHM_OPEN) + + /* Just in case. */ + shm_unlink(name); + + fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); + if (nxt_slow_path(fd == -1)) { + nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, + strerror(errno), errno); + + return -1; + } + + if (nxt_slow_path(shm_unlink(name) == -1)) { + nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name, + strerror(errno), errno); + } + +#else + +#error No working shared memory implementation. 
+ +#endif + + if (nxt_slow_path(ftruncate(fd, size) == -1)) { + nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, + strerror(errno), errno); + + close(fd); + + return -1; + } + + return fd; +} + + +static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) { ssize_t res; @@ -3797,61 +3973,6 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) static int -nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, - nxt_unit_read_buf_t *rbuf) -{ - int res; - nxt_chunk_id_t c; - nxt_unit_impl_t *lib; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_tracking_msg_t *tracking_msg; - - if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { - nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", - recv_msg->stream, (int) recv_msg->size); - - return NXT_UNIT_ERROR; - } - - tracking_msg = recv_msg->start; - - recv_msg->start = tracking_msg + 1; - recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->incoming.mutex); - - res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming, - recv_msg->pid, tracking_msg->mmap_id, - &hdr, rbuf); - - if (nxt_slow_path(res != NXT_UNIT_OK)) { - return res; - } - - c = tracking_msg->tracking_id; - res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); - - if (res == 0) { - nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", - recv_msg->stream); - - nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); - - res = NXT_UNIT_CANCELLED; - - } else { - res = NXT_UNIT_OK; - } - - pthread_mutex_unlock(&lib->incoming.mutex); - - return res; -} - - -static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf) @@ -4154,7 +4275,7 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) } process->pid = pid; - process->use_count = 1; + process->use_count = 2; process->next_port_id = 0; 
process->lib = lib; @@ -4176,8 +4297,6 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) break; } - nxt_unit_process_use(process); - return process; } @@ -4293,22 +4412,52 @@ nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { - int res, err; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - struct pollfd fds[2]; + int nevents, res, err; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *port_impl; + struct pollfd fds[2]; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); - if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) { - return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); + + return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); } + port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t, + port); + retry: + if (port_impl->from_socket == 0) { + res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_OK) { + if (nxt_unit_is_read_socket(rbuf)) { + port_impl->from_socket++; + + nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", + (int) ctx_impl->read_port->id.pid, + (int) ctx_impl->read_port->id.id, + port_impl->from_socket); + + } else { + nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", + (int) ctx_impl->read_port->id.pid, + (int) ctx_impl->read_port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } + } + } + + res = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } + fds[0].fd = ctx_impl->read_port->in_fd; fds[0].events = POLLIN; fds[0].revents = 0; @@ -4317,31 +4466,47 @@ retry: fds[1].events = POLLIN; fds[1].revents = 0; - res = poll(fds, 2, -1); - if (nxt_slow_path(res < 0)) { + nevents = poll(fds, 2, -1); + if (nxt_slow_path(nevents == -1)) { err = errno; if (err == EINTR) { goto retry; } - 
nxt_unit_alert(ctx, "poll() failed: %s (%d)", - strerror(err), err); + nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)", + fds[0].fd, fds[1].fd, strerror(err), err); rbuf->size = -1; return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; } + nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]", + fds[0].fd, fds[1].fd, nevents, fds[0].revents, + fds[1].revents); + if ((fds[0].revents & POLLIN) != 0) { - return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_AGAIN) { + goto retry; + } + + return res; } if ((fds[1].revents & POLLIN) != 0) { - return nxt_unit_port_recv(ctx, lib->shared_port, rbuf); + res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); + if (res == NXT_UNIT_AGAIN) { + goto retry; + } + + return res; } - rbuf->size = -1; + nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]", + fds[0].fd, fds[1].fd, nevents, fds[0].revents, + fds[1].revents); return NXT_UNIT_ERROR; } @@ -4392,9 +4557,11 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) { + int res; nxt_queue_t ready_req; nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; nxt_queue_init(&ready_req); @@ -4419,7 +4586,35 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) { lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); - (void) nxt_unit_send_req_headers_ack(&req_impl->req); + req = &req_impl->req; + + res = nxt_unit_send_req_headers_ack(req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + continue; + } + + if (req->content_length + > (uint64_t) (req->content_buf->end - req->content_buf->free)) + { + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + + nxt_unit_request_done(req, 
NXT_UNIT_ERROR); + + continue; + } + + /* + * If application have separate data handler, we may start + * request processing and process data when it is arrived. + */ + if (lib->callbacks.data_handler == NULL) { + continue; + } + } lib->callbacks.request_handler(&req_impl->req); @@ -4432,6 +4627,7 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) { int rc; nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); @@ -4442,11 +4638,30 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + rc = NXT_UNIT_ERROR; + break; + } + retry: + + rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (rc == NXT_UNIT_AGAIN) { + goto retry; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + + rc = nxt_unit_process_pending_rbuf(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } + + nxt_unit_process_ready_req(ctx); } nxt_unit_ctx_release(ctx); @@ -4455,11 +4670,68 @@ nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) } +nxt_inline int +nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_READ_QUEUE; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf) +{ + if (nxt_fast_path(rbuf->size == 1)) { + return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_SHM_ACK; + } + + return 0; +} + + +nxt_inline int 
+nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_QUIT; + } + + return 0; +} + + int nxt_unit_run_shared(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; + int rc; + nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; nxt_unit_ctx_use(ctx); @@ -4467,11 +4739,35 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + rc = NXT_UNIT_ERROR; + break; + } + + retry: + rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); + if (rc == NXT_UNIT_AGAIN) { + goto retry; + } + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + nxt_unit_read_buf_release(ctx, rbuf); + break; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + + rc = nxt_unit_process_pending_rbuf(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } + + nxt_unit_process_ready_req(ctx); } nxt_unit_ctx_release(ctx); @@ -4499,6 +4795,7 @@ static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; + nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; rbuf = nxt_unit_read_buf_get(ctx); @@ -4506,10 +4803,18 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) return NXT_UNIT_ERROR; } - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - rc = nxt_unit_port_recv(ctx, port, rbuf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { +retry: + + if (port == lib->shared_port) { + rc = nxt_unit_shared_port_recv(ctx, port, rbuf); + + } else { + rc = nxt_unit_ctx_port_recv(ctx, port, rbuf); + } + + if (rc != NXT_UNIT_OK) { nxt_unit_read_buf_release(ctx, rbuf); return rc; } @@ -4526,6 +4831,15 @@ 
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_process_ready_req(ctx); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + if (lib->online) { + goto retry; + } + return rc; } @@ -4540,10 +4854,12 @@ nxt_unit_done(nxt_unit_ctx_t *ctx) nxt_unit_ctx_t * nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_port_t *port; - nxt_unit_ctx_impl_t *new_ctx; + int rc, queue_fd; + void *mem; + nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_ctx_impl_t *new_ctx; + nxt_unit_port_impl_t *port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4554,33 +4870,57 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) return NULL; } + rc = nxt_unit_ctx_init(lib, new_ctx, data); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + free(new_ctx); + + return NULL; + } + + queue_fd = -1; + port = nxt_unit_create_port(ctx); if (nxt_slow_path(port == NULL)) { - free(new_ctx); + goto fail; + } - return NULL; + new_ctx->read_port = port; + + queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(queue_fd == -1)) { + goto fail; } - rc = nxt_unit_send_port(ctx, lib->router_port, port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + mem = mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, + strerror(errno), errno); + goto fail; } - rc = nxt_unit_ctx_init(lib, new_ctx, data); + nxt_port_queue_init(mem); + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + port_impl->queue = mem; + + rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } - new_ctx->read_port = port; + close(queue_fd); return &new_ctx->ctx; fail: - nxt_unit_remove_port(lib, &port->id); - nxt_unit_port_release(port); + if (queue_fd != -1) { + 
close(queue_fd); + } - free(new_ctx); + nxt_unit_ctx_release(&new_ctx->ctx); return NULL; } @@ -4633,6 +4973,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) nxt_queue_remove(&ctx_impl->link); if (nxt_fast_path(ctx_impl->read_port != NULL)) { + nxt_unit_remove_port(lib, &ctx_impl->read_port->id); nxt_unit_port_release(ctx_impl->read_port); } @@ -4709,10 +5050,8 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) nxt_unit_process_release(process); - port = nxt_unit_add_port(ctx, &new_port); + port = nxt_unit_add_port(ctx, &new_port, NULL); if (nxt_slow_path(port == NULL)) { - nxt_unit_alert(ctx, "create_port: add_port() failed"); - close(port_sockets[0]); close(port_sockets[1]); } @@ -4723,10 +5062,11 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, - nxt_unit_port_t *port) + nxt_unit_port_t *port, int queue_fd) { ssize_t res; nxt_unit_impl_t *lib; + int fds[2] = { port->out_fd, queue_fd }; struct { nxt_port_msg_t msg; @@ -4735,7 +5075,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, union { struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; + char space[CMSG_SPACE(sizeof(int) * 2)]; } cmsg; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4758,7 +5098,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, memset(&cmsg, 0, sizeof(cmsg)); - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); + cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; @@ -4771,7 +5111,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, * Fortunately, GCC with -O1 compiles this nxt_memcpy() * in the same simple assignment as in the code above. 
*/ - memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int)); + memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2); res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); @@ -4799,7 +5139,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) c = nxt_atomic_fetch_add(&port_impl->use_count, -1); if (c == 1) { - nxt_unit_debug(NULL, "destroy port %d,%d", + nxt_unit_debug(NULL, "destroy port{%d,%d}", (int) port->id.pid, (int) port->id.id); nxt_unit_process_release(port_impl->process); @@ -4816,13 +5156,31 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) port->out_fd = -1; } + if (port->in_fd != -1) { + close(port->in_fd); + + port->in_fd = -1; + } + + if (port->out_fd != -1) { + close(port->out_fd); + + port->out_fd = -1; + } + + if (port_impl->queue != NULL) { + munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1) + ? sizeof(nxt_app_queue_t) + : sizeof(nxt_port_queue_t)); + } + free(port_impl); } } static nxt_unit_port_t * -nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) { int rc; nxt_queue_t awaiting_req; @@ -4840,9 +5198,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); if (nxt_slow_path(old_port != NULL)) { - nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d", - port->id.pid, port->id.id, - port->in_fd, port->out_fd); + nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} " + "in_fd %d out_fd %d queue %p", + port->id.pid, port->id.id, + port->in_fd, port->out_fd, queue); if (old_port->data == NULL) { old_port->data = port->data; @@ -4875,6 +5234,10 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); + if (old_port_impl->queue == NULL) { + old_port_impl->queue = queue; + } + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { 
nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); nxt_queue_init(&old_port_impl->awaiting_req); @@ -4914,9 +5277,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port = NULL; - nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", + nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", port->id.pid, port->id.id, - port->in_fd, port->out_fd); + port->in_fd, port->out_fd, queue); process = nxt_unit_process_get(lib, port->id.pid); if (nxt_slow_path(process == NULL)) { @@ -4929,6 +5292,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port = malloc(sizeof(nxt_unit_port_impl_t)); if (nxt_slow_path(new_port == NULL)) { + nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed", + port->id.pid, port->id.id); + goto unlock; } @@ -4951,6 +5317,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port->use_count = 2; new_port->process = process; new_port->ready = (port->in_fd != -1 || port->out_fd != -1); + new_port->queue = queue; + new_port->from_socket = 0; + new_port->socket_rbuf = NULL; nxt_queue_init(&new_port->awaiting_req); @@ -5010,13 +5379,13 @@ nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); if (nxt_slow_path(port == NULL)) { - nxt_unit_debug(NULL, "remove_port: port %d,%d not found", + nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found", (int) port_id->pid, (int) port_id->id); return NULL; } - nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p", + nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p", (int) port_id->pid, (int) port_id->id, port->in_fd, port->out_fd, port->data); @@ -5089,10 +5458,12 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - lib->online = 0; + if (lib->online) { + lib->online = 0; - if (lib->callbacks.quit != NULL) { - lib->callbacks.quit(ctx); + if (lib->callbacks.quit != NULL) { + 
lib->callbacks.quit(ctx); + } } } @@ -5137,20 +5508,91 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { - nxt_unit_impl_t *lib; - - nxt_unit_debug(ctx, "port_send: port %d,%d fd %d", - (int) port->id.pid, (int) port->id.id, port->out_fd); + int notify; + ssize_t ret; + nxt_int_t rc; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_port_impl_t *port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + if (port_impl->queue != NULL && oob_size == 0 + && buf_size <= NXT_PORT_QUEUE_MSG_SIZE) + { + rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", + (int) port->id.pid, (int) port->id.id); + + return -1; + } + + nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d", + (int) port->id.pid, (int) port->id.id, + (int) buf_size, notify); + + if (notify) { + memcpy(&msg, buf, sizeof(nxt_port_msg_t)); + + msg.type = _NXT_PORT_MSG_READ_QUEUE; + + if (lib->callbacks.port_send == NULL) { + ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, + sizeof(nxt_port_msg_t), NULL, 0); + + nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", + (int) port->id.pid, (int) port->id.id, + (int) ret); + + } else { + ret = lib->callbacks.port_send(ctx, port, &msg, + sizeof(nxt_port_msg_t), NULL, 0); + + nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue", + (int) port->id.pid, (int) port->id.id, + (int) ret); + } + + } + + return buf_size; + } + + if (port_impl->queue != NULL) { + msg.type = _NXT_PORT_MSG_READ_SOCKET; + + rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", + (int) port->id.pid, (int) port->id.id); + + return -1; + } + + nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket
notify %d", + (int) port->id.pid, (int) port->id.id, notify); + } + if (lib->callbacks.port_send != NULL) { - return lib->callbacks.port_send(ctx, port, buf, buf_size, - oob, oob_size); + ret = lib->callbacks.port_send(ctx, port, buf, buf_size, + oob, oob_size); + + nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", + (int) port->id.pid, (int) port->id.id, + (int) ret); + + } else { + ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, + oob, oob_size); + + nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", + (int) port->id.pid, (int) port->id.id, + (int) ret); } - return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, - oob, oob_size); + return ret; } @@ -5158,6 +5600,7 @@ static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { + int err; ssize_t res; struct iovec iov[1]; struct msghdr msg; @@ -5178,7 +5621,9 @@ retry: res = sendmsg(fd, &msg, 0); if (nxt_slow_path(res == -1)) { - if (errno == EINTR) { + err = errno; + + if (err == EINTR) { goto retry; } @@ -5187,7 +5632,7 @@ retry: * implementation. 
*/ nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", - fd, (int) buf_size, strerror(errno), errno); + fd, (int) buf_size, strerror(err), err); } else { nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, @@ -5199,6 +5644,158 @@ retry: static int +nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) +{ + int res, read; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + read = 0; + +retry: + + if (port_impl->from_socket > 0) { + if (port_impl->socket_rbuf != NULL + && port_impl->socket_rbuf->size > 0) + { + port_impl->from_socket--; + + nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf); + port_impl->socket_rbuf->size = 0; + + nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } + + } else { + res = nxt_unit_port_queue_recv(port, rbuf); + + if (res == NXT_UNIT_OK) { + if (nxt_unit_is_read_socket(rbuf)) { + port_impl->from_socket++; + + nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", + (int) port->id.pid, (int) port->id.id, + port_impl->from_socket); + + goto retry; + } + + nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } + } + + if (read) { + return NXT_UNIT_AGAIN; + } + + res = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + read = 1; + + if (nxt_unit_is_read_queue(rbuf)) { + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + + if (port_impl->from_socket) { + nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET"); + } + + goto retry; + } + + nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + if (res == NXT_UNIT_AGAIN) { + return NXT_UNIT_AGAIN; + } + + if 
(port_impl->from_socket > 0) { + port_impl->from_socket--; + + return NXT_UNIT_OK; + } + + nxt_unit_debug(ctx, "port{%d,%d} suspend message %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + if (port_impl->socket_rbuf == NULL) { + port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx); + + if (nxt_slow_path(port_impl->socket_rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + port_impl->socket_rbuf->size = 0; + } + + if (port_impl->socket_rbuf->size > 0) { + nxt_unit_alert(ctx, "too many port socket messages"); + + return NXT_UNIT_ERROR; + } + + nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf); + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + goto retry; +} + + +nxt_inline void +nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src) +{ + memcpy(dst->buf, src->buf, src->size); + dst->size = src->size; + memcpy(dst->oob, src->oob, sizeof(src->oob)); +} + + +static int +nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) +{ + int res; + +retry: + + res = nxt_unit_app_queue_recv(port, rbuf); + + if (res == NXT_UNIT_AGAIN) { + res = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + if (nxt_unit_is_read_queue(rbuf)) { + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + + goto retry; + } + } + + return res; +} + + +static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) { @@ -5214,6 +5811,9 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, rbuf->buf, sizeof(rbuf->buf), rbuf->oob, sizeof(rbuf->oob)); + nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + if (nxt_slow_path(rbuf->size < 0)) { return NXT_UNIT_ERROR; } @@ -5247,13 +5847,13 @@ retry: if (err == EAGAIN) { nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", - fd, strerror(errno), errno); + 
fd, strerror(err), err); return NXT_UNIT_AGAIN; } nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", - fd, strerror(errno), errno); + fd, strerror(err), err); return NXT_UNIT_ERROR; } @@ -5264,6 +5864,52 @@ retry: } +static int +nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf); + + return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; +} + + +static int +nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +{ + uint32_t cookie; + nxt_port_msg_t *port_msg; + nxt_app_queue_t *queue; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + queue = port_impl->queue; + +retry: + + rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie); + + nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size); + + if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { + return NXT_UNIT_OK; + } + + nxt_unit_debug(NULL, "app_queue_recv: message cancelled"); + + goto retry; + } + + return (rbuf->size == -1) ? 
NXT_UNIT_AGAIN : NXT_UNIT_OK; +} + + static nxt_int_t nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -5392,12 +6038,19 @@ static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { static int -nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, - nxt_unit_request_info_impl_t *req_impl) +nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, + nxt_unit_request_info_t *req) { - uint32_t *stream; - nxt_int_t res; - nxt_lvlhsh_query_t lhq; + uint32_t *stream; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + if (req_impl->in_hash) { + return NXT_UNIT_OK; + } stream = &req_impl->stream; @@ -5409,11 +6062,18 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, lhq.replace = 0; lhq.value = req_impl; - res = nxt_lvlhsh_insert(request_hash, &lhq); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq); + + pthread_mutex_unlock(&ctx_impl->mutex); switch (res) { case NXT_OK: + req_impl->in_hash = 1; return NXT_UNIT_OK; default: @@ -5422,12 +6082,13 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, } -static nxt_unit_request_info_impl_t * -nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, - int remove) +static nxt_unit_request_info_t * +nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove) { - nxt_int_t res; - nxt_lvlhsh_query_t lhq; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream)); lhq.key.length = sizeof(stream); @@ -5435,16 +6096,26 @@ nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, lhq.proto = &lvlhsh_requests_proto; lhq.pool = NULL; + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + 
pthread_mutex_lock(&ctx_impl->mutex); + if (remove) { - res = nxt_lvlhsh_delete(request_hash, &lhq); + res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq); } else { - res = nxt_lvlhsh_find(request_hash, &lhq); + res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq); } + pthread_mutex_unlock(&ctx_impl->mutex); + switch (res) { case NXT_OK: + req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t, + req); + req_impl->in_hash = 0; + return lhq.value; default: |