diff options
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r-- | src/nxt_unit.c | 828 |
1 files changed, 675 insertions, 153 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 0cf32916..95874db3 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -19,12 +19,21 @@ #include <linux/memfd.h> #endif +#define NXT_UNIT_MAX_PLAIN_SIZE 1024 +#define NXT_UNIT_LOCAL_BUF_SIZE \ + (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) + +#define NXT_UNIT_MAX_PLAIN_SIZE 1024 +#define NXT_UNIT_LOCAL_BUF_SIZE \ + (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) + typedef struct nxt_unit_impl_s nxt_unit_impl_t; typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; typedef struct nxt_unit_process_s nxt_unit_process_t; typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; +typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; @@ -37,9 +46,10 @@ nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_unit_mmap_buf_t *mmap_buf); -nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf); +nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, - nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); + nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, + uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, uint32_t stream); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, @@ -48,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( nxt_unit_ctx_t *ctx); static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); @@ -63,11 +74,19 @@ static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_unit_mmap_buf_t *mmap_buf, int last); static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); +static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); +static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); +static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( + nxt_unit_ctx_impl_t *ctx_impl); +static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, + nxt_unit_read_buf_t *rbuf); static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, - nxt_chunk_id_t *c, int n); + nxt_chunk_id_t *c, int *n, int min_n); +static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); +static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); @@ -75,7 +94,7 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, - nxt_unit_mmap_buf_t *mmap_buf); + uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); @@ -88,14 +107,18 @@ static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); -static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, - uint32_t size); +static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, + nxt_unit_process_t *process, + nxt_port_mmap_header_t *hdr, void *start, uint32_t size); +static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); +static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd); @@ -135,10 +158,12 @@ struct nxt_unit_mmap_buf_s { nxt_unit_mmap_buf_t **prev; nxt_port_mmap_header_t *hdr; -// nxt_queue_link_t link; nxt_unit_port_id_t port_id; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_process_t *process; + char *free_ptr; + char *plain_ptr; }; @@ -196,8 +221,14 @@ struct nxt_unit_websocket_frame_impl_s { nxt_queue_link_t link; nxt_unit_ctx_impl_t *ctx_impl; +}; + - void *retain_buf; +struct nxt_unit_read_buf_s { + nxt_unit_read_buf_t *next; + ssize_t size; + char buf[16384]; + char oob[256]; }; @@ -225,7 +256,12 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_lvlhsh_t requests; + nxt_unit_read_buf_t *pending_read_head; + nxt_unit_read_buf_t **pending_read_tail; + nxt_unit_read_buf_t *free_read_buf; + nxt_unit_mmap_buf_t ctx_buf[2]; + nxt_unit_read_buf_t ctx_read_buf; nxt_unit_request_info_impl_t req; }; @@ -236,6 +272,7 @@ struct nxt_unit_impl_s { nxt_unit_callbacks_t callbacks; uint32_t request_data_size; + uint32_t shm_mmap_limit; pthread_mutex_t mutex; @@ -271,6 +308,7 @@ struct nxt_unit_mmaps_s { pthread_mutex_t mutex; uint32_t size; uint32_t cap; + nxt_atomic_t allocated_chunks; nxt_unit_mmap_t *elts; }; @@ -302,7 +340,7 @@ nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { int rc; - uint32_t ready_stream; + uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; nxt_unit_port_t ready_port, read_port; @@ -325,12 +363,20 @@ nxt_unit_init(nxt_unit_init_t *init) ready_port.id.id); nxt_unit_port_id_init(&read_port.id, read_port.id.pid, read_port.id.id); + } else { rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, - &ready_stream); + &ready_stream, &shm_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } + + lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) + / PORT_MMAP_DATA_SIZE; + } + + if (nxt_slow_path(lib->shm_mmap_limit < 1)) { + lib->shm_mmap_limit = 1; } lib->pid = read_port.id.pid; @@ -395,6 +441,8 @@ nxt_unit_create(nxt_unit_init_t *init) lib->callbacks = init->callbacks; lib->request_data_size = init->request_data_size; + lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) + / PORT_MMAP_DATA_SIZE; lib->processes.slot = NULL; lib->ports.slot = NULL; @@ -479,6 +527,11 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); + ctx_impl->pending_read_head = NULL; + ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; + ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; + ctx_impl->ctx_read_buf.next = NULL; + ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; @@ -517,7 +570,7 @@ nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_inline void -nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf) +nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) { nxt_unit_mmap_buf_t **prev; @@ -535,7 +588,7 @@ nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf) static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream) + int *log_fd, uint32_t *stream, uint32_t *shm_limit) { int rc; int ready_fd, read_fd; @@ -570,14 +623,14 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" - "%d", + "%d,%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &read_pid, &read_id, &read_fd, - log_fd); + log_fd, shm_limit); - if (nxt_slow_path(rc != 8)) { - nxt_unit_alert(NULL, "failed to scan variables"); + if (nxt_slow_path(rc != 9)) { + nxt_unit_alert(NULL, "failed to scan variables: %d", rc); return NXT_UNIT_ERROR; } @@ -756,6 +809,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, rc = NXT_UNIT_OK; break; + case _NXT_PORT_MSG_SHM_ACK: + rc = nxt_unit_process_shm_ack(ctx); + break; + default: nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", port_msg->stream, (int) port_msg->type); @@ -961,7 +1018,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) ws_impl->ws.req = req; ws_impl->buf = NULL; - ws_impl->retain_buf = NULL; if (recv_msg->mmap) { for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { @@ -986,7 +1042,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - b->hdr = NULL; b->req = req; b->buf.start = recv_msg->start; b->buf.free = b->buf.start; @@ -1038,6 +1093,23 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } +static int +nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + nxt_unit_callbacks_t *cb; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + cb = &lib->callbacks; + + if (cb->shm_ack_handler != NULL) { + cb->shm_ack_handler(ctx); + } + + return NXT_UNIT_OK; +} + + static nxt_unit_request_info_impl_t * nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) { @@ -1193,12 +1265,6 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) ws->req = NULL; - if (ws_impl->retain_buf != NULL) { - free(ws_impl->retain_buf); - - ws_impl->retain_buf = NULL; - } - pthread_mutex_lock(&ws_impl->ctx_impl->mutex); nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); @@ -1649,7 +1715,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) req->response_buf = NULL; req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; - nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_mmap_buf_free(mmap_buf); } return rc; @@ -1697,7 +1763,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, size, mmap_buf); + &req->response_port, size, size, mmap_buf, + NULL); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -1755,13 +1822,16 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) } else { mmap_buf = ctx_impl->free_buf; - nxt_unit_mmap_buf_remove(mmap_buf); + nxt_unit_mmap_buf_unlink(mmap_buf); pthread_mutex_unlock(&ctx_impl->mutex); } mmap_buf->ctx_impl = ctx_impl; + mmap_buf->hdr = NULL; + mmap_buf->free_ptr = NULL; + return mmap_buf; } @@ -1769,7 +1839,7 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) { - nxt_unit_mmap_buf_remove(mmap_buf); + nxt_unit_mmap_buf_unlink(mmap_buf); pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); @@ -1896,7 +1966,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) } } - nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_mmap_buf_free(mmap_buf); return NXT_UNIT_OK; } @@ -1917,7 +1987,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf) rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); if (nxt_slow_path(rc == NXT_UNIT_OK)) { - nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_mmap_buf_free(mmap_buf); nxt_unit_request_info_release(req); @@ -1936,7 +2006,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_port_mmap_msg_t mmap_msg; } m; - u_char *end, *last_used, *first_free; + int rc; + u_char *last_used, *first_free; ssize_t res; nxt_chunk_id_t first_free_chunk; nxt_unit_buf_t *buf; @@ -1960,38 +2031,85 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.msg.mf = 0; m.msg.tracking = 0; - if (hdr != NULL) { + rc = NXT_UNIT_ERROR; + + if (m.msg.mmap) { m.mmap_msg.mmap_id = hdr->id; m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); - } - nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", - stream, - (int) m.mmap_msg.mmap_id, - (int) m.mmap_msg.chunk_id, - (int) m.mmap_msg.size); + nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", + stream, + (int) m.mmap_msg.mmap_id, + (int) m.mmap_msg.chunk_id, + (int) m.mmap_msg.size); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, - m.msg.mmap ? sizeof(m) : sizeof(m.msg), - NULL, 0); - if (nxt_slow_path(res != sizeof(m))) { - return NXT_UNIT_ERROR; - } + res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), + NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + goto free_buf; + } - if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) { last_used = (u_char *) buf->free - 1; - first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; - first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); - end = (u_char *) buf->end; - nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free)); + if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { + first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); + + buf->start = (char *) first_free; + buf->free = buf->start; + + if (buf->end < buf->start) { + buf->end = buf->start; + } + + } else { + buf->start = NULL; + buf->free = NULL; + buf->end = NULL; + + mmap_buf->hdr = NULL; + } + + nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, + (int) m.mmap_msg.chunk_id - (int) first_free_chunk); + + nxt_unit_debug(ctx, "process %d allocated_chunks %d", + mmap_buf->process->pid, + mmap_buf->process->outgoing.allocated_chunks); + + } else { + if (nxt_slow_path(mmap_buf->plain_ptr == NULL + || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) + { + nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer" + ": no space reserved for message header", stream); + + goto free_buf; + } + + memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); + + nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d", + stream, + (int) (sizeof(m.msg) + m.mmap_msg.size)); - buf->end = (char *) first_free; + res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, + buf->start - sizeof(m.msg), + m.mmap_msg.size + sizeof(m.msg), + NULL, 0); + if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { + goto free_buf; + } } - return NXT_UNIT_OK; + rc = NXT_UNIT_OK; + +free_buf: + + nxt_unit_free_outgoing_buf(mmap_buf); + + return rc; } @@ -2005,12 +2123,83 @@ nxt_unit_buf_free(nxt_unit_buf_t *buf) static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) { - if (nxt_fast_path(mmap_buf->hdr != NULL)) { - nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start, + nxt_unit_free_outgoing_buf(mmap_buf); + + nxt_unit_mmap_buf_release(mmap_buf); +} + + +static void +nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) +{ + if (mmap_buf->hdr != NULL) { + nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, + mmap_buf->process, + mmap_buf->hdr, mmap_buf->buf.start, mmap_buf->buf.end - mmap_buf->buf.start); + + mmap_buf->hdr = NULL; + + return; } - nxt_unit_mmap_buf_release(mmap_buf); + if (mmap_buf->free_ptr != NULL) { + free(mmap_buf->free_ptr); + + mmap_buf->free_ptr = NULL; + } +} + + +static nxt_unit_read_buf_t * +nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) +{ + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + return nxt_unit_read_buf_get_impl(ctx_impl); +} + + +static nxt_unit_read_buf_t * +nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_unit_read_buf_t *rbuf; + + if (ctx_impl->free_read_buf != NULL) { + rbuf = ctx_impl->free_read_buf; + ctx_impl->free_read_buf = rbuf->next; + + pthread_mutex_unlock(&ctx_impl->mutex); + + return rbuf; + } + + pthread_mutex_unlock(&ctx_impl->mutex); + + rbuf = malloc(sizeof(nxt_unit_read_buf_t)); + + return rbuf; +} + + +static void +nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, + nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + rbuf->next = ctx_impl->free_read_buf; + ctx_impl->free_read_buf = rbuf; + + pthread_mutex_unlock(&ctx_impl->mutex); } @@ -2047,61 +2236,93 @@ int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, size_t size) { + ssize_t res; + + res = nxt_unit_response_write_nb(req, start, size, size); + + return res < 0 ? -res : NXT_UNIT_OK; +} + + +ssize_t +nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, + size_t size, size_t min_size) +{ int rc; - uint32_t part_size; + ssize_t sent; + uint32_t part_size, min_part_size, buf_size; const char *part_start; nxt_unit_mmap_buf_t mmap_buf; nxt_unit_request_info_impl_t *req_impl; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); part_start = start; + sent = 0; + + if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { + nxt_unit_req_warn(req, "write: response not initialized yet"); + + return -NXT_UNIT_ERROR; + } /* Check if response is not send yet. */ - if (nxt_slow_path(req->response_buf)) { + if (nxt_slow_path(req->response_buf != NULL)) { part_size = req->response_buf->end - req->response_buf->free; part_size = nxt_min(size, part_size); rc = nxt_unit_response_add_content(req, part_start, part_size); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; } rc = nxt_unit_response_send(req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; } size -= part_size; part_start += part_size; + sent += part_size; + + min_size -= nxt_min(min_size, part_size); } while (size > 0) { part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); + min_part_size = nxt_min(min_size, part_size); + min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, &req->response_port, part_size, - &mmap_buf); + min_part_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; } + buf_size = mmap_buf.buf.end - mmap_buf.buf.free; + if (nxt_slow_path(buf_size == 0)) { + return sent; + } + part_size = nxt_min(buf_size, part_size); + mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, part_start, part_size); rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start, - mmap_buf.buf.end - mmap_buf.buf.start); - - return rc; + return -rc; } size -= part_size; part_start += part_size; + sent += part_size; + + min_size -= nxt_min(min_size, part_size); } - return NXT_UNIT_OK; + return sent; } @@ -2109,9 +2330,15 @@ int nxt_unit_response_write_cb(nxt_unit_request_info_t *req, nxt_unit_read_info_t *read_info) { - int rc; - ssize_t n; - nxt_unit_buf_t *buf; + int rc; + ssize_t n; + uint32_t buf_size; + nxt_unit_buf_t *buf; + nxt_unit_mmap_buf_t mmap_buf; + nxt_unit_request_info_impl_t *req_impl; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); /* Check if response is not send yet. */ if (nxt_slow_path(req->response_buf)) { @@ -2159,20 +2386,24 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", read_info->buf_size); - buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size, - PORT_MMAP_DATA_SIZE)); - if (nxt_slow_path(buf == NULL)) { - nxt_unit_req_error(req, "Failed to allocate buf for content"); + buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); - return NXT_UNIT_ERROR; + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, + buf_size, buf_size, + &mmap_buf, local_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } + buf = &mmap_buf.buf; + while (!read_info->eof && buf->end > buf->free) { n = read_info->read(read_info, buf->free, buf->end - buf->free); if (nxt_slow_path(n < 0)) { nxt_unit_req_error(req, "Read error"); - nxt_unit_buf_free(buf); + nxt_unit_free_outgoing_buf(&mmap_buf); return NXT_UNIT_ERROR; } @@ -2180,7 +2411,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, buf->free += n; } - rc = nxt_unit_buf_send(buf); + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_error(req, "Failed to send content"); @@ -2325,12 +2556,17 @@ int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, uint8_t last, const struct iovec *iov, int iovcnt) { - int i, rc; - size_t l, copy; - uint32_t payload_len, buf_size; - const uint8_t *b; - nxt_unit_buf_t *buf; - nxt_websocket_header_t *wh; + int i, rc; + size_t l, copy; + uint32_t payload_len, buf_size, alloc_size; + const uint8_t *b; + nxt_unit_buf_t *buf; + nxt_unit_mmap_buf_t mmap_buf; + nxt_websocket_header_t *wh; + nxt_unit_request_info_impl_t *req_impl; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); payload_len = 0; @@ -2339,18 +2575,23 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, } buf_size = 10 + payload_len; + alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, - PORT_MMAP_DATA_SIZE)); - if (nxt_slow_path(buf == NULL)) { - nxt_unit_req_error(req, "Failed to allocate buf for content"); - - return NXT_UNIT_ERROR; + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, + alloc_size, alloc_size, + &mmap_buf, local_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } + buf = &mmap_buf.buf; + buf->start[0] = 0; buf->start[1] = 0; + buf_size -= buf->end - buf->start; + wh = (void *) buf->free; buf->free = nxt_websocket_frame_init(wh, payload_len); @@ -2370,32 +2611,33 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, l -= copy; if (l > 0) { - buf_size -= buf->end - buf->start; - - rc = nxt_unit_buf_send(buf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_req_error(req, "Failed to send content"); + if (nxt_fast_path(buf->free > buf->start)) { + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, + &mmap_buf, 0); - return NXT_UNIT_ERROR; + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; + } } - buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, - PORT_MMAP_DATA_SIZE)); - if (nxt_slow_path(buf == NULL)) { - nxt_unit_req_error(req, - "Failed to allocate buf for content"); + alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); - return NXT_UNIT_ERROR; + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, + alloc_size, alloc_size, + &mmap_buf, local_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } + + buf_size -= buf->end - buf->start; } } } if (buf->free > buf->start) { - rc = nxt_unit_buf_send(buf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_req_error(req, "Failed to send content"); - } + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, + &mmap_buf, 0); } return rc; @@ -2437,7 +2679,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); - if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) { + if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) { return NXT_UNIT_OK; } @@ -2454,7 +2696,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) ws_impl->buf->buf.free = b; ws_impl->buf->buf.end = b + size; - ws_impl->retain_buf = b; + ws_impl->buf->free_ptr = b; return NXT_UNIT_OK; } @@ -2469,15 +2711,23 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) static nxt_port_mmap_header_t * nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n) + nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n) { int res, nchunks, i; + uint32_t outgoing_size; nxt_unit_mmap_t *mm, *mm_end; + nxt_unit_impl_t *lib; nxt_port_mmap_header_t *hdr; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + pthread_mutex_lock(&process->outgoing.mutex); - mm_end = process->outgoing.elts + process->outgoing.size; +retry: + + outgoing_size = process->outgoing.size; + + mm_end = process->outgoing.elts + outgoing_size; for (mm = process->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; @@ -2491,11 +2741,17 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) { nchunks = 1; - while (nchunks < n) { + while (nchunks < *n) { res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, *c + nchunks); if (res == 0) { + if (nchunks >= min_n) { + *n = nchunks; + + goto unlock; + } + for (i = 0; i < nchunks; i++) { nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i); } @@ -2508,23 +2764,155 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nchunks++; } - if (nchunks == n) { + if (nchunks >= min_n) { + *n = nchunks; + goto unlock; } } + + hdr->oosm = 1; + } + + if (outgoing_size >= lib->shm_mmap_limit) { + /* Cannot allocate more shared memory. */ + pthread_mutex_unlock(&process->outgoing.mutex); + + if (min_n == 0) { + *n = 0; + } + + if (nxt_slow_path(process->outgoing.allocated_chunks + min_n + >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) + { + /* Memory allocated by application, but not send to router. */ + return NULL; + } + + /* Notify router about OOSM condition. */ + + res = nxt_unit_send_oosm(ctx, port_id); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NULL; + } + + /* Return if caller can handle OOSM condition. Non-blocking mode. */ + + if (min_n == 0) { + return NULL; + } + + nxt_unit_debug(ctx, "oosm: waiting for ACK"); + + res = nxt_unit_wait_shm_ack(ctx); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NULL; + } + + nxt_unit_debug(ctx, "oosm: retry"); + + pthread_mutex_lock(&process->outgoing.mutex); + + goto retry; } *c = 0; - hdr = nxt_unit_new_mmap(ctx, process, port_id, n); + hdr = nxt_unit_new_mmap(ctx, process, port_id, *n); unlock: + nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n); + + nxt_unit_debug(ctx, "process %d allocated_chunks %d", + process->pid, + process->outgoing.allocated_chunks); + pthread_mutex_unlock(&process->outgoing.mutex); return hdr; } +static int +nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + msg.stream = 0; + msg.pid = lib->pid; + msg.reply_port = 0; + msg.type = _NXT_PORT_MSG_OOSM; + msg.last = 0; + msg.mmap = 0; + msg.nf = 0; + msg.mf = 0; + msg.tracking = 0; + + res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)", + (int) port_id->pid, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) +{ + nxt_port_msg_t *port_msg; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + while (1) { + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_read_buf(ctx, rbuf); + if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + nxt_unit_read_buf_release(ctx, rbuf); + + return NXT_UNIT_ERROR; + } + + port_msg = (nxt_port_msg_t *) rbuf->buf; + + if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { + nxt_unit_read_buf_release(ctx, rbuf); + + break; + } + + pthread_mutex_lock(&ctx_impl->mutex); + + *ctx_impl->pending_read_tail = rbuf; + ctx_impl->pending_read_tail = &rbuf->next; + rbuf->next = NULL; + + pthread_mutex_unlock(&ctx_impl->mutex); + + if (port_msg->type == _NXT_PORT_MSG_QUIT) { + nxt_unit_debug(ctx, "oosm: quit received"); + + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + static nxt_unit_mmap_t * nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) { @@ -2759,17 +3147,55 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, uint32_t size, - nxt_unit_mmap_buf_t *mmap_buf) + nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size, + nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) { - uint32_t nchunks; + int nchunks, min_nchunks; nxt_chunk_id_t c; nxt_port_mmap_header_t *hdr; + if (size <= NXT_UNIT_MAX_PLAIN_SIZE) { + if (local_buf != NULL) { + mmap_buf->free_ptr = NULL; + mmap_buf->plain_ptr = local_buf; + + } else { + mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t)); + if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { + return NXT_UNIT_ERROR; + } + + mmap_buf->plain_ptr = mmap_buf->free_ptr; + } + + mmap_buf->hdr = NULL; + mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); + mmap_buf->buf.free = mmap_buf->buf.start; + mmap_buf->buf.end = mmap_buf->buf.start + size; + mmap_buf->port_id = *port_id; + mmap_buf->process = process; + + nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", + mmap_buf->buf.start, (int) size); + + return NXT_UNIT_OK; + } + nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; + min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; - hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks); + hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks); if (nxt_slow_path(hdr == NULL)) { + if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { + mmap_buf->hdr = NULL; + mmap_buf->buf.start = NULL; + mmap_buf->buf.free = NULL; + mmap_buf->buf.end = NULL; + mmap_buf->free_ptr = NULL; + + return NXT_UNIT_OK; + } + return NXT_UNIT_ERROR; } @@ -2778,6 +3204,8 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; mmap_buf->port_id = *port_id; + mmap_buf->process = process; + mmap_buf->free_ptr = NULL; nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", (int) hdr->id, (int) c, @@ -2880,6 +3308,7 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) mmaps->size = 0; mmaps->cap = 0; mmaps->elts = NULL; + mmaps->allocated_chunks = 0; } @@ -3020,6 +3449,22 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) incoming_tail = &recv_msg->incoming_buf; + for (; mmap_msg < end; mmap_msg++) { + b = nxt_unit_mmap_buf_get(ctx); + if (nxt_slow_path(b == NULL)) { + nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", + recv_msg->stream); + + return NXT_UNIT_ERROR; + } + + nxt_unit_mmap_buf_insert(incoming_tail, b); + incoming_tail = &b->next; + } + + b = recv_msg->incoming_buf; + mmap_msg = recv_msg->start; + pthread_mutex_lock(&process->incoming.mutex); for (; mmap_msg < end; mmap_msg++) { @@ -3043,25 +3488,13 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->size = size; } - b = nxt_unit_mmap_buf_get(ctx); - if (nxt_slow_path(b == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); - - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", - recv_msg->stream); - - nxt_unit_mmap_release(hdr, start, size); - - return NXT_UNIT_ERROR; - } - - nxt_unit_mmap_buf_insert(incoming_tail, b); - incoming_tail = &b->next; - b->buf.start = start; b->buf.free = start; b->buf.end = b->buf.start + size; b->hdr = hdr; + b->process = process; + + b = b->next; nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", recv_msg->stream, @@ -3077,23 +3510,79 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } -static int -nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size) +static void +nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, + nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, + void *start, uint32_t size) { - u_char *p, *end; - nxt_chunk_id_t c; + int freed_chunks; + u_char *p, *end; + nxt_chunk_id_t c; + nxt_unit_impl_t *lib; memset(start, 0xA5, size); p = start; end = p + size; c = nxt_port_mmap_chunk_id(hdr, p); + freed_chunks = 0; while (p < end) { nxt_port_mmap_set_chunk_free(hdr->free_map, c); p += PORT_MMAP_CHUNK_SIZE; c++; + freed_chunks++; + } + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (hdr->src_pid == lib->pid && freed_chunks != 0) { + nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, + -freed_chunks); + + nxt_unit_debug(ctx, "process %d allocated_chunks %d", + process->pid, + process->outgoing.allocated_chunks); + } + + if (hdr->dst_pid == lib->pid + && freed_chunks != 0 + && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) + { + nxt_unit_send_shm_ack(ctx, hdr->src_pid); + } +} + + +static int +nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_port_id_t port_id; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + nxt_unit_port_id_init(&port_id, pid, 0); + + msg.stream = 0; + msg.pid = lib->pid; + msg.reply_port = 0; + msg.type = _NXT_PORT_MSG_SHM_ACK; + msg.last = 0; + msg.mmap = 0; + msg.nf = 0; + msg.mf = 0; + msg.tracking = 0; + + res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)", + (int) port_id.pid, strerror(errno), errno); + + return NXT_UNIT_ERROR; } return NXT_UNIT_OK; @@ -3255,43 +3744,76 @@ int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { int rc; - char buf[4096]; - char oob[256]; - ssize_t rsize; - nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(oob, 0, sizeof(struct cmsghdr)); + pthread_mutex_lock(&ctx_impl->mutex); + + if (ctx_impl->pending_read_head != NULL) { + rbuf = ctx_impl->pending_read_head; + ctx_impl->pending_read_head = rbuf->next; + + if (ctx_impl->pending_read_tail == &rbuf->next) { + ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; + } + + pthread_mutex_unlock(&ctx_impl->mutex); - if (ctx_impl->read_port_fd != -1) { - rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, - buf, sizeof(buf), - oob, sizeof(oob)); } else { - rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, - buf, sizeof(buf), - oob, sizeof(oob)); + rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_read_buf(ctx, rbuf); } - if (nxt_fast_path(rsize > 0)) { - rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, - oob, sizeof(oob)); + if (nxt_fast_path(rbuf->size > 0)) { + rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, + rbuf->buf, rbuf->size, + rbuf->oob, sizeof(rbuf->oob)); #if (NXT_DEBUG) - memset(buf, 0xAC, rsize); + memset(rbuf->buf, 0xAC, rbuf->size); #endif } else { rc = NXT_UNIT_ERROR; } + nxt_unit_read_buf_release(ctx, rbuf); + return rc; } +static void +nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + if (ctx_impl->read_port_fd != -1) { + rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + + } else { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + } +} + + void nxt_unit_done(nxt_unit_ctx_t *ctx) { @@ -3399,12 +3921,12 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) } nxt_queue_loop; - nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]); - nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]); + nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]); + nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]); while (ctx_impl->free_buf != NULL) { mmap_buf = ctx_impl->free_buf; - nxt_unit_mmap_buf_remove(mmap_buf); + nxt_unit_mmap_buf_unlink(mmap_buf); free(mmap_buf); } |