diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_unit.c | 3179 |
1 files changed, 2247 insertions, 932 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 9f6eab95..6b7d631d 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -7,6 +7,8 @@ #include "nxt_main.h" #include "nxt_port_memory_int.h" +#include "nxt_port_queue.h" +#include "nxt_app_queue.h" #include "nxt_unit.h" #include "nxt_unit_request.h" @@ -38,20 +40,30 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); +nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); +nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); +nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); +nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, - nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, - uint32_t *shm_limit); -static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - uint32_t stream); + nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, + int *log_fd, uint32_t *stream, uint32_t *shm_limit); +static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, + int queue_fd); +static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id); +static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); @@ -63,11 +75,9 @@ static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( nxt_unit_ctx_t *ctx); static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); -static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); -static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, +static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, nxt_unit_mmap_buf_t *mmap_buf, int last); static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); @@ -81,70 +91,97 @@ static nxt_unit_mmap_buf_t *nxt_unit_request_preread( static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, - nxt_chunk_id_t *c, int *n, int min_n); -static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); + nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); +static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); -static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, + nxt_unit_port_t *port, int n); +static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); +static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, + nxt_unit_port_t *port, uint32_t size, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); -static void nxt_unit_process_use(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, int i); +nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); +nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); -static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, uint32_t id); -static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, + nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, + nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); -static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, +static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid); -static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, +static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); -static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, - nxt_unit_read_buf_t *rbuf); -static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, int *fd); - -static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *new_port, int fd); - -static void nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, nxt_unit_port_t *r_port, - nxt_unit_process_t **process); -static void nxt_unit_remove_process(nxt_unit_ctx_t *ctx, +static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); +static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); +static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); +nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); +static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); +static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); +static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); + +static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, + nxt_unit_port_t *port, int queue_fd); + +nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); +nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); +static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port, void *queue); +static void nxt_unit_remove_port(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id); +static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, + nxt_unit_port_id_t *port_id); +static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); +static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); - -static ssize_t nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, +static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); +static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); -static ssize_t nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, - nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, - void *oob, size_t oob_size); +static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, + const void *buf, size_t buf_size, const void *oob, size_t oob_size); +static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, + nxt_unit_read_buf_t *src); +static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); +nxt_inline int nxt_unit_close(int fd); +static int nxt_unit_fd_blocking(int fd); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); -static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, +static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove); -static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, - nxt_unit_request_info_impl_t *req_impl); -static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( - nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); +static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, + nxt_unit_request_info_t *req); +static nxt_unit_request_info_t *nxt_unit_request_hash_find( + nxt_unit_ctx_t *ctx, uint32_t stream, int remove); static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); @@ -156,10 +193,8 @@ struct nxt_unit_mmap_buf_s { nxt_unit_mmap_buf_t **prev; nxt_port_mmap_header_t *hdr; - nxt_unit_port_id_t port_id; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_process_t *process; char *free_ptr; char *plain_ptr; }; @@ -176,8 +211,7 @@ struct nxt_unit_recv_msg_s { void *start; uint32_t size; - int fd; - nxt_unit_process_t *process; + int fd[2]; nxt_unit_mmap_buf_t *incoming_buf; }; @@ -197,15 +231,17 @@ struct nxt_unit_request_info_impl_s { uint32_t stream; - nxt_unit_process_t *process; - nxt_unit_mmap_buf_t *outgoing_buf; nxt_unit_mmap_buf_t *incoming_buf; nxt_unit_req_state_t state; uint8_t websocket; + uint8_t in_hash; + /* for nxt_unit_ctx_impl_t.free_req or active_req */ nxt_queue_link_t link; + /* for nxt_unit_port_impl_t.awaiting_req */ + nxt_queue_link_t port_wait_link; char extra_data[]; }; @@ -223,7 +259,8 @@ struct nxt_unit_websocket_frame_impl_s { struct nxt_unit_read_buf_s { - nxt_unit_read_buf_t *next; + nxt_queue_link_t link; + nxt_unit_ctx_impl_t *ctx_impl; ssize_t size; char buf[16384]; char oob[256]; @@ -233,10 +270,12 @@ struct nxt_unit_read_buf_s { struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; + nxt_atomic_t use_count; + nxt_atomic_t wait_items; + pthread_mutex_t mutex; - nxt_unit_port_id_t read_port_id; - int read_port_fd; + nxt_unit_port_t *read_port; nxt_queue_link_t link; @@ -254,9 +293,14 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_lvlhsh_t requests; - nxt_unit_read_buf_t *pending_read_head; - nxt_unit_read_buf_t **pending_read_tail; - nxt_unit_read_buf_t *free_read_buf; + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t ready_req; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t pending_rbuf; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t free_rbuf; nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -265,10 +309,29 @@ struct nxt_unit_ctx_impl_s { }; +struct nxt_unit_mmap_s { + nxt_port_mmap_header_t *hdr; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t awaiting_rbuf; +}; + + +struct nxt_unit_mmaps_s { + pthread_mutex_t mutex; + uint32_t size; + uint32_t cap; + nxt_atomic_t allocated_chunks; + nxt_unit_mmap_t *elts; +}; + + struct nxt_unit_impl_s { nxt_unit_t unit; nxt_unit_callbacks_t callbacks; + nxt_atomic_t use_count; + uint32_t request_data_size; uint32_t shm_mmap_limit; @@ -277,10 +340,14 @@ struct nxt_unit_impl_s { nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ - nxt_unit_port_id_t ready_port_id; + nxt_unit_port_t *router_port; + nxt_unit_port_t *shared_port; nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ + nxt_unit_mmaps_t incoming; + nxt_unit_mmaps_t outgoing; + pid_t pid; int log_fd; int online; @@ -292,32 +359,28 @@ struct nxt_unit_impl_s { struct nxt_unit_port_impl_s { nxt_unit_port_t port; + nxt_atomic_t use_count; + + /* for nxt_unit_process_t.ports */ nxt_queue_link_t link; nxt_unit_process_t *process; -}; + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t awaiting_req; -struct nxt_unit_mmap_s { - nxt_port_mmap_header_t *hdr; -}; + int ready; + void *queue; -struct nxt_unit_mmaps_s { - pthread_mutex_t mutex; - uint32_t size; - uint32_t cap; - nxt_atomic_t allocated_chunks; - nxt_unit_mmap_t *elts; + int from_socket; + nxt_unit_read_buf_t *socket_rbuf; }; struct nxt_unit_process_s { pid_t pid; - nxt_queue_t ports; - - nxt_unit_mmaps_t incoming; - nxt_unit_mmaps_t outgoing; + nxt_queue_t ports; /* of nxt_unit_port_impl_t */ nxt_unit_impl_t *lib; @@ -337,34 +400,41 @@ typedef struct { nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { - int rc; + int rc, queue_fd; + void *mem; uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; - nxt_unit_port_t ready_port, read_port; + nxt_unit_port_t ready_port, router_port, read_port; lib = nxt_unit_create(init); if (nxt_slow_path(lib == NULL)) { return NULL; } + queue_fd = -1; + mem = MAP_FAILED; + if (init->ready_port.id.pid != 0 && init->ready_stream != 0 && init->read_port.id.pid != 0) { ready_port = init->ready_port; ready_stream = init->ready_stream; + router_port = init->router_port; read_port = init->read_port; lib->log_fd = init->log_fd; nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, ready_port.id.id); + nxt_unit_port_id_init(&router_port.id, router_port.id.pid, + router_port.id.id); nxt_unit_port_id_init(&read_port.id, read_port.id.pid, read_port.id.id); } else { - rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, - &ready_stream, &shm_limit); + rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, + &lib->log_fd, &ready_stream, &shm_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } @@ -378,37 +448,77 @@ nxt_unit_init(nxt_unit_init_t *init) } lib->pid = read_port.id.pid; + ctx = &lib->main_ctx.ctx; - rc = lib->callbacks.add_port(ctx, &ready_port); - if (rc != NXT_UNIT_OK) { - nxt_unit_alert(NULL, "failed to add ready_port"); + rc = nxt_unit_fd_blocking(router_port.out_fd); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + + lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); + if (nxt_slow_path(lib->router_port == NULL)) { + nxt_unit_alert(NULL, "failed to add router_port"); + + goto fail; + } + queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(queue_fd == -1)) { goto fail; } - rc = lib->callbacks.add_port(ctx, &read_port); + mem = mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, + strerror(errno), errno); + + goto fail; + } + + nxt_port_queue_init(mem); + + rc = nxt_unit_fd_blocking(read_port.in_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + + lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); + if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { nxt_unit_alert(NULL, "failed to add read_port"); goto fail; } - lib->main_ctx.read_port_id = read_port.id; - lib->ready_port_id = ready_port.id; + rc = nxt_unit_fd_blocking(ready_port.out_fd); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } - rc = nxt_unit_ready(ctx, &ready_port.id, ready_stream); + rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_alert(NULL, "failed to send READY message"); goto fail; } + nxt_unit_close(ready_port.out_fd); + nxt_unit_close(queue_fd); + return ctx; fail: - free(lib); + if (mem != MAP_FAILED) { + munmap(mem, sizeof(nxt_port_queue_t)); + } + + if (queue_fd != -1) { + nxt_unit_close(queue_fd); + } + + nxt_unit_ctx_release(&lib->main_ctx.ctx); return NULL; } @@ -450,8 +560,13 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); + lib->use_count = 0; + lib->router_port = NULL; + lib->shared_port = NULL; + rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { + pthread_mutex_destroy(&lib->mutex); goto fail; } @@ -460,32 +575,12 @@ nxt_unit_create(nxt_unit_init_t *init) if (cb->request_handler == NULL) { nxt_unit_alert(NULL, "request_handler is NULL"); + pthread_mutex_destroy(&lib->mutex); goto fail; } - if (cb->add_port == NULL) { - cb->add_port = nxt_unit_add_port; - } - - if (cb->remove_port == NULL) { - cb->remove_port = nxt_unit_remove_port; - } - - if (cb->remove_pid == NULL) { - cb->remove_pid = nxt_unit_remove_pid; - } - - if (cb->quit == NULL) { - cb->quit = nxt_unit_quit; - } - - if (cb->port_send == NULL) { - cb->port_send = nxt_unit_port_send_default; - } - - if (cb->port_recv == NULL) { - cb->port_recv = nxt_unit_port_recv_default; - } + nxt_unit_mmaps_init(&lib->incoming); + nxt_unit_mmaps_init(&lib->outgoing); return lib; @@ -506,8 +601,6 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->ctx.data = data; ctx_impl->ctx.unit = &lib->unit; - nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); - rc = pthread_mutex_init(&ctx_impl->mutex, NULL); if (nxt_slow_path(rc != 0)) { nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); @@ -515,25 +608,33 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, return NXT_UNIT_ERROR; } + nxt_unit_lib_use(lib); + + nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); + + ctx_impl->use_count = 1; + ctx_impl->wait_items = 0; + nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); + nxt_queue_init(&ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->pending_rbuf); + nxt_queue_init(&ctx_impl->free_rbuf); ctx_impl->free_buf = NULL; nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); + nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link); - ctx_impl->pending_read_head = NULL; - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; - ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; - ctx_impl->ctx_read_buf.next = NULL; + ctx_impl->ctx_read_buf.ctx_impl = ctx_impl; ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; - ctx_impl->read_port_fd = -1; + ctx_impl->read_port = NULL; ctx_impl->requests.slot = 0; return NXT_UNIT_OK; @@ -541,6 +642,80 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_inline void +nxt_unit_ctx_use(nxt_unit_ctx_t *ctx) +{ + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + nxt_atomic_fetch_add(&ctx_impl->use_count, 1); +} + + +nxt_inline void +nxt_unit_ctx_release(nxt_unit_ctx_t *ctx) +{ + long c; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); + + if (c == 1) { + nxt_unit_ctx_free(ctx_impl); + } +} + + +nxt_inline void +nxt_unit_lib_use(nxt_unit_impl_t *lib) +{ + nxt_atomic_fetch_add(&lib->use_count, 1); +} + + +nxt_inline void +nxt_unit_lib_release(nxt_unit_impl_t *lib) +{ + long c; + nxt_unit_process_t *process; + + c = nxt_atomic_fetch_add(&lib->use_count, -1); + + if (c == 1) { + for ( ;; ) { + pthread_mutex_lock(&lib->mutex); + + process = nxt_unit_process_pop_first(lib); + if (process == NULL) { + pthread_mutex_unlock(&lib->mutex); + + break; + } + + nxt_unit_remove_process(lib, process); + } + + pthread_mutex_destroy(&lib->mutex); + + if (nxt_fast_path(lib->router_port != NULL)) { + nxt_unit_port_release(lib->router_port); + } + + if (nxt_fast_path(lib->shared_port != NULL)) { + nxt_unit_port_release(lib->shared_port); + } + + nxt_unit_mmaps_destroy(&lib->incoming); + nxt_unit_mmaps_destroy(&lib->outgoing); + + free(lib); + } +} + + +nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf) { @@ -585,15 +760,16 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int -nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream, uint32_t *shm_limit) +nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, + nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, + uint32_t *shm_limit) { int rc; - int ready_fd, read_fd; + int ready_fd, router_fd, read_fd; char *unit_init, *version_end; long version_length; - int64_t ready_pid, read_pid; - uint32_t ready_stream, ready_id, read_id; + int64_t ready_pid, router_pid, read_pid; + uint32_t ready_stream, router_id, ready_id, read_id; unit_init = getenv(NXT_UNIT_INIT_ENV); if (nxt_slow_path(unit_init == NULL)) { @@ -621,13 +797,15 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" + "%"PRId64",%"PRIu32",%d;" "%d,%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, + &router_pid, &router_id, &router_fd, &read_pid, &read_id, &read_fd, log_fd, shm_limit); - if (nxt_slow_path(rc != 9)) { + if (nxt_slow_path(rc != 12)) { nxt_unit_alert(NULL, "failed to scan variables: %d", rc); return NXT_UNIT_ERROR; @@ -639,6 +817,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, ready_port->out_fd = ready_fd; ready_port->data = NULL; + nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id); + + router_port->in_fd = -1; + router_port->out_fd = router_fd; + router_port->data = NULL; + nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); read_port->in_fd = read_fd; @@ -652,13 +836,17 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, static int -nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - uint32_t stream) +nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) { ssize_t res; nxt_port_msg_t msg; nxt_unit_impl_t *lib; + union { + struct cmsghdr cm; + char space[CMSG_SPACE(sizeof(int))]; + } cmsg; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); msg.stream = stream; @@ -671,7 +859,25 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + memset(&cmsg, 0, sizeof(cmsg)); + + cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); + cmsg.cm.cmsg_level = SOL_SOCKET; + cmsg.cm.cmsg_type = SCM_RIGHTS; + + /* + * memcpy() is used instead of simple + * *(int *) CMSG_DATA(&cmsg.cm) = fd; + * because GCC 4.4 with -O2/3/s optimization may issue a warning: + * dereferencing type-punned pointer will break strict-aliasing rules + * + * Fortunately, GCC with -O1 compiles this nxt_memcpy() + * in the same simple assignment as in the code above. + */ + memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int)); + + res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), + &cmsg, sizeof(cmsg)); if (res != sizeof(msg)) { return NXT_UNIT_ERROR; } @@ -680,41 +886,56 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } -int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - void *buf, size_t buf_size, void *oob, size_t oob_size) +static int +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { - int rc; - pid_t pid; - struct cmsghdr *cm; - nxt_port_msg_t *port_msg; - nxt_unit_impl_t *lib; - nxt_unit_recv_msg_t recv_msg; - nxt_unit_callbacks_t *cb; + int rc; + pid_t pid; + struct cmsghdr *cm; + nxt_port_msg_t *port_msg; + nxt_unit_impl_t *lib; + nxt_unit_recv_msg_t recv_msg; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - rc = NXT_UNIT_ERROR; - recv_msg.fd = -1; - recv_msg.process = NULL; - port_msg = buf; - cm = oob; - - if (oob_size >= CMSG_SPACE(sizeof(int)) - && cm->cmsg_len == CMSG_LEN(sizeof(int)) - && cm->cmsg_level == SOL_SOCKET + recv_msg.fd[0] = -1; + recv_msg.fd[1] = -1; + port_msg = (nxt_port_msg_t *) rbuf->buf; + cm = (struct cmsghdr *) rbuf->oob; + + if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) { - memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); + if (cm->cmsg_len == CMSG_LEN(sizeof(int))) { + memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int)); + } + + if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) { + memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); + } } recv_msg.incoming_buf = NULL; - if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { - nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); - goto fail; + if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + if (nxt_slow_path(rbuf->size == 0)) { + nxt_unit_debug(ctx, "read port closed"); + + nxt_unit_quit(ctx); + rc = NXT_UNIT_OK; + goto done; + } + + nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); + + rc = NXT_UNIT_ERROR; + goto done; } + nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d", + port_msg->stream, (int) port_msg->type, + recv_msg.fd[0], recv_msg.fd[1]); + recv_msg.stream = port_msg->stream; recv_msg.pid = port_msg->pid; recv_msg.reply_port = port_msg->reply_port; @@ -722,41 +943,42 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, recv_msg.mmap = port_msg->mmap; recv_msg.start = port_msg + 1; - recv_msg.size = buf_size - sizeof(nxt_port_msg_t); + recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { - nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", - port_msg->stream, (int) port_msg->type); - goto fail; - } - - if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { - rc = NXT_UNIT_OK; - - goto fail; + nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)", + port_msg->stream, (int) port_msg->type); + rc = NXT_UNIT_ERROR; + goto done; } /* Fragmentation is unsupported. */ if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { - nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)", - port_msg->stream, (int) port_msg->type); - goto fail; + nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)", + port_msg->stream, (int) port_msg->type); + rc = NXT_UNIT_ERROR; + goto done; } if (port_msg->mmap) { - if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { - goto fail; + rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (rc == NXT_UNIT_AGAIN) { + recv_msg.fd[0] = -1; + recv_msg.fd[1] = -1; + } + + goto done; } } - cb = &lib->callbacks; - switch (port_msg->type) { case _NXT_PORT_MSG_QUIT: nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); - cb->quit(ctx); + nxt_unit_quit(ctx); rc = NXT_UNIT_OK; break; @@ -766,45 +988,52 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, case _NXT_PORT_MSG_CHANGE_FILE: nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", - port_msg->stream, recv_msg.fd); + port_msg->stream, recv_msg.fd[0]); - if (dup2(recv_msg.fd, lib->log_fd) == -1) { + if (dup2(recv_msg.fd[0], lib->log_fd) == -1) { nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", - port_msg->stream, recv_msg.fd, lib->log_fd, + port_msg->stream, recv_msg.fd[0], lib->log_fd, strerror(errno), errno); - goto fail; + rc = NXT_UNIT_ERROR; + goto done; } rc = NXT_UNIT_OK; break; case _NXT_PORT_MSG_MMAP: - if (nxt_slow_path(recv_msg.fd < 0)) { + if (nxt_slow_path(recv_msg.fd[0] < 0)) { nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", - port_msg->stream, recv_msg.fd); + port_msg->stream, recv_msg.fd[0]); - goto fail; + rc = NXT_UNIT_ERROR; + goto done; } - rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); + rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]); break; case _NXT_PORT_MSG_REQ_HEADERS: rc = nxt_unit_process_req_headers(ctx, &recv_msg); break; + case _NXT_PORT_MSG_REQ_BODY: + rc = nxt_unit_process_req_body(ctx, &recv_msg); + break; + case _NXT_PORT_MSG_WEBSOCKET: rc = nxt_unit_process_websocket(ctx, &recv_msg); break; case _NXT_PORT_MSG_REMOVE_PID: if (nxt_slow_path(recv_msg.size != sizeof(pid))) { - nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " - "(%d != %d)", port_msg->stream, (int) recv_msg.size, - (int) sizeof(pid)); + nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size " + "(%d != %d)", port_msg->stream, (int) recv_msg.size, + (int) sizeof(pid)); - goto fail; + rc = NXT_UNIT_ERROR; + goto done; } memcpy(&pid, recv_msg.start, sizeof(pid)); @@ -812,7 +1041,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", port_msg->stream, (int) pid); - cb->remove_pid(ctx, pid); + nxt_unit_remove_pid(lib, pid); rc = NXT_UNIT_OK; break; @@ -825,21 +1054,29 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", port_msg->stream, (int) port_msg->type); - goto fail; + rc = NXT_UNIT_ERROR; + goto done; } -fail: +done: - if (recv_msg.fd != -1) { - close(recv_msg.fd); + if (recv_msg.fd[0] != -1) { + nxt_unit_close(recv_msg.fd[0]); + } + + if (recv_msg.fd[1] != -1) { + nxt_unit_close(recv_msg.fd[1]); } while (recv_msg.incoming_buf != NULL) { nxt_unit_mmap_buf_free(recv_msg.incoming_buf); } - if (recv_msg.process != NULL) { - nxt_unit_process_use(ctx, recv_msg.process, -1); + if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { +#if (NXT_DEBUG) + memset(rbuf->buf, 0xAC, rbuf->size); +#endif + nxt_unit_read_buf_release(ctx, rbuf); } return rc; @@ -849,9 +1086,9 @@ fail: static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { - int nb; + void *mem; nxt_unit_impl_t *lib; - nxt_unit_port_t new_port; + nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { @@ -862,48 +1099,80 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - if (nxt_slow_path(recv_msg->fd < 0)) { + if (nxt_slow_path(recv_msg->fd[0] < 0)) { nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", - recv_msg->stream, recv_msg->fd); + recv_msg->stream, recv_msg->fd[0]); return NXT_UNIT_ERROR; } new_port_msg = recv_msg->start; - nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", + nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d", recv_msg->stream, (int) new_port_msg->pid, - (int) new_port_msg->id, recv_msg->fd); + (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]); - nb = 0; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { - nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " - "failed: %s (%d)", - recv_msg->stream, recv_msg->fd, strerror(errno), errno); + if (new_port_msg->id == (nxt_port_id_t) -1) { + nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); - return NXT_UNIT_ERROR; + new_port.in_fd = recv_msg->fd[0]; + new_port.out_fd = -1; + + mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd[1], 0); + + } else { + if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) + != NXT_UNIT_OK)) + { + return NXT_UNIT_ERROR; + } + + nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, + new_port_msg->id); + + new_port.in_fd = -1; + new_port.out_fd = recv_msg->fd[0]; + + mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, + MAP_SHARED, recv_msg->fd[1], 0); } - nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, - new_port_msg->id); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1], + strerror(errno), errno); + + return NXT_UNIT_ERROR; + } - new_port.in_fd = -1; - new_port.out_fd = recv_msg->fd; new_port.data = NULL; - recv_msg->fd = -1; + recv_msg->fd[0] = -1; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + port = nxt_unit_add_port(ctx, &new_port, mem); + if (nxt_slow_path(port == NULL)) { + return NXT_UNIT_ERROR; + } + + if (new_port_msg->id == (nxt_port_id_t) -1) { + lib->shared_port = port; + + } else { + nxt_unit_port_release(port); + } - return lib->callbacks.add_port(ctx, &new_port); + return NXT_UNIT_OK; } static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { + int res; nxt_unit_impl_t *lib; + nxt_unit_port_id_t port_id; nxt_unit_request_t *r; nxt_unit_mmap_buf_t *b; nxt_unit_request_info_t *req; @@ -934,9 +1203,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req = &req_impl->req; - nxt_unit_port_id_init(&req->response_port, recv_msg->pid, - recv_msg->reply_port); - req->request = recv_msg->start; b = recv_msg->incoming_buf; @@ -952,14 +1218,6 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req->content_buf = req->request_buf; req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); - /* "Move" process reference to req_impl. */ - req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); - if (nxt_slow_path(req_impl->process == NULL)) { - return NXT_UNIT_ERROR; - } - - recv_msg->process = NULL; - req_impl->stream = recv_msg->stream; req_impl->outgoing_buf = NULL; @@ -973,12 +1231,13 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) req_impl->incoming_buf->prev = &req_impl->incoming_buf; recv_msg->incoming_buf = NULL; - req->content_fd = recv_msg->fd; - recv_msg->fd = -1; + req->content_fd = recv_msg->fd[0]; + recv_msg->fd[0] = -1; req->response_max_fields = 0; req_impl->state = NXT_UNIT_RS_START; req_impl->websocket = 0; + req_impl->in_hash = 0; nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, (int) r->method_length, @@ -987,9 +1246,247 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (char *) nxt_unit_sptr_get(&r->target), (int) r->content_length); + nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); + + res = nxt_unit_request_check_response_port(req, &port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + if (nxt_fast_path(res == NXT_UNIT_OK)) { + res = nxt_unit_send_req_headers_ack(req); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return NXT_UNIT_ERROR; + } + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (req->content_length + > (uint64_t) (req->content_buf->end - req->content_buf->free)) + { + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return NXT_UNIT_ERROR; + } + + /* + * If application have separate data handler, we may start + * request processing and process data when it is arrived. + */ + if (lib->callbacks.data_handler == NULL) { + return NXT_UNIT_OK; + } + } + + lib->callbacks.request_handler(req); + } + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + uint64_t l; + nxt_unit_impl_t *lib; + nxt_unit_mmap_buf_t *b; + nxt_unit_request_info_t *req; + + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + if (req == NULL) { + return NXT_UNIT_OK; + } + + l = req->content_buf->end - req->content_buf->free; + + for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { + b->req = req; + l += b->buf.end - b->buf.free; + } + + if (recv_msg->incoming_buf != NULL) { + b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); + + /* "Move" incoming buffer list to req_impl. */ + nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf); + recv_msg->incoming_buf = NULL; + } + + req->content_fd = recv_msg->fd[0]; + recv_msg->fd[0] = -1; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - lib->callbacks.request_handler(req); + if (lib->callbacks.data_handler != NULL) { + lib->callbacks.data_handler(req); + + return NXT_UNIT_OK; + } + + if (req->content_fd != -1 || l == req->content_length) { + lib->callbacks.request_handler(req); + } + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id) +{ + int res; + nxt_unit_ctx_t *ctx; + nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *port_impl; + nxt_unit_request_info_impl_t *req_impl; + + ctx = req->ctx; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&lib->mutex); + + port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + if (nxt_fast_path(port != NULL)) { + req->response_port = port; + + if (nxt_fast_path(port_impl->ready)) { + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}", + (int) port->id.pid, (int) port->id.id); + + return NXT_UNIT_OK; + } + + nxt_unit_debug(ctx, "check_response_port: " + "port{%d,%d} already requested", + (int) port->id.pid, (int) port->id.id); + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&lib->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; + } + + port_impl = malloc(sizeof(nxt_unit_port_impl_t)); + if (nxt_slow_path(port_impl == NULL)) { + nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", + (int) sizeof(nxt_unit_port_impl_t)); + + pthread_mutex_unlock(&lib->mutex); + + return NXT_UNIT_ERROR; + } + + port = &port_impl->port; + + port->id = *port_id; + port->in_fd = -1; + port->out_fd = -1; + port->data = NULL; + + res = nxt_unit_port_hash_add(&lib->ports, port); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed", + port->id.pid, port->id.id); + + pthread_mutex_unlock(&lib->mutex); + + free(port); + + return NXT_UNIT_ERROR; + } + + process = nxt_unit_process_find(lib, port_id->pid, 0); + if (nxt_slow_path(process == NULL)) { + nxt_unit_alert(ctx, "check_response_port: process %d not found", + port->id.pid); + + nxt_unit_port_hash_find(&lib->ports, port_id, 1); + + pthread_mutex_unlock(&lib->mutex); + + free(port); + + return NXT_UNIT_ERROR; + } + + nxt_queue_insert_tail(&process->ports, &port_impl->link); + + port_impl->process = process; + port_impl->queue = NULL; + port_impl->from_socket = 0; + port_impl->socket_rbuf = NULL; + + nxt_queue_init(&port_impl->awaiting_req); + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link); + + port_impl->use_count = 2; + port_impl->ready = 0; + + req->response_port = port; + + pthread_mutex_unlock(&lib->mutex); + + res = nxt_unit_get_port(ctx, port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; +} + + +static int +nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + memset(&msg, 0, sizeof(nxt_port_msg_t)); + + msg.stream = req_impl->stream; + msg.pid = lib->pid; + msg.reply_port = ctx_impl->read_port->id.id; + msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK; + + res = nxt_unit_port_send(req->ctx, req->response_port, + &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + return NXT_UNIT_ERROR; + } return NXT_UNIT_OK; } @@ -1001,21 +1498,17 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) size_t hsize; nxt_unit_impl_t *lib; nxt_unit_mmap_buf_t *b; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_callbacks_t *cb; nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - - req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, - recv_msg->last); - if (req_impl == NULL) { + req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); + if (nxt_slow_path(req == NULL)) { return NXT_UNIT_OK; } - req = &req_impl->req; + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); cb = &lib->callbacks; @@ -1181,12 +1674,12 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response = NULL; req->response_buf = NULL; - if (req_impl->websocket) { - nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); - - req_impl->websocket = 0; + if (req_impl->in_hash) { + nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); } + req_impl->websocket = 0; + while (req_impl->outgoing_buf != NULL) { nxt_unit_mmap_buf_free(req_impl->outgoing_buf); } @@ -1196,19 +1689,15 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) } if (req->content_fd != -1) { - close(req->content_fd); + nxt_unit_close(req->content_fd); req->content_fd = -1; } - /* - * Process release should go after buffers release to guarantee mmap - * existence. - */ - if (req_impl->process != NULL) { - nxt_unit_process_use(req->ctx, req_impl->process, -1); + if (req->response_port != NULL) { + nxt_unit_port_release(req->response_port); - req_impl->process = NULL; + req->response_port = NULL; } pthread_mutex_lock(&ctx_impl->mutex); @@ -1729,7 +2218,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); if (nxt_fast_path(rc == NXT_UNIT_OK)) { req->response = NULL; req->response_buf = NULL; @@ -1782,8 +2271,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, size, size, mmap_buf, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, + size, size, mmap_buf, NULL); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -1795,32 +2284,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) } -static nxt_unit_process_t * -nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) -{ - nxt_unit_impl_t *lib; - - if (recv_msg->process != NULL) { - return recv_msg->process; - } - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->mutex); - - recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (recv_msg->process == NULL) { - nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", - recv_msg->stream, (int) recv_msg->pid); - } - - return recv_msg->process; -} - - static nxt_unit_mmap_buf_t * nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) { @@ -1869,15 +2332,6 @@ nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) } -typedef struct { - size_t len; - const char *str; -} nxt_unit_str_t; - - -#define nxt_unit_str(str) { nxt_length(str), str } - - int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) { @@ -1889,7 +2343,6 @@ int nxt_unit_response_upgrade(nxt_unit_request_info_t *req) { int rc; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); @@ -1912,9 +2365,7 @@ nxt_unit_response_upgrade(nxt_unit_request_info_t *req) return NXT_UNIT_ERROR; } - ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); - - rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); + rc = nxt_unit_request_hash_add(req->ctx, req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); @@ -1980,7 +2431,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) } if (nxt_fast_path(buf->free > buf->start)) { - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } @@ -1995,17 +2446,15 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) static void nxt_unit_buf_send_done(nxt_unit_buf_t *buf) { - int rc; - nxt_unit_mmap_buf_t *mmap_buf; - nxt_unit_request_info_t *req; - nxt_unit_request_info_impl_t *req_impl; + int rc; + nxt_unit_mmap_buf_t *mmap_buf; + nxt_unit_request_info_t *req; mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); req = mmap_buf->req; - req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); + rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1); if (nxt_slow_path(rc == NXT_UNIT_OK)) { nxt_unit_mmap_buf_free(mmap_buf); @@ -2018,7 +2467,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf) static int -nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, +nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, nxt_unit_mmap_buf_t *mmap_buf, int last) { struct { @@ -2026,22 +2475,24 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_port_mmap_msg_t mmap_msg; } m; - int rc; - u_char *last_used, *first_free; - ssize_t res; - nxt_chunk_id_t first_free_chunk; - nxt_unit_buf_t *buf; - nxt_unit_impl_t *lib; - nxt_port_mmap_header_t *hdr; + int rc; + u_char *last_used, *first_free; + ssize_t res; + nxt_chunk_id_t first_free_chunk; + nxt_unit_buf_t *buf; + nxt_unit_impl_t *lib; + nxt_port_mmap_header_t *hdr; + nxt_unit_request_info_impl_t *req_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); buf = &mmap_buf->buf; hdr = mmap_buf->hdr; m.mmap_msg.size = buf->free - buf->start; - m.msg.stream = stream; + m.msg.stream = req_impl->stream; m.msg.pid = lib->pid; m.msg.reply_port = 0; m.msg.type = _NXT_PORT_MSG_DATA; @@ -2058,14 +2509,14 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); - nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", - stream, + nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", + req_impl->stream, (int) m.mmap_msg.mmap_id, (int) m.mmap_msg.chunk_id, (int) m.mmap_msg.size); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), - NULL, 0); + res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), + NULL, 0); if (nxt_slow_path(res != sizeof(m))) { goto free_buf; } @@ -2091,33 +2542,34 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, mmap_buf->hdr = NULL; } - nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, + nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, (int) m.mmap_msg.chunk_id - (int) first_free_chunk); - nxt_unit_debug(ctx, "process %d allocated_chunks %d", - mmap_buf->process->pid, - (int) mmap_buf->process->outgoing.allocated_chunks); + nxt_unit_debug(req->ctx, "allocated_chunks %d", + (int) lib->outgoing.allocated_chunks); } else { if (nxt_slow_path(mmap_buf->plain_ptr == NULL || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) { - nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer" - ": no space reserved for message header", stream); + nxt_unit_alert(req->ctx, + "#%"PRIu32": failed to send plain memory buffer" + ": no space reserved for message header", + req_impl->stream); goto free_buf; } memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); - nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d", - stream, + nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d", + req_impl->stream, (int) (sizeof(m.msg) + m.mmap_msg.size)); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, - buf->start - sizeof(m.msg), - m.mmap_msg.size + sizeof(m.msg), - NULL, 0); + res = nxt_unit_port_send(req->ctx, req->response_port, + buf->start - sizeof(m.msg), + m.mmap_msg.size + sizeof(m.msg), + NULL, 0); if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { goto free_buf; } @@ -2154,7 +2606,6 @@ nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) { if (mmap_buf->hdr != NULL) { nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, - mmap_buf->process, mmap_buf->hdr, mmap_buf->buf.start, mmap_buf->buf.end - mmap_buf->buf.start); @@ -2175,33 +2626,43 @@ static nxt_unit_read_buf_t * nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) { nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); pthread_mutex_lock(&ctx_impl->mutex); - return nxt_unit_read_buf_get_impl(ctx_impl); + rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + + pthread_mutex_unlock(&ctx_impl->mutex); + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + return rbuf; } static nxt_unit_read_buf_t * nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) { + nxt_queue_link_t *link; nxt_unit_read_buf_t *rbuf; - if (ctx_impl->free_read_buf != NULL) { - rbuf = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf->next; + if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { + link = nxt_queue_first(&ctx_impl->free_rbuf); + nxt_queue_remove(link); - pthread_mutex_unlock(&ctx_impl->mutex); + rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); return rbuf; } - pthread_mutex_unlock(&ctx_impl->mutex); - rbuf = malloc(sizeof(nxt_unit_read_buf_t)); + if (nxt_fast_path(rbuf != NULL)) { + rbuf->ctx_impl = ctx_impl; + } + return rbuf; } @@ -2216,8 +2677,7 @@ nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, pthread_mutex_lock(&ctx_impl->mutex); - rbuf->next = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf; + nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); } @@ -2276,13 +2736,15 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, nxt_unit_request_info_impl_t *req_impl; char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + nxt_unit_req_debug(req, "write: %d", (int) size); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); part_start = start; sent = 0; if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { - nxt_unit_req_warn(req, "write: response not initialized yet"); + nxt_unit_req_alert(req, "write: response not initialized yet"); return -NXT_UNIT_ERROR; } @@ -2314,8 +2776,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, min_part_size = nxt_min(min_size, part_size); min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, part_size, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size, min_part_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return -rc; @@ -2330,7 +2791,7 @@ nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, part_start, part_size); - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return -rc; } @@ -2360,8 +2821,14 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { + nxt_unit_req_alert(req, "write: response not initialized yet"); + + return NXT_UNIT_ERROR; + } + /* Check if response is not send yet. */ - if (nxt_slow_path(req->response_buf)) { + if (nxt_slow_path(req->response_buf != NULL)) { /* Enable content in headers buf. */ rc = nxt_unit_response_add_content(req, "", 0); @@ -2408,8 +2875,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, buf_size, buf_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -2431,7 +2897,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, buf->free += n; } - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_error(req, "Failed to send content"); @@ -2451,9 +2917,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, dst, size); + nxt_unit_req_debug(req, "read: %d", (int) buf_res); + if (buf_res < (ssize_t) size && req->content_fd != -1) { res = read(req->content_fd, dst, size); - if (res < 0) { + if (nxt_slow_path(res < 0)) { nxt_unit_req_alert(req, "failed to read content: %s (%d)", strerror(errno), errno); @@ -2461,7 +2929,7 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) } if (res < (ssize_t) size) { - close(req->content_fd); + nxt_unit_close(req->content_fd); req->content_fd = -1; } @@ -2561,7 +3029,6 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) mmap_buf->buf.start = mmap_buf->free_ptr; mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + size; - mmap_buf->process = NULL; res = read(req->content_fd, mmap_buf->free_ptr, size); if (res < 0) { @@ -2574,7 +3041,7 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) } if (res < (ssize_t) size) { - close(req->content_fd); + nxt_unit_close(req->content_fd); req->content_fd = -1; } @@ -2689,8 +3156,8 @@ skip_response_send: msg.mf = 0; msg.tracking = 0; - (void) lib->callbacks.port_send(req->ctx, &req->response_port, - &msg, sizeof(msg), NULL, 0); + (void) nxt_unit_port_send(req->ctx, req->response_port, + &msg, sizeof(msg), NULL, 0); nxt_unit_request_info_release(req); } @@ -2710,17 +3177,14 @@ int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, uint8_t last, const struct iovec *iov, int iovcnt) { - int i, rc; - size_t l, copy; - uint32_t payload_len, buf_size, alloc_size; - const uint8_t *b; - nxt_unit_buf_t *buf; - nxt_unit_mmap_buf_t mmap_buf; - nxt_websocket_header_t *wh; - nxt_unit_request_info_impl_t *req_impl; - char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; - - req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + int i, rc; + size_t l, copy; + uint32_t payload_len, buf_size, alloc_size; + const uint8_t *b; + nxt_unit_buf_t *buf; + nxt_unit_mmap_buf_t mmap_buf; + nxt_websocket_header_t *wh; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; payload_len = 0; @@ -2731,8 +3195,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, buf_size = 10 + payload_len; alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, alloc_size, alloc_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -2766,8 +3229,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, if (l > 0) { if (nxt_fast_path(buf->free > buf->start)) { - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, - &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; @@ -2776,8 +3238,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, + rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, alloc_size, alloc_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -2790,8 +3251,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, } if (buf->free > buf->start) { - rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, - &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); } return rc; @@ -2864,8 +3324,8 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) static nxt_port_mmap_header_t * -nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n) +nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_chunk_id_t *c, int *n, int min_n) { int res, nchunks, i; uint32_t outgoing_size; @@ -2875,18 +3335,18 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&process->outgoing.mutex); + pthread_mutex_lock(&lib->outgoing.mutex); retry: - outgoing_size = process->outgoing.size; + outgoing_size = lib->outgoing.size; - mm_end = process->outgoing.elts + outgoing_size; + mm_end = lib->outgoing.elts + outgoing_size; - for (mm = process->outgoing.elts; mm < mm_end; mm++) { + for (mm = lib->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; - if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id->id) { + if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id.id) { continue; } @@ -2930,13 +3390,13 @@ retry: if (outgoing_size >= lib->shm_mmap_limit) { /* Cannot allocate more shared memory. */ - pthread_mutex_unlock(&process->outgoing.mutex); + pthread_mutex_unlock(&lib->outgoing.mutex); if (min_n == 0) { *n = 0; } - if (nxt_slow_path(process->outgoing.allocated_chunks + min_n + if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) { /* Memory allocated by application, but not send to router. */ @@ -2945,7 +3405,7 @@ retry: /* Notify router about OOSM condition. */ - res = nxt_unit_send_oosm(ctx, port_id); + res = nxt_unit_send_oosm(ctx, port); if (nxt_slow_path(res != NXT_UNIT_OK)) { return NULL; } @@ -2965,30 +3425,29 @@ retry: nxt_unit_debug(ctx, "oosm: retry"); - pthread_mutex_lock(&process->outgoing.mutex); + pthread_mutex_lock(&lib->outgoing.mutex); goto retry; } *c = 0; - hdr = nxt_unit_new_mmap(ctx, process, port_id, *n); + hdr = nxt_unit_new_mmap(ctx, port, *n); unlock: - nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n); + nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n); - nxt_unit_debug(ctx, "process %d allocated_chunks %d", - process->pid, - (int) process->outgoing.allocated_chunks); + nxt_unit_debug(ctx, "allocated_chunks %d", + (int) lib->outgoing.allocated_chunks); - pthread_mutex_unlock(&process->outgoing.mutex); + pthread_mutex_unlock(&lib->outgoing.mutex); return hdr; } static int -nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { ssize_t res; nxt_port_msg_t msg; @@ -3006,7 +3465,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3018,7 +3477,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) { - nxt_port_msg_t *port_msg; + int res; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; @@ -3030,31 +3489,25 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - nxt_unit_read_buf(ctx, rbuf); - - if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_ERROR) { nxt_unit_read_buf_release(ctx, rbuf); return NXT_UNIT_ERROR; } - port_msg = (nxt_port_msg_t *) rbuf->buf; - - if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { + if (nxt_unit_is_shm_ack(rbuf)) { nxt_unit_read_buf_release(ctx, rbuf); - break; } pthread_mutex_lock(&ctx_impl->mutex); - *ctx_impl->pending_read_tail = rbuf; - ctx_impl->pending_read_tail = &rbuf->next; - rbuf->next = NULL; + nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); - if (port_msg->type == _NXT_PORT_MSG_QUIT) { + if (nxt_unit_is_quit(rbuf)) { nxt_unit_debug(ctx, "oosm: quit received"); return NXT_UNIT_ERROR; @@ -3068,7 +3521,12 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) static nxt_unit_mmap_t * nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) { - uint32_t cap; + uint32_t cap, n; + nxt_unit_mmap_t *e; + + if (nxt_fast_path(mmaps->size > i)) { + return mmaps->elts + i; + } cap = mmaps->cap; @@ -3088,13 +3546,19 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) if (cap != mmaps->cap) { - mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); - if (nxt_slow_path(mmaps->elts == NULL)) { + e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t)); + if (nxt_slow_path(e == NULL)) { return NULL; } - memset(mmaps->elts + mmaps->cap, 0, - sizeof(*mmaps->elts) * (cap - mmaps->cap)); + mmaps->elts = e; + + for (n = mmaps->cap; n < cap; n++) { + e = mmaps->elts + n; + + e->hdr = NULL; + nxt_queue_init(&e->awaiting_rbuf); + } mmaps->cap = cap; } @@ -3108,27 +3572,100 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) static nxt_port_mmap_header_t * -nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, int n) +nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n) { int i, fd, rc; void *mem; - char name[64]; nxt_unit_mmap_t *mm; nxt_unit_impl_t *lib; nxt_port_mmap_header_t *hdr; - lib = process->lib; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - mm = nxt_unit_mmap_at(&process->outgoing, process->outgoing.size); + mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size); if (nxt_slow_path(mm == NULL)) { - nxt_unit_warn(ctx, "failed to add mmap to outgoing array"); + nxt_unit_alert(ctx, "failed to add mmap to outgoing array"); return NULL; } + fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE); + if (nxt_slow_path(fd == -1)) { + goto remove_fail; + } + + mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, + strerror(errno), errno); + + nxt_unit_close(fd); + + goto remove_fail; + } + + mm->hdr = mem; + hdr = mem; + + memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); + memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); + + hdr->id = lib->outgoing.size - 1; + hdr->src_pid = lib->pid; + hdr->dst_pid = port->id.pid; + hdr->sent_over = port->id.id; + + /* Mark first n chunk(s) as busy */ + for (i = 0; i < n; i++) { + nxt_port_mmap_set_chunk_busy(hdr->free_map, i); + } + + /* Mark as busy chunk followed the last available chunk. */ + nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); + nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); + + pthread_mutex_unlock(&lib->outgoing.mutex); + + rc = nxt_unit_send_mmap(ctx, port, fd); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + munmap(mem, PORT_MMAP_SIZE); + hdr = NULL; + + } else { + nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", + hdr->id, (int) lib->pid, (int) port->id.pid); + } + + nxt_unit_close(fd); + + pthread_mutex_lock(&lib->outgoing.mutex); + + if (nxt_fast_path(hdr != NULL)) { + return hdr; + } + +remove_fail: + + lib->outgoing.size--; + + return NULL; +} + + +static int +nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size) +{ + int fd; + nxt_unit_impl_t *lib; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + +#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) + char name[64]; + snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p", lib->pid, (void *) pthread_self()); +#endif #if (NXT_HAVE_MEMFD_CREATE) @@ -3137,7 +3674,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, strerror(errno), errno); - goto remove_fail; + return -1; } nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); @@ -3149,7 +3686,7 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", strerror(errno), errno); - goto remove_fail; + return -1; } #elif (NXT_HAVE_SHM_OPEN) @@ -3162,12 +3699,12 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, strerror(errno), errno); - goto remove_fail; + return -1; } if (nxt_slow_path(shm_unlink(name) == -1)) { - nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name, - strerror(errno), errno); + nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name, + strerror(errno), errno); } #else @@ -3176,71 +3713,21 @@ nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, #endif - if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { + if (nxt_slow_path(ftruncate(fd, size) == -1)) { nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, strerror(errno), errno); - goto remove_fail; - } - - mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd, - strerror(errno), errno); - - goto remove_fail; - } - - mm->hdr = mem; - hdr = mem; - - memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); - memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); - - hdr->id = process->outgoing.size - 1; - hdr->src_pid = lib->pid; - hdr->dst_pid = process->pid; - hdr->sent_over = port_id->id; + nxt_unit_close(fd); - /* Mark first n chunk(s) as busy */ - for (i = 0; i < n; i++) { - nxt_port_mmap_set_chunk_busy(hdr->free_map, i); - } - - /* Mark as busy chunk followed the last available chunk. */ - nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); - nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); - - pthread_mutex_unlock(&process->outgoing.mutex); - - rc = nxt_unit_send_mmap(ctx, port_id, fd); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - munmap(mem, PORT_MMAP_SIZE); - hdr = NULL; - - } else { - nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d", - hdr->id, (int) lib->pid, (int) process->pid); - } - - close(fd); - - pthread_mutex_lock(&process->outgoing.mutex); - - if (nxt_fast_path(hdr != NULL)) { - return hdr; + return -1; } -remove_fail: - - process->outgoing.size--; - - return NULL; + return fd; } static int -nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) +nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) { ssize_t res; nxt_port_msg_t msg; @@ -3284,8 +3771,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), + &cmsg, sizeof(cmsg)); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3295,8 +3782,8 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) static int -nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size, +nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + uint32_t size, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) { int nchunks, min_nchunks; @@ -3321,8 +3808,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + size; - mmap_buf->port_id = *port_id; - mmap_buf->process = process; nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", mmap_buf->buf.start, (int) size); @@ -3333,7 +3818,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; - hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks); + hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); if (nxt_slow_path(hdr == NULL)) { if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { mmap_buf->hdr = NULL; @@ -3352,8 +3837,6 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; - mmap_buf->port_id = *port_id; - mmap_buf->process = process; mmap_buf->free_ptr = NULL; mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); @@ -3368,83 +3851,87 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) { - int rc; - void *mem; - struct stat mmap_stat; - nxt_unit_mmap_t *mm; - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; - nxt_port_mmap_header_t *hdr; + int rc; + void *mem; + nxt_queue_t awaiting_rbuf; + struct stat mmap_stat; + nxt_unit_mmap_t *mm; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + nxt_port_mmap_header_t *hdr; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); - pthread_mutex_lock(&lib->mutex); - - process = nxt_unit_process_find(ctx, pid, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (nxt_slow_path(process == NULL)) { - nxt_unit_warn(ctx, "incoming_mmap: process %d not found, fd %d", - (int) pid, fd); - - return NXT_UNIT_ERROR; - } - - rc = NXT_UNIT_ERROR; - if (fstat(fd, &mmap_stat) == -1) { - nxt_unit_warn(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, - strerror(errno), errno); + nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, + strerror(errno), errno); - goto fail; + return NXT_UNIT_ERROR; } mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_unit_warn(ctx, "incoming_mmap: mmap() failed: %s (%d)", - strerror(errno), errno); + nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)", + strerror(errno), errno); - goto fail; + return NXT_UNIT_ERROR; } hdr = mem; - if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { + if (nxt_slow_path(hdr->src_pid != pid)) { - nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " - "detected: %d != %d or %d != %d", (int) hdr->src_pid, - (int) pid, (int) hdr->dst_pid, (int) lib->pid); + nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header " + "detected: %d != %d or %d != %d", (int) hdr->src_pid, + (int) pid, (int) hdr->dst_pid, (int) lib->pid); munmap(mem, PORT_MMAP_SIZE); - goto fail; + return NXT_UNIT_ERROR; } - pthread_mutex_lock(&process->incoming.mutex); + nxt_queue_init(&awaiting_rbuf); + + pthread_mutex_lock(&lib->incoming.mutex); - mm = nxt_unit_mmap_at(&process->incoming, hdr->id); + mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); if (nxt_slow_path(mm == NULL)) { - nxt_unit_warn(ctx, "incoming_mmap: failed to add to incoming array"); + nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array"); munmap(mem, PORT_MMAP_SIZE); + rc = NXT_UNIT_ERROR; + } else { mm->hdr = hdr; hdr->sent_over = 0xFFFFu; + nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); + nxt_queue_init(&mm->awaiting_rbuf); + rc = NXT_UNIT_OK; } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&lib->incoming.mutex); -fail: + nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { - nxt_unit_process_use(ctx, process, -1); + ctx_impl = rbuf->ctx_impl; + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; return rc; } @@ -3462,18 +3949,22 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) } -static void -nxt_unit_process_use(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, int i) +nxt_inline void +nxt_unit_process_use(nxt_unit_process_t *process) { - long c; + nxt_atomic_fetch_add(&process->use_count, 1); +} + - c = nxt_atomic_fetch_add(&process->use_count, i); +nxt_inline void +nxt_unit_process_release(nxt_unit_process_t *process) +{ + long c; - if (i < 0 && c == -i) { - nxt_unit_debug(ctx, "destroy process #%d", (int) process->pid); + c = nxt_atomic_fetch_add(&process->use_count, -1); - nxt_unit_mmaps_destroy(&process->incoming); - nxt_unit_mmaps_destroy(&process->outgoing); + if (c == 1) { + nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); free(process); } @@ -3499,85 +3990,62 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) } -static nxt_port_mmap_header_t * -nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - uint32_t id) +static int +nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, + pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, + nxt_unit_read_buf_t *rbuf) { - nxt_port_mmap_header_t *hdr; - - if (nxt_fast_path(process->incoming.size > id)) { - hdr = process->incoming.elts[id].hdr; - - } else { - hdr = NULL; - } + int res, need_rbuf; + nxt_unit_mmap_t *mm; + nxt_unit_ctx_impl_t *ctx_impl; - return hdr; -} + mm = nxt_unit_mmap_at(mmaps, id); + if (nxt_slow_path(mm == NULL)) { + nxt_unit_alert(ctx, "failed to allocate mmap"); + pthread_mutex_unlock(&mmaps->mutex); -static int -nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) -{ - int rc; - nxt_chunk_id_t c; - nxt_unit_process_t *process; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_tracking_msg_t *tracking_msg; + *hdr = NULL; - if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { - nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", - recv_msg->stream, (int) recv_msg->size); - - return 0; + return NXT_UNIT_ERROR; } - tracking_msg = recv_msg->start; + *hdr = mm->hdr; - recv_msg->start = tracking_msg + 1; - recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t); - - process = nxt_unit_msg_get_process(ctx, recv_msg); - if (nxt_slow_path(process == NULL)) { - return 0; + if (nxt_fast_path(*hdr != NULL)) { + return NXT_UNIT_OK; } - pthread_mutex_lock(&process->incoming.mutex); - - hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); + need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf); - nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - tracking_msg->mmap_id); + nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link); - return 0; - } + pthread_mutex_unlock(&mmaps->mutex); - c = tracking_msg->tracking_id; - rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (rc == 0) { - nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", - recv_msg->stream); + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); - nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); + if (need_rbuf) { + res = nxt_unit_get_mmap(ctx, pid, id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } } - pthread_mutex_unlock(&process->incoming.mutex); - - return rc; + return NXT_UNIT_AGAIN; } static int -nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_read_buf_t *rbuf) { + int res; void *start; uint32_t size; - nxt_unit_process_t *process; + nxt_unit_impl_t *lib; + nxt_unit_mmaps_t *mmaps; nxt_unit_mmap_buf_t *b, **incoming_tail; nxt_port_mmap_msg_t *mmap_msg, *end; nxt_port_mmap_header_t *hdr; @@ -3589,22 +4057,22 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - process = nxt_unit_msg_get_process(ctx, recv_msg); - if (nxt_slow_path(process == NULL)) { - return NXT_UNIT_ERROR; - } - mmap_msg = recv_msg->start; end = nxt_pointer_to(recv_msg->start, recv_msg->size); incoming_tail = &recv_msg->incoming_buf; + /* Allocating buffer structures. */ for (; mmap_msg < end; mmap_msg++) { b = nxt_unit_mmap_buf_get(ctx); if (nxt_slow_path(b == NULL)) { nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", recv_msg->stream); + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } + return NXT_UNIT_ERROR; } @@ -3615,19 +4083,23 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) b = recv_msg->incoming_buf; mmap_msg = recv_msg->start; - pthread_mutex_lock(&process->incoming.mutex); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + mmaps = &lib->incoming; + + pthread_mutex_lock(&mmaps->mutex); for (; mmap_msg < end; mmap_msg++) { - hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); + res = nxt_unit_check_rbuf_mmap(ctx, mmaps, + recv_msg->pid, mmap_msg->mmap_id, + &hdr, rbuf); - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - mmap_msg->mmap_id); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } - return NXT_UNIT_ERROR; + return res; } start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); @@ -3642,7 +4114,6 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) b->buf.free = start; b->buf.end = b->buf.start + size; b->hdr = hdr; - b->process = process; b = b->next; @@ -3654,15 +4125,48 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (int) mmap_msg->size); } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&mmaps->mutex); + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_mmap_t get_mmap; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_MMAP; + + m.get_mmap.id = id; + + nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } return NXT_UNIT_OK; } static void -nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, +nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, void *start, uint32_t size) { int freed_chunks; @@ -3688,12 +4192,10 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); if (hdr->src_pid == lib->pid && freed_chunks != 0) { - nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, - -freed_chunks); + nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks); - nxt_unit_debug(ctx, "process %d allocated_chunks %d", - process->pid, - (int) process->outgoing.allocated_chunks); + nxt_unit_debug(ctx, "allocated_chunks %d", + (int) lib->outgoing.allocated_chunks); } if (hdr->dst_pid == lib->pid @@ -3708,15 +4210,12 @@ nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) { - ssize_t res; - nxt_port_msg_t msg; - nxt_unit_impl_t *lib; - nxt_unit_port_id_t port_id; + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_port_id_init(&port_id, pid, 0); - msg.stream = 0; msg.pid = lib->pid; msg.reply_port = 0; @@ -3727,7 +4226,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) msg.mf = 0; msg.tracking = 0; - res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3772,40 +4271,34 @@ nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) static nxt_unit_process_t * -nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) +nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) { - nxt_unit_impl_t *lib; nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_process_lhq_pid(&lhq, &pid); if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { process = lhq.value; - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(process); return process; } process = malloc(sizeof(nxt_unit_process_t)); if (nxt_slow_path(process == NULL)) { - nxt_unit_warn(ctx, "failed to allocate process for #%d", (int) pid); + nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid); return NULL; } process->pid = pid; - process->use_count = 1; + process->use_count = 2; process->next_port_id = 0; process->lib = lib; nxt_queue_init(&process->ports); - nxt_unit_mmaps_init(&process->incoming); - nxt_unit_mmaps_init(&process->outgoing); - lhq.replace = 0; lhq.value = process; @@ -3815,31 +4308,23 @@ nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) break; default: - nxt_unit_warn(ctx, "process %d insert failed", (int) pid); + nxt_unit_alert(NULL, "process %d insert failed", (int) pid); - pthread_mutex_destroy(&process->outgoing.mutex); - pthread_mutex_destroy(&process->incoming.mutex); free(process); process = NULL; break; } - nxt_unit_process_use(ctx, process, 1); - return process; } static nxt_unit_process_t * -nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) +nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove) { int rc; - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_unit_process_lhq_pid(&lhq, &pid); if (remove) { @@ -3850,13 +4335,11 @@ nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove) } if (rc == NXT_OK) { - process = lhq.value; - if (!remove) { - nxt_unit_process_use(ctx, process, 1); + nxt_unit_process_use(lhq.value); } - return process; + return lhq.value; } return NULL; @@ -3876,17 +4359,21 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) int rc; nxt_unit_impl_t *lib; + nxt_unit_ctx_use(ctx); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_run_once(ctx); + rc = nxt_unit_run_once_impl(ctx); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } } + nxt_unit_ctx_release(ctx); + return rc; } @@ -3894,174 +4381,578 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { + int rc; + + nxt_unit_ctx_use(ctx); + + rc = nxt_unit_run_once_impl(ctx); + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +static int +nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) +{ int rc; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + rc = nxt_unit_read_buf(ctx, rbuf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_read_buf_release(ctx, rbuf); + + return rc; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_process_ready_req(ctx); + + return rc; +} + + +static int +nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +{ + int nevents, res, err; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *port_impl; + struct pollfd fds[2]; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - pthread_mutex_lock(&ctx_impl->mutex); + if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) { - if (ctx_impl->pending_read_head != NULL) { - rbuf = ctx_impl->pending_read_head; - ctx_impl->pending_read_head = rbuf->next; + return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + } + + port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t, + port); - if (ctx_impl->pending_read_tail == &rbuf->next) { - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; +retry: + + if (port_impl->from_socket == 0) { + res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_OK) { + if (nxt_unit_is_read_socket(rbuf)) { + port_impl->from_socket++; + + nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", + (int) ctx_impl->read_port->id.pid, + (int) ctx_impl->read_port->id.id, + port_impl->from_socket); + + } else { + nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", + (int) ctx_impl->read_port->id.pid, + (int) ctx_impl->read_port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } } + } - pthread_mutex_unlock(&ctx_impl->mutex); + res = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } - } else { - rbuf = nxt_unit_read_buf_get_impl(ctx_impl); - if (nxt_slow_path(rbuf == NULL)) { - return NXT_UNIT_ERROR; + fds[0].fd = ctx_impl->read_port->in_fd; + fds[0].events = POLLIN; + fds[0].revents = 0; + + fds[1].fd = lib->shared_port->in_fd; + fds[1].events = POLLIN; + fds[1].revents = 0; + + nevents = poll(fds, 2, -1); + if (nxt_slow_path(nevents == -1)) { + err = errno; + + if (err == EINTR) { + goto retry; } - nxt_unit_read_buf(ctx, rbuf); + nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)", + fds[0].fd, fds[1].fd, strerror(err), err); + + rbuf->size = -1; + + return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; } - if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, - rbuf->buf, rbuf->size, - rbuf->oob, sizeof(rbuf->oob)); + nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]", + fds[0].fd, fds[1].fd, nevents, fds[0].revents, + fds[1].revents); -#if (NXT_DEBUG) - memset(rbuf->buf, 0xAC, rbuf->size); -#endif + if ((fds[0].revents & POLLIN) != 0) { + res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (res == NXT_UNIT_AGAIN) { + goto retry; + } - } else { - rc = NXT_UNIT_ERROR; + return res; + } + + if ((fds[1].revents & POLLIN) != 0) { + res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); + if (res == NXT_UNIT_AGAIN) { + goto retry; + } + + return res; } - nxt_unit_read_buf_release(ctx, rbuf); + nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]", + fds[0].fd, fds[1].fd, nevents, fds[0].revents, + fds[1].revents); + + return NXT_UNIT_ERROR; +} + + +static int +nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_queue_t pending_rbuf; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + + nxt_queue_init(&pending_rbuf); + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + pthread_mutex_unlock(&ctx_impl->mutex); + + return NXT_UNIT_OK; + } + + nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf); + nxt_queue_init(&ctx_impl->pending_rbuf); + + pthread_mutex_unlock(&ctx_impl->mutex); + + rc = NXT_UNIT_OK; + + nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) { + + if (nxt_fast_path(rc != NXT_UNIT_ERROR)) { + rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf); + + } else { + nxt_unit_read_buf_release(ctx, rbuf); + } + + } nxt_queue_loop; return rc; } static void -nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) +{ + int res; + nxt_queue_t ready_req; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_t *req; + nxt_unit_request_info_impl_t *req_impl; + + nxt_queue_init(&ready_req); + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + if (nxt_queue_is_empty(&ctx_impl->ready_req)) { + pthread_mutex_unlock(&ctx_impl->mutex); + + return; + } + + nxt_queue_add(&ready_req, &ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->ready_req); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_queue_each(req_impl, &ready_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); + + req = &req_impl->req; + + res = nxt_unit_send_req_headers_ack(req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + continue; + } + + if (req->content_length + > (uint64_t) (req->content_buf->end - req->content_buf->free)) + { + res = nxt_unit_request_hash_add(ctx, req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "failed to add request to hash"); + + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + continue; + } + + /* + * If application have separate data handler, we may start + * request processing and process data when it is arrived. + */ + if (lib->callbacks.data_handler == NULL) { + continue; + } + } + + lib->callbacks.request_handler(&req_impl->req); + + } nxt_queue_loop; +} + + +int +nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) { + int rc; nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + rc = NXT_UNIT_OK; - if (ctx_impl->read_port_fd != -1) { - rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, - rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); + while (nxt_fast_path(lib->online)) { + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + rc = NXT_UNIT_ERROR; + break; + } - } else { - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + retry: - rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, - rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); + rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); + if (rc == NXT_UNIT_AGAIN) { + goto retry; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + + nxt_unit_process_ready_req(ctx); } + + nxt_unit_ctx_release(ctx); + + return rc; } -void -nxt_unit_done(nxt_unit_ctx_t *ctx) +nxt_inline int +nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf) { + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_READ_QUEUE; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf) +{ + if (nxt_fast_path(rbuf->size == 1)) { + return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_SHM_ACK; + } + + return 0; +} + + +nxt_inline int +nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf) +{ + nxt_port_msg_t *port_msg; + + if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + return port_msg->type == _NXT_PORT_MSG_QUIT; + } + + return 0; +} + + +int +nxt_unit_run_shared(nxt_unit_ctx_t *ctx) +{ + int rc; nxt_unit_impl_t *lib; - nxt_unit_process_t *process; - nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + + nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + rc = NXT_UNIT_OK; - nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) { + while (nxt_fast_path(lib->online)) { + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + rc = NXT_UNIT_ERROR; + break; + } - nxt_unit_ctx_free(&ctx_impl->ctx); + retry: - } nxt_queue_loop; + rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf); + if (rc == NXT_UNIT_AGAIN) { + goto retry; + } - for ( ;; ) { - pthread_mutex_lock(&lib->mutex); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + nxt_unit_read_buf_release(ctx, rbuf); + break; + } - process = nxt_unit_process_pop_first(lib); - if (process == NULL) { - pthread_mutex_unlock(&lib->mutex); + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } - nxt_unit_remove_process(ctx, process); + nxt_unit_process_ready_req(ctx); } - pthread_mutex_destroy(&lib->mutex); + nxt_unit_ctx_release(ctx); - free(lib); + return rc; +} + + +int +nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int rc; + + nxt_unit_ctx_use(ctx); + + rc = nxt_unit_process_port_msg_impl(ctx, port); + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +static int +nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int rc; + nxt_unit_impl_t *lib; + nxt_unit_read_buf_t *rbuf; + + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + +retry: + + if (port == lib->shared_port) { + rc = nxt_unit_shared_port_recv(ctx, port, rbuf); + + } else { + rc = nxt_unit_ctx_port_recv(ctx, port, rbuf); + } + + if (rc != NXT_UNIT_OK) { + nxt_unit_read_buf_release(ctx, rbuf); + return rc; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_process_ready_req(ctx); + + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + if (lib->online) { + goto retry; + } + + return rc; +} + + +void +nxt_unit_done(nxt_unit_ctx_t *ctx) +{ + nxt_unit_ctx_release(ctx); } nxt_unit_ctx_t * nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) { - int rc, fd; - nxt_unit_impl_t *lib; - nxt_unit_port_id_t new_port_id; - nxt_unit_ctx_impl_t *new_ctx; + int rc, queue_fd; + void *mem; + nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_ctx_impl_t *new_ctx; + nxt_unit_port_impl_t *port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size); if (nxt_slow_path(new_ctx == NULL)) { - nxt_unit_warn(ctx, "failed to allocate context"); + nxt_unit_alert(ctx, "failed to allocate context"); return NULL; } - rc = nxt_unit_create_port(ctx, &new_port_id, &fd); + rc = nxt_unit_ctx_init(lib, new_ctx, data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - free(new_ctx); + free(new_ctx); - return NULL; + return NULL; } - rc = nxt_unit_send_port(ctx, &lib->ready_port_id, &new_port_id, fd); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - lib->callbacks.remove_port(ctx, &new_port_id); + queue_fd = -1; - close(fd); + port = nxt_unit_create_port(ctx); + if (nxt_slow_path(port == NULL)) { + goto fail; + } - free(new_ctx); + new_ctx->read_port = port; - return NULL; + queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(queue_fd == -1)) { + goto fail; } - close(fd); + mem = mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, + strerror(errno), errno); - rc = nxt_unit_ctx_init(lib, new_ctx, data); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - lib->callbacks.remove_port(ctx, &new_port_id); + goto fail; + } - free(new_ctx); + nxt_port_queue_init(mem); - return NULL; + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + port_impl->queue = mem; + + rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; } - new_ctx->read_port_id = new_port_id; + nxt_unit_close(queue_fd); return &new_ctx->ctx; + +fail: + + if (queue_fd != -1) { + nxt_unit_close(queue_fd); + } + + nxt_unit_ctx_release(&new_ctx->ctx); + + return NULL; } -void -nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) +static void +nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) { nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_mmap_buf_t *mmap_buf; nxt_unit_request_info_impl_t *req_impl; nxt_unit_websocket_frame_impl_t *ws_impl; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); nxt_queue_each(req_impl, &ctx_impl->active_req, nxt_unit_request_info_impl_t, link) @@ -4099,9 +4990,16 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) nxt_queue_remove(&ctx_impl->link); + if (nxt_fast_path(ctx_impl->read_port != NULL)) { + nxt_unit_remove_port(lib, &ctx_impl->read_port->id); + nxt_unit_port_release(ctx_impl->read_port); + } + if (ctx_impl != &lib->main_ctx) { free(ctx_impl); } + + nxt_unit_lib_release(lib); } @@ -4127,42 +5025,12 @@ nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id) } -int -nxt_unit_create_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *port_id) -{ - int rc, fd; - nxt_unit_impl_t *lib; - nxt_unit_port_id_t new_port_id; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - rc = nxt_unit_create_port(ctx, &new_port_id, &fd); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; - } - - rc = nxt_unit_send_port(ctx, dst, &new_port_id, fd); - - if (nxt_fast_path(rc == NXT_UNIT_OK)) { - *port_id = new_port_id; - - } else { - lib->callbacks.remove_port(ctx, &new_port_id); - } - - close(fd); - - return rc; -} - - -static int -nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) +static nxt_unit_port_t * +nxt_unit_create_port(nxt_unit_ctx_t *ctx) { int rc, port_sockets[2]; nxt_unit_impl_t *lib; - nxt_unit_port_t new_port; + nxt_unit_port_t new_port, *port; nxt_unit_process_t *process; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4172,7 +5040,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)", strerror(errno), errno); - return NXT_UNIT_ERROR; + return NULL; } nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", @@ -4180,49 +5048,43 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd) pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_get(ctx, lib->pid); + process = nxt_unit_process_get(lib, lib->pid); if (nxt_slow_path(process == NULL)) { pthread_mutex_unlock(&lib->mutex); - close(port_sockets[0]); - close(port_sockets[1]); + nxt_unit_close(port_sockets[0]); + nxt_unit_close(port_sockets[1]); - return NXT_UNIT_ERROR; + return NULL; } nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++); new_port.in_fd = port_sockets[0]; - new_port.out_fd = -1; + new_port.out_fd = port_sockets[1]; new_port.data = NULL; pthread_mutex_unlock(&lib->mutex); - nxt_unit_process_use(ctx, process, -1); - - rc = lib->callbacks.add_port(ctx, &new_port); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_warn(ctx, "create_port: add_port() failed"); - - close(port_sockets[0]); - close(port_sockets[1]); + nxt_unit_process_release(process); - return rc; + port = nxt_unit_add_port(ctx, &new_port, NULL); + if (nxt_slow_path(port == NULL)) { + nxt_unit_close(port_sockets[0]); + nxt_unit_close(port_sockets[1]); } - *port_id = new_port.id; - *fd = port_sockets[1]; - - return rc; + return port; } static int -nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, - nxt_unit_port_id_t *new_port, int fd) +nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, + nxt_unit_port_t *port, int queue_fd) { ssize_t res; nxt_unit_impl_t *lib; + int fds[2] = { port->out_fd, queue_fd }; struct { nxt_port_msg_t msg; @@ -4231,7 +5093,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, union { struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; + char space[CMSG_SPACE(sizeof(int) * 2)]; } cmsg; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4246,15 +5108,15 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, m.msg.mf = 0; m.msg.tracking = 0; - m.new_port.id = new_port->id; - m.new_port.pid = new_port->pid; + m.new_port.id = port->id.id; + m.new_port.pid = port->id.pid; m.new_port.type = NXT_PROCESS_APP; m.new_port.max_size = 16 * 1024; m.new_port.max_share = 64 * 1024; memset(&cmsg, 0, sizeof(cmsg)); - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); + cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; @@ -4267,22 +5129,74 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, * Fortunately, GCC with -O1 compiles this nxt_memcpy() * in the same simple assignment as in the code above. */ - memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); + memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2); - res = lib->callbacks.port_send(ctx, dst, &m, sizeof(m), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); - return res == sizeof(m) ? NXT_UNIT_OK : NXT_UNIT_ERROR; + return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR; } -int -nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; - nxt_unit_port_impl_t *new_port, *old_port; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + nxt_atomic_fetch_add(&port_impl->use_count, 1); +} + + +nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) +{ + long c; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + c = nxt_atomic_fetch_add(&port_impl->use_count, -1); + + if (c == 1) { + nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d", + (int) port->id.pid, (int) port->id.id, + port->in_fd, port->out_fd); + + nxt_unit_process_release(port_impl->process); + + if (port->in_fd != -1) { + nxt_unit_close(port->in_fd); + + port->in_fd = -1; + } + + if (port->out_fd != -1) { + nxt_unit_close(port->out_fd); + + port->out_fd = -1; + } + + if (port_impl->queue != NULL) { + munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1) + ? sizeof(nxt_app_queue_t) + : sizeof(nxt_port_queue_t)); + } + + free(port_impl); + } +} + + +static nxt_unit_port_t * +nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) +{ + int rc; + nxt_queue_t awaiting_req; + nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *new_port, *old_port_impl; + nxt_unit_request_info_impl_t *req_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4291,32 +5205,91 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0); if (nxt_slow_path(old_port != NULL)) { - nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d", - port->id.pid, port->id.id, - port->in_fd, port->out_fd); + nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} " + "in_fd %d out_fd %d queue %p", + port->id.pid, port->id.id, + port->in_fd, port->out_fd, queue); + + if (old_port->data == NULL) { + old_port->data = port->data; + port->data = NULL; + } + + if (old_port->in_fd == -1) { + old_port->in_fd = port->in_fd; + port->in_fd = -1; + } if (port->in_fd != -1) { - close(port->in_fd); + nxt_unit_close(port->in_fd); port->in_fd = -1; } + if (old_port->out_fd == -1) { + old_port->out_fd = port->out_fd; + port->out_fd = -1; + } + if (port->out_fd != -1) { - close(port->out_fd); + nxt_unit_close(port->out_fd); port->out_fd = -1; } + *port = *old_port; + + nxt_queue_init(&awaiting_req); + + old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); + + if (old_port_impl->queue == NULL) { + old_port_impl->queue = queue; + } + + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } + + old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1); + pthread_mutex_unlock(&lib->mutex); - return NXT_UNIT_OK; + if (lib->callbacks.add_port != NULL + && (port->in_fd != -1 || port->out_fd != -1)) + { + lib->callbacks.add_port(ctx, old_port); + } + + nxt_queue_each(req_impl, &awaiting_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + nxt_queue_remove(&req_impl->port_wait_link); + + ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, + ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_tail(&ctx_impl->ready_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; + + return old_port; } - nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", + new_port = NULL; + + nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p", port->id.pid, port->id.id, - port->in_fd, port->out_fd); + port->in_fd, port->out_fd, queue); - process = nxt_unit_process_get(ctx, port->id.pid); + process = nxt_unit_process_get(lib, port->id.pid); if (nxt_slow_path(process == NULL)) { - rc = NXT_UNIT_ERROR; goto unlock; } @@ -4326,7 +5299,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port = malloc(sizeof(nxt_unit_port_impl_t)); if (nxt_slow_path(new_port == NULL)) { - rc = NXT_UNIT_ERROR; + nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed", + port->id.pid, port->id.id); + goto unlock; } @@ -4337,149 +5312,131 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", port->id.pid, port->id.id); + free(new_port); + + new_port = NULL; + goto unlock; } nxt_queue_insert_tail(&process->ports, &new_port->link); - rc = NXT_UNIT_OK; - + new_port->use_count = 2; new_port->process = process; + new_port->ready = (port->in_fd != -1 || port->out_fd != -1); + new_port->queue = queue; + new_port->from_socket = 0; + new_port->socket_rbuf = NULL; + + nxt_queue_init(&new_port->awaiting_req); + + process = NULL; unlock: pthread_mutex_unlock(&lib->mutex); - if (nxt_slow_path(process != NULL && rc != NXT_UNIT_OK)) { - nxt_unit_process_use(ctx, process, -1); + if (nxt_slow_path(process != NULL)) { + nxt_unit_process_release(process); } - return rc; -} - + if (lib->callbacks.add_port != NULL + && new_port != NULL + && (port->in_fd != -1 || port->out_fd != -1)) + { + lib->callbacks.add_port(ctx, &new_port->port); + } -void -nxt_unit_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) -{ - nxt_unit_find_remove_port(ctx, port_id, NULL); + return &new_port->port; } -void -nxt_unit_find_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *r_port) +static void +nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) { - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + nxt_unit_port_t *port; + nxt_unit_port_impl_t *port_impl; pthread_mutex_lock(&lib->mutex); - process = NULL; + port = nxt_unit_remove_port_unsafe(lib, port_id); - nxt_unit_remove_port_unsafe(ctx, port_id, r_port, &process); + if (nxt_fast_path(port != NULL)) { + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + nxt_queue_remove(&port_impl->link); + } pthread_mutex_unlock(&lib->mutex); - if (nxt_slow_path(process != NULL)) { - nxt_unit_process_use(ctx, process, -1); + if (lib->callbacks.remove_port != NULL && port != NULL) { + lib->callbacks.remove_port(&lib->unit, port); + } + + if (nxt_fast_path(port != NULL)) { + nxt_unit_port_release(port); } } -static void -nxt_unit_remove_port_unsafe(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - nxt_unit_port_t *r_port, nxt_unit_process_t **process) +static nxt_unit_port_t * +nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) { - nxt_unit_impl_t *lib; - nxt_unit_port_impl_t *port; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + nxt_unit_port_t *port; port = nxt_unit_port_hash_find(&lib->ports, port_id, 1); if (nxt_slow_path(port == NULL)) { - nxt_unit_debug(ctx, "remove_port: port %d,%d not found", + nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found", (int) port_id->pid, (int) port_id->id); - return; + return NULL; } - nxt_unit_debug(ctx, "remove_port: port %d,%d, fds %d,%d, data %p", + nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p", (int) port_id->pid, (int) port_id->id, - port->port.in_fd, port->port.out_fd, port->port.data); - - if (port->port.in_fd != -1) { - close(port->port.in_fd); - } - - if (port->port.out_fd != -1) { - close(port->port.out_fd); - } - - if (port->process != NULL) { - nxt_queue_remove(&port->link); - } - - if (process != NULL) { - *process = port->process; - } + port->in_fd, port->out_fd, port->data); - if (r_port != NULL) { - *r_port = port->port; - } - - free(port); + return port; } -void -nxt_unit_remove_pid(nxt_unit_ctx_t *ctx, pid_t pid) +static void +nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid) { - nxt_unit_impl_t *lib; nxt_unit_process_t *process; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_find(ctx, pid, 1); + process = nxt_unit_process_find(lib, pid, 1); if (nxt_slow_path(process == NULL)) { - nxt_unit_debug(ctx, "remove_pid: process %d not found", (int) pid); + nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid); pthread_mutex_unlock(&lib->mutex); return; } - nxt_unit_remove_process(ctx, process); + nxt_unit_remove_process(lib, process); + + if (lib->callbacks.remove_pid != NULL) { + lib->callbacks.remove_pid(&lib->unit, pid); + } } static void -nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) +nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) { nxt_queue_t ports; - nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_queue_init(&ports); nxt_queue_add(&ports, &process->ports); nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) { - nxt_unit_process_use(ctx, process, -1); - port->process = NULL; - - /* Shortcut for default callback. */ - if (lib->callbacks.remove_port == nxt_unit_remove_port) { - nxt_queue_remove(&port->link); - - nxt_unit_remove_port_unsafe(ctx, &port->port.id, NULL, NULL); - } + nxt_unit_remove_port_unsafe(lib, &port->port.id); } nxt_queue_loop; @@ -4489,70 +5446,168 @@ nxt_unit_remove_process(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process) nxt_queue_remove(&port->link); - lib->callbacks.remove_port(ctx, &port->port.id); + if (lib->callbacks.remove_port != NULL) { + lib->callbacks.remove_port(&lib->unit, &port->port); + } + + nxt_unit_port_release(&port->port); } nxt_queue_loop; - nxt_unit_process_use(ctx, process, -1); + nxt_unit_process_release(process); } -void +static void nxt_unit_quit(nxt_unit_ctx_t *ctx) { nxt_unit_impl_t *lib; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - lib->online = 0; + if (lib->online) { + lib->online = 0; + + if (lib->callbacks.quit != NULL) { + lib->callbacks.quit(ctx); + } + } +} + + +static int +nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_port_t get_port; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_PORT; + + m.get_port.id = port_id->id; + m.get_port.pid = port_id->pid; + + nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, + (int) port_id->id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; } static ssize_t -nxt_unit_port_send_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, +nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { - int fd; + int notify; + ssize_t ret; + nxt_int_t rc; + nxt_port_msg_t msg; nxt_unit_impl_t *lib; - nxt_unit_port_impl_t *port; + nxt_unit_port_impl_t *port_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - pthread_mutex_lock(&lib->mutex); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + if (port_impl->queue != NULL && oob_size == 0 + && buf_size <= NXT_PORT_QUEUE_MSG_SIZE) + { + rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", + (int) port->id.pid, (int) port->id.id); - port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); + return -1; + } - if (nxt_fast_path(port != NULL)) { - fd = port->port.out_fd; + nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d", + (int) port->id.pid, (int) port->id.id, + (int) buf_size, notify); - } else { - nxt_unit_warn(ctx, "port_send: port %d,%d not found", - (int) port_id->pid, (int) port_id->id); - fd = -1; + if (notify) { + memcpy(&msg, buf, sizeof(nxt_port_msg_t)); + + msg.type = _NXT_PORT_MSG_READ_QUEUE; + + if (lib->callbacks.port_send == NULL) { + ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, + sizeof(nxt_port_msg_t), NULL, 0); + + nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", + (int) port->id.pid, (int) port->id.id, + (int) ret); + + } else { + ret = lib->callbacks.port_send(ctx, port, &msg, + sizeof(nxt_port_msg_t), NULL, 0); + + nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue", + (int) port->id.pid, (int) port->id.id, + (int) ret); + } + + } + + return buf_size; } - pthread_mutex_unlock(&lib->mutex); + if (port_impl->queue != NULL) { + msg.type = _NXT_PORT_MSG_READ_SOCKET; - if (nxt_slow_path(fd == -1)) { - if (port != NULL) { - nxt_unit_warn(ctx, "port_send: port %d,%d: fd == -1", - (int) port_id->pid, (int) port_id->id); + rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, ¬ify); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow", + (int) port->id.pid, (int) port->id.id); + + return -1; } - return -1; + nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d", + (int) port->id.pid, (int) port->id.id, notify); } - nxt_unit_debug(ctx, "port_send: found port %d,%d fd %d", - (int) port_id->pid, (int) port_id->id, fd); + if (lib->callbacks.port_send != NULL) { + ret = lib->callbacks.port_send(ctx, port, buf, buf_size, + oob, oob_size); + + nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", + (int) port->id.pid, (int) port->id.id, + (int) ret); + + } else { + ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, + oob, oob_size); + + nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", + (int) port->id.pid, (int) port->id.id, + (int) ret); + } - return nxt_unit_port_send(ctx, fd, buf, buf_size, oob, oob_size); + return ret; } -ssize_t -nxt_unit_port_send(nxt_unit_ctx_t *ctx, int fd, +static ssize_t +nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size) { + int err; ssize_t res; struct iovec iov[1]; struct msghdr msg; @@ -4573,7 +5628,9 @@ retry: res = sendmsg(fd, &msg, 0); if (nxt_slow_path(res == -1)) { - if (errno == EINTR) { + err = errno; + + if (err == EINTR) { goto retry; } @@ -4582,7 +5639,7 @@ retry: * implementation. */ nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)", - fd, (int) buf_size, strerror(errno), errno); + fd, (int) buf_size, strerror(err), err); } else { nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, @@ -4593,88 +5650,310 @@ retry: } -static ssize_t -nxt_unit_port_recv_default(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - void *buf, size_t buf_size, void *oob, size_t oob_size) +static int +nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) { - int fd; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_port_impl_t *port; + int res, read; + nxt_unit_port_impl_t *port_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); - pthread_mutex_lock(&lib->mutex); + read = 0; - port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); +retry: - if (nxt_fast_path(port != NULL)) { - fd = port->port.in_fd; + if (port_impl->from_socket > 0) { + if (port_impl->socket_rbuf != NULL + && port_impl->socket_rbuf->size > 0) + { + port_impl->from_socket--; + + nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf); + port_impl->socket_rbuf->size = 0; + + nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } } else { - nxt_unit_debug(ctx, "port_recv: port %d,%d not found", - (int) port_id->pid, (int) port_id->id); - fd = -1; + res = nxt_unit_port_queue_recv(port, rbuf); + + if (res == NXT_UNIT_OK) { + if (nxt_unit_is_read_socket(rbuf)) { + port_impl->from_socket++; + + nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d", + (int) port->id.pid, (int) port->id.id, + port_impl->from_socket); + + goto retry; + } + + nxt_unit_debug(ctx, "port{%d,%d} dequeue %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + return NXT_UNIT_OK; + } } - pthread_mutex_unlock(&lib->mutex); + if (read) { + return NXT_UNIT_AGAIN; + } - if (nxt_slow_path(fd == -1)) { - return -1; + res = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; } - nxt_unit_debug(ctx, "port_recv: found port %d,%d, fd %d", - (int) port_id->pid, (int) port_id->id, fd); + read = 1; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + if (nxt_unit_is_read_queue(rbuf)) { + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + + if (port_impl->from_socket) { + nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET"); + } + + goto retry; + } + + nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + if (res == NXT_UNIT_AGAIN) { + return NXT_UNIT_AGAIN; + } + + if (port_impl->from_socket > 0) { + port_impl->from_socket--; + + return NXT_UNIT_OK; + } + + nxt_unit_debug(ctx, "port{%d,%d} suspend message %d", + (int) port->id.pid, (int) port->id.id, + (int) rbuf->size); + + if (port_impl->socket_rbuf == NULL) { + port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx); + + if (nxt_slow_path(port_impl->socket_rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + port_impl->socket_rbuf->size = 0; + } + + if (port_impl->socket_rbuf->size > 0) { + nxt_unit_alert(ctx, "too many port socket messages"); + + return NXT_UNIT_ERROR; + } - if (nxt_fast_path(port_id == &ctx_impl->read_port_id)) { - ctx_impl->read_port_fd = fd; + nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf); + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + goto retry; +} + + +nxt_inline void +nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src) +{ + memcpy(dst->buf, src->buf, src->size); + dst->size = src->size; + memcpy(dst->oob, src->oob, sizeof(src->oob)); +} + + +static int +nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) +{ + int res; + +retry: + + res = nxt_unit_app_queue_recv(port, rbuf); + + if (res == NXT_UNIT_AGAIN) { + res = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + if (nxt_unit_is_read_queue(rbuf)) { + nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + + goto retry; + } } - return nxt_unit_port_recv(ctx, fd, buf, buf_size, oob, oob_size); + return res; } -ssize_t -nxt_unit_port_recv(nxt_unit_ctx_t *ctx, int fd, void *buf, size_t buf_size, - void *oob, size_t oob_size) +static int +nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) { - ssize_t res; - struct iovec iov[1]; - struct msghdr msg; + int fd, err; + struct iovec iov[1]; + struct msghdr msg; + nxt_unit_impl_t *lib; - iov[0].iov_base = buf; - iov[0].iov_len = buf_size; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (lib->callbacks.port_recv != NULL) { + rbuf->size = lib->callbacks.port_recv(ctx, port, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + + nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", + (int) port->id.pid, (int) port->id.id, (int) rbuf->size); + + if (nxt_slow_path(rbuf->size < 0)) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; + } + + iov[0].iov_base = rbuf->buf; + iov[0].iov_len = sizeof(rbuf->buf); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_flags = 0; - msg.msg_control = oob; - msg.msg_controllen = oob_size; + msg.msg_control = rbuf->oob; + msg.msg_controllen = sizeof(rbuf->oob); + + fd = port->in_fd; retry: - res = recvmsg(fd, &msg, 0); + rbuf->size = recvmsg(fd, &msg, 0); - if (nxt_slow_path(res == -1)) { - if (errno == EINTR) { + if (nxt_slow_path(rbuf->size == -1)) { + err = errno; + + if (err == EINTR) { goto retry; } + if (err == EAGAIN) { + nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", + fd, strerror(err), err); + + return NXT_UNIT_AGAIN; + } + nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", + fd, strerror(err), err); + + return NXT_UNIT_ERROR; + } + + nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size); + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf); + + return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; +} + + +static int +nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +{ + uint32_t cookie; + nxt_port_msg_t *port_msg; + nxt_app_queue_t *queue; + nxt_unit_port_impl_t *port_impl; + + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + queue = port_impl->queue; + +retry: + + rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie); + + nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size); + + if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) { + port_msg = (nxt_port_msg_t *) rbuf->buf; + + if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { + return NXT_UNIT_OK; + } + + nxt_unit_debug(NULL, "app_queue_recv: message cancelled"); + + goto retry; + } + + return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK; +} + + +nxt_inline int +nxt_unit_close(int fd) +{ + int res; + + res = close(fd); + + if (nxt_slow_path(res == -1)) { + nxt_unit_alert(NULL, "close(%d) failed: %s (%d)", fd, strerror(errno), errno); } else { - nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res); + nxt_unit_debug(NULL, "close(%d): %d", fd, res); } return res; } +static int +nxt_unit_fd_blocking(int fd) +{ + int nb; + + nb = 0; + + if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", + fd, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + static nxt_int_t nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -4755,7 +6034,7 @@ nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port) } -static nxt_unit_port_impl_t * +static nxt_unit_port_t * nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove) { @@ -4775,6 +6054,10 @@ nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, switch (res) { case NXT_OK: + if (!remove) { + nxt_unit_port_use(lhq.value); + } + return lhq.value; default: @@ -4799,12 +6082,19 @@ static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { static int -nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, - nxt_unit_request_info_impl_t *req_impl) +nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, + nxt_unit_request_info_t *req) { - uint32_t *stream; - nxt_int_t res; - nxt_lvlhsh_query_t lhq; + uint32_t *stream; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + if (req_impl->in_hash) { + return NXT_UNIT_OK; + } stream = &req_impl->stream; @@ -4816,11 +6106,18 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, lhq.replace = 0; lhq.value = req_impl; - res = nxt_lvlhsh_insert(request_hash, &lhq); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq); + + pthread_mutex_unlock(&ctx_impl->mutex); switch (res) { case NXT_OK: + req_impl->in_hash = 1; return NXT_UNIT_OK; default: @@ -4829,12 +6126,13 @@ nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, } -static nxt_unit_request_info_impl_t * -nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, - int remove) +static nxt_unit_request_info_t * +nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove) { - nxt_int_t res; - nxt_lvlhsh_query_t lhq; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream)); lhq.key.length = sizeof(stream); @@ -4842,16 +6140,26 @@ nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, lhq.proto = &lvlhsh_requests_proto; lhq.pool = NULL; + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + if (remove) { - res = nxt_lvlhsh_delete(request_hash, &lhq); + res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq); } else { - res = nxt_lvlhsh_find(request_hash, &lhq); + res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq); } + pthread_mutex_unlock(&ctx_impl->mutex); + switch (res) { case NXT_OK: + req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t, + req); + req_impl->in_hash = 0; + return lhq.value; default: @@ -4977,11 +6285,18 @@ nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level) tm = *localtime(&ts.tv_sec); #endif +#if (NXT_DEBUG) p += snprintf(p, end - p, "%4d/%02d/%02d %02d:%02d:%02d.%03d ", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, (int) ts.tv_nsec / 1000000); +#else + p += snprintf(p, end - p, + "%4d/%02d/%02d %02d:%02d:%02d ", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec); +#endif p += snprintf(p, end - p, "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level], |