diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_unit.c | 1159 |
1 files changed, 888 insertions, 271 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 88c7fa6a..28a0de20 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -11,38 +11,60 @@ #include "nxt_unit.h" #include "nxt_unit_request.h" #include "nxt_unit_response.h" +#include "nxt_unit_websocket.h" + +#include "nxt_websocket.h" #if (NXT_HAVE_MEMFD_CREATE) #include <linux/memfd.h> #endif -typedef struct nxt_unit_impl_s nxt_unit_impl_t; -typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; -typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; -typedef struct nxt_unit_process_s nxt_unit_process_t; -typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; -typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; -typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; -typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; -typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; +typedef struct nxt_unit_impl_s nxt_unit_impl_t; +typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; +typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; +typedef struct nxt_unit_process_s nxt_unit_process_t; +typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; +typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; +typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; +typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; +typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; +typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static void nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); +nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, + nxt_unit_mmap_buf_t *mmap_buf); +nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, + nxt_unit_mmap_buf_t *mmap_buf); +nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, uint32_t stream); +static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, + nxt_unit_recv_msg_t *recv_msg); static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( nxt_unit_ctx_t *ctx); static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); +static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( + nxt_unit_ctx_t *ctx); +static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); +static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_unit_mmap_buf_t *mmap_buf, int last); +static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); +static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, + size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n); @@ -65,7 +87,7 @@ static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg, nxt_queue_t *incoming_buf); + nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size); @@ -98,14 +120,22 @@ static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, int remove); +static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, + nxt_unit_request_info_impl_t *req_impl); +static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find( + nxt_lvlhsh_t *request_hash, uint32_t stream, int remove); + static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); struct nxt_unit_mmap_buf_s { nxt_unit_buf_t buf; + nxt_unit_mmap_buf_t *next; + nxt_unit_mmap_buf_t **prev; + nxt_port_mmap_header_t *hdr; - nxt_queue_link_t link; +// nxt_queue_link_t link; nxt_unit_port_id_t port_id; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; @@ -113,12 +143,20 @@ struct nxt_unit_mmap_buf_s { struct nxt_unit_recv_msg_s { - nxt_port_msg_t port_msg; + uint32_t stream; + nxt_pid_t pid; + nxt_port_id_t reply_port; + + uint8_t last; /* 1 bit */ + uint8_t mmap; /* 1 bit */ void *start; uint32_t size; + int fd; nxt_unit_process_t *process; + + nxt_unit_mmap_buf_t *incoming_buf; }; @@ -127,18 +165,22 @@ typedef enum { NXT_UNIT_RS_RESPONSE_INIT, NXT_UNIT_RS_RESPONSE_HAS_CONTENT, NXT_UNIT_RS_RESPONSE_SENT, - NXT_UNIT_RS_DONE, + NXT_UNIT_RS_RELEASED, } nxt_unit_req_state_t; struct nxt_unit_request_info_impl_s { nxt_unit_request_info_t req; - nxt_unit_recv_msg_t recv_msg; - nxt_queue_t outgoing_buf; /* of nxt_unit_mmap_buf_t */ - nxt_queue_t incoming_buf; /* of nxt_unit_mmap_buf_t */ + uint32_t stream; + + nxt_unit_process_t *process; + + nxt_unit_mmap_buf_t *outgoing_buf; + nxt_unit_mmap_buf_t *incoming_buf; nxt_unit_req_state_t state; + uint8_t websocket; nxt_queue_link_t link; @@ -146,6 +188,19 @@ struct nxt_unit_request_info_impl_s { }; +struct nxt_unit_websocket_frame_impl_s { + nxt_unit_websocket_frame_t ws; + + nxt_unit_mmap_buf_t *buf; + + nxt_queue_link_t link; + + nxt_unit_ctx_impl_t *ctx_impl; + + void *retain_buf; +}; + + struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; @@ -154,14 +209,20 @@ struct nxt_unit_ctx_impl_s { nxt_queue_link_t link; - nxt_queue_t free_buf; /* of nxt_unit_mmap_buf_t */ + nxt_unit_mmap_buf_t *free_buf; /* of nxt_unit_request_info_impl_t */ nxt_queue_t free_req; + /* of nxt_unit_websocket_frame_impl_t */ + nxt_queue_t free_ws; + /* of nxt_unit_request_info_impl_t */ nxt_queue_t active_req; + /* of nxt_unit_request_info_impl_t */ + nxt_lvlhsh_t requests; + nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_request_info_impl_t req; @@ -394,18 +455,65 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); - nxt_queue_init(&ctx_impl->free_buf); nxt_queue_init(&ctx_impl->free_req); + nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); - nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0].link); - nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1].link); + ctx_impl->free_buf = NULL; + nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); + nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); + nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; ctx_impl->read_port_fd = -1; + ctx_impl->requests.slot = 0; +} + + +nxt_inline void +nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, + nxt_unit_mmap_buf_t *mmap_buf) +{ + mmap_buf->next = *head; + + if (mmap_buf->next != NULL) { + mmap_buf->next->prev = &mmap_buf->next; + } + + *head = mmap_buf; + mmap_buf->prev = head; +} + + +nxt_inline void +nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, + nxt_unit_mmap_buf_t *mmap_buf) +{ + while (*prev != NULL) { + prev = &(*prev)->next; + } + + nxt_unit_mmap_buf_insert(prev, mmap_buf); +} + + +nxt_inline void +nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf) +{ + nxt_unit_mmap_buf_t **prev; + + prev = mmap_buf->prev; + + if (mmap_buf->next != NULL) { + mmap_buf->next->prev = prev; + } + + if (prev != NULL) { + *prev = mmap_buf->next; + } } @@ -509,26 +617,18 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, void *buf, size_t buf_size, void *oob, size_t oob_size) { - int fd, rc, nb; - pid_t pid; - nxt_queue_t incoming_buf; - struct cmsghdr *cm; - nxt_port_msg_t *port_msg; - nxt_unit_impl_t *lib; - nxt_unit_port_t new_port; - nxt_queue_link_t *lnk; - nxt_unit_request_t *r; - nxt_unit_mmap_buf_t *b; - nxt_unit_recv_msg_t recv_msg; - nxt_unit_callbacks_t *cb; - nxt_port_msg_new_port_t *new_port_msg; - nxt_unit_request_info_t *req; - nxt_unit_request_info_impl_t *req_impl; + int rc; + pid_t pid; + struct cmsghdr *cm; + nxt_port_msg_t *port_msg; + nxt_unit_impl_t *lib; + nxt_unit_recv_msg_t recv_msg; + nxt_unit_callbacks_t *cb; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_ERROR; - fd = -1; + recv_msg.fd = -1; recv_msg.process = NULL; port_msg = buf; cm = oob; @@ -538,17 +638,22 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) { - memcpy(&fd, CMSG_DATA(cm), sizeof(int)); + memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); } - nxt_queue_init(&incoming_buf); + recv_msg.incoming_buf = NULL; if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); goto fail; } - recv_msg.port_msg = *port_msg; + recv_msg.stream = port_msg->stream; + recv_msg.pid = port_msg->pid; + recv_msg.reply_port = port_msg->reply_port; + recv_msg.last = port_msg->last; + recv_msg.mmap = port_msg->mmap; + recv_msg.start = port_msg + 1; recv_msg.size = buf_size - sizeof(nxt_port_msg_t); @@ -572,7 +677,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, } if (port_msg->mmap) { - if (nxt_unit_mmap_read(ctx, &recv_msg, &incoming_buf) != NXT_UNIT_OK) { + if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { goto fail; } } @@ -589,187 +694,326 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, break; case _NXT_PORT_MSG_NEW_PORT: - if (nxt_slow_path(recv_msg.size != sizeof(nxt_port_msg_new_port_t))) { - nxt_unit_warn(ctx, "#%"PRIu32": new_port: " - "invalid message size (%d)", - port_msg->stream, (int) recv_msg.size); + rc = nxt_unit_process_new_port(ctx, &recv_msg); + break; - goto fail; - } + case _NXT_PORT_MSG_CHANGE_FILE: + nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", + port_msg->stream, recv_msg.fd); + break; - if (nxt_slow_path(fd < 0)) { - nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", - port_msg->stream, fd); + case _NXT_PORT_MSG_MMAP: + if (nxt_slow_path(recv_msg.fd < 0)) { + nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", + port_msg->stream, recv_msg.fd); goto fail; } - new_port_msg = recv_msg.start; + rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); + break; - nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", - port_msg->stream, (int) new_port_msg->pid, - (int) new_port_msg->id, fd); + case _NXT_PORT_MSG_REQ_HEADERS: + rc = nxt_unit_process_req_headers(ctx, &recv_msg); + break; - nb = 0; + case _NXT_PORT_MSG_WEBSOCKET: + rc = nxt_unit_process_websocket(ctx, &recv_msg); + break; - if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) { - nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " - "failed: %s (%d)", fd, strerror(errno), errno); + case _NXT_PORT_MSG_REMOVE_PID: + if (nxt_slow_path(recv_msg.size != sizeof(pid))) { + nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " + "(%d != %d)", port_msg->stream, (int) recv_msg.size, + (int) sizeof(pid)); goto fail; } - nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, - new_port_msg->id); + memcpy(&pid, recv_msg.start, sizeof(pid)); - new_port.in_fd = -1; - new_port.out_fd = fd; - new_port.data = NULL; + nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", + port_msg->stream, (int) pid); - fd = -1; + cb->remove_pid(ctx, pid); - rc = cb->add_port(ctx, &new_port); + rc = NXT_UNIT_OK; break; - case _NXT_PORT_MSG_CHANGE_FILE: - nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", - port_msg->stream, fd); - break; + default: + nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", + port_msg->stream, (int) port_msg->type); - case _NXT_PORT_MSG_MMAP: - if (nxt_slow_path(fd < 0)) { - nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", - port_msg->stream, fd); + goto fail; + } - goto fail; - } +fail: - rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, fd); - break; + if (recv_msg.fd != -1) { + close(recv_msg.fd); + } - case _NXT_PORT_MSG_DATA: - if (nxt_slow_path(port_msg->mmap == 0)) { - nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", - port_msg->stream); + while (recv_msg.incoming_buf != NULL) { + nxt_unit_mmap_buf_free(recv_msg.incoming_buf); + } - goto fail; - } + if (recv_msg.process != NULL) { + nxt_unit_process_use(ctx, recv_msg.process, -1); + } - if (nxt_slow_path(recv_msg.size < sizeof(nxt_unit_request_t))) { - nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " - "%d expected", port_msg->stream, (int) recv_msg.size, - (int) sizeof(nxt_unit_request_t)); + return rc; +} - goto fail; - } - req_impl = nxt_unit_request_info_get(ctx); - if (nxt_slow_path(req_impl == NULL)) { - nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", - port_msg->stream); +static int +nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + int nb; + nxt_unit_impl_t *lib; + nxt_unit_port_t new_port; + nxt_port_msg_new_port_t *new_port_msg; - goto fail; - } + if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { + nxt_unit_warn(ctx, "#%"PRIu32": new_port: " + "invalid message size (%d)", + recv_msg->stream, (int) recv_msg->size); - req = &req_impl->req; + return NXT_UNIT_ERROR; + } - req->request_port = *port_id; + if (nxt_slow_path(recv_msg->fd < 0)) { + nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", + recv_msg->stream, recv_msg->fd); - nxt_unit_port_id_init(&req->response_port, port_msg->pid, - port_msg->reply_port); + return NXT_UNIT_ERROR; + } - req->request = recv_msg.start; + new_port_msg = recv_msg->start; - lnk = nxt_queue_first(&incoming_buf); - b = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); + nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d", + recv_msg->stream, (int) new_port_msg->pid, + (int) new_port_msg->id, recv_msg->fd); - req->request_buf = &b->buf; - req->response = NULL; - req->response_buf = NULL; + nb = 0; - r = req->request; + if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " + "failed: %s (%d)", recv_msg->fd, strerror(errno), errno); - req->content_length = r->content_length; + return NXT_UNIT_ERROR; + } - req->content_buf = req->request_buf; - req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); + nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, + new_port_msg->id); - /* Move process to req_impl. */ - req_impl->recv_msg = recv_msg; + new_port.in_fd = -1; + new_port.out_fd = recv_msg->fd; + new_port.data = NULL; - recv_msg.process = NULL; + recv_msg->fd = -1; - nxt_queue_init(&req_impl->outgoing_buf); - nxt_queue_init(&req_impl->incoming_buf); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link) - { - b->req = req; - } nxt_queue_loop; + return lib->callbacks.add_port(ctx, &new_port); +} - nxt_queue_add(&req_impl->incoming_buf, &incoming_buf); - nxt_queue_init(&incoming_buf); - req->response_max_fields = 0; - req_impl->state = NXT_UNIT_RS_START; +static int +nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + nxt_unit_impl_t *lib; + nxt_unit_request_t *r; + nxt_unit_mmap_buf_t *b; + nxt_unit_request_info_t *req; + nxt_unit_request_info_impl_t *req_impl; - nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", port_msg->stream, - (int) r->method_length, nxt_unit_sptr_get(&r->method), - (int) r->target_length, nxt_unit_sptr_get(&r->target), - (int) r->content_length); + if (nxt_slow_path(recv_msg->mmap == 0)) { + nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", + recv_msg->stream); - cb->request_handler(req); + return NXT_UNIT_ERROR; + } - rc = NXT_UNIT_OK; - break; + if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) { + nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " + "%d expected", recv_msg->stream, (int) recv_msg->size, + (int) sizeof(nxt_unit_request_t)); - case _NXT_PORT_MSG_REMOVE_PID: - if (nxt_slow_path(recv_msg.size != sizeof(pid))) { - nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size " - "(%d != %d)", port_msg->stream, (int) recv_msg.size, - (int) sizeof(pid)); + return NXT_UNIT_ERROR; + } - goto fail; - } + req_impl = nxt_unit_request_info_get(ctx); + if (nxt_slow_path(req_impl == NULL)) { + nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", + recv_msg->stream); - memcpy(&pid, recv_msg.start, sizeof(pid)); + return NXT_UNIT_ERROR; + } - nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", - port_msg->stream, (int) pid); + req = &req_impl->req; - cb->remove_pid(ctx, pid); + nxt_unit_port_id_init(&req->response_port, recv_msg->pid, + recv_msg->reply_port); - rc = NXT_UNIT_OK; - break; + req->request = recv_msg->start; - default: - nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", - port_msg->stream, (int) port_msg->type); + b = recv_msg->incoming_buf; - goto fail; + req->request_buf = &b->buf; + req->response = NULL; + req->response_buf = NULL; + + r = req->request; + + req->content_length = r->content_length; + + req->content_buf = req->request_buf; + req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); + + /* "Move" process reference to req_impl. */ + req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg); + if (nxt_slow_path(req_impl->process == NULL)) { + return NXT_UNIT_ERROR; } -fail: + recv_msg->process = NULL; - if (fd != -1) { - close(fd); + req_impl->stream = recv_msg->stream; + + req_impl->outgoing_buf = NULL; + + for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { + b->req = req; } - if (port_msg->mmap) { - nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link) - { - nxt_unit_mmap_release(b->hdr, b->buf.start, - b->buf.end - b->buf.start); + /* "Move" incoming buffer list to req_impl. */ + req_impl->incoming_buf = recv_msg->incoming_buf; + req_impl->incoming_buf->prev = &req_impl->incoming_buf; + recv_msg->incoming_buf = NULL; + + req->response_max_fields = 0; + req_impl->state = NXT_UNIT_RS_START; + req_impl->websocket = 0; + + nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, + (int) r->method_length, nxt_unit_sptr_get(&r->method), + (int) r->target_length, nxt_unit_sptr_get(&r->target), + (int) r->content_length); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(req); - nxt_unit_mmap_buf_release(b); - } nxt_queue_loop; + return NXT_UNIT_OK; +} + + +static int +nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +{ + size_t hsize; + nxt_unit_impl_t *lib; + nxt_unit_mmap_buf_t *b; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_callbacks_t *cb; + nxt_unit_request_info_t *req; + nxt_unit_request_info_impl_t *req_impl; + nxt_unit_websocket_frame_impl_t *ws_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream, + recv_msg->last); + if (req_impl == NULL) { + return NXT_UNIT_OK; } - if (recv_msg.process != NULL) { - nxt_unit_process_use(ctx, recv_msg.process, -1); + req = &req_impl->req; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + cb = &lib->callbacks; + + if (cb->websocket_handler && recv_msg->size >= 2) { + ws_impl = nxt_unit_websocket_frame_get(ctx); + if (nxt_slow_path(ws_impl == NULL)) { + nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", + req_impl->stream); + + return NXT_UNIT_ERROR; + } + + ws_impl->ws.req = req; + + ws_impl->buf = NULL; + ws_impl->retain_buf = NULL; + + if (recv_msg->mmap) { + for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { + b->req = req; + } + + /* "Move" incoming buffer list to ws_impl. */ + ws_impl->buf = recv_msg->incoming_buf; + ws_impl->buf->prev = &ws_impl->buf; + recv_msg->incoming_buf = NULL; + + b = ws_impl->buf; + + } else { + b = nxt_unit_mmap_buf_get(ctx); + if (nxt_slow_path(b == NULL)) { + return NXT_UNIT_ERROR; + } + + b->hdr = NULL; + b->req = req; + b->buf.start = recv_msg->start; + b->buf.free = b->buf.start; + b->buf.end = b->buf.start + recv_msg->size; + + nxt_unit_mmap_buf_insert(&ws_impl->buf, b); + } + + ws_impl->ws.header = (void *) b->buf.start; + ws_impl->ws.payload_len = nxt_websocket_frame_payload_len( + ws_impl->ws.header); + + hsize = nxt_websocket_frame_header_size(ws_impl->ws.header); + + if (ws_impl->ws.header->mask) { + ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4; + + } else { + ws_impl->ws.mask = NULL; + } + + b->buf.free += hsize; + + ws_impl->ws.content_buf = &b->buf; + ws_impl->ws.content_length = ws_impl->ws.payload_len; + + nxt_unit_req_debug(req, "websocket_handler: opcode=%d, " + "payload_len=%"PRIu64, + ws_impl->ws.header->opcode, + ws_impl->ws.payload_len); + + cb->websocket_handler(&ws_impl->ws); } - return rc; + if (recv_msg->last) { + req_impl->websocket = 0; + + if (cb->close_handler) { + nxt_unit_req_debug(req, "close_handler"); + + cb->close_handler(req); + + } else { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + } + } + + return NXT_UNIT_OK; } @@ -815,9 +1059,7 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) static void nxt_unit_request_info_release(nxt_unit_request_info_t *req) { - nxt_unit_mmap_buf_t *b; nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_recv_msg_t *recv_msg; nxt_unit_request_info_impl_t *req_impl; ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); @@ -826,30 +1068,31 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) req->response = NULL; req->response_buf = NULL; - recv_msg = &req_impl->recv_msg; + if (req_impl->process != NULL) { + nxt_unit_process_use(req->ctx, req_impl->process, -1); - if (recv_msg->process != NULL) { - nxt_unit_process_use(req->ctx, recv_msg->process, -1); - - recv_msg->process = NULL; + req_impl->process = NULL; } - nxt_queue_each(b, &req_impl->outgoing_buf, nxt_unit_mmap_buf_t, link) { - - nxt_unit_buf_free(&b->buf); + if (req_impl->websocket) { + nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1); - } nxt_queue_loop; - - nxt_queue_each(b, &req_impl->incoming_buf, nxt_unit_mmap_buf_t, link) { + req_impl->websocket = 0; + } - nxt_unit_mmap_release(b->hdr, b->buf.start, b->buf.end - b->buf.start); - nxt_unit_mmap_buf_release(b); + while (req_impl->outgoing_buf != NULL) { + nxt_unit_mmap_buf_free(req_impl->outgoing_buf); + } - } nxt_queue_loop; + while (req_impl->incoming_buf != NULL) { + nxt_unit_mmap_buf_free(req_impl->incoming_buf); + } nxt_queue_remove(&req_impl->link); nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); + + req_impl->state = NXT_UNIT_RS_RELEASED; } @@ -868,6 +1111,68 @@ nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) } +static nxt_unit_websocket_frame_impl_t * +nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) +{ + nxt_queue_link_t *lnk; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_websocket_frame_impl_t *ws_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + if (nxt_queue_is_empty(&ctx_impl->free_ws)) { + ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); + if (nxt_slow_path(ws_impl == NULL)) { + nxt_unit_warn(ctx, "websocket frame allocation failed"); + + return NULL; + } + + } else { + lnk = nxt_queue_first(&ctx_impl->free_ws); + nxt_queue_remove(lnk); + + ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); + } + + ws_impl->ctx_impl = ctx_impl; + + return ws_impl; +} + + +static void +nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) +{ + nxt_unit_websocket_frame_impl_t *ws_impl; + + ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); + + while (ws_impl->buf != NULL) { + nxt_unit_mmap_buf_free(ws_impl->buf); + } + + ws->req = NULL; + + if (ws_impl->retain_buf != NULL) { + free(ws_impl->retain_buf); + + ws_impl->retain_buf = NULL; + } + + nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); +} + + +static void +nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl) +{ + nxt_queue_remove(&ws_impl->link); + + free(ws_impl); +} + + uint16_t nxt_unit_field_hash(const char *name, size_t name_length) { @@ -1275,6 +1580,10 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) return NXT_UNIT_ERROR; } + if (req->request->websocket_handshake && req->response->status == 101) { + nxt_unit_response_upgrade(req); + } + nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", req->response->fields_count, (int) (req->response_buf->free @@ -1282,9 +1591,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); - rc = nxt_unit_mmap_buf_send(req->ctx, - req_impl->recv_msg.port_msg.stream, - mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); if (nxt_fast_path(rc == NXT_UNIT_OK)) { req->response = NULL; req->response_buf = NULL; @@ -1312,7 +1619,6 @@ nxt_unit_buf_t * nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) { int rc; - nxt_unit_process_t *process; nxt_unit_mmap_buf_t *mmap_buf; nxt_unit_request_info_impl_t *req_impl; @@ -1327,11 +1633,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); - process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg); - if (nxt_slow_path(process == NULL)) { - return NULL; - } - mmap_buf = nxt_unit_mmap_buf_get(req->ctx); if (nxt_slow_path(mmap_buf == NULL)) { return NULL; @@ -1339,10 +1640,10 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) mmap_buf->req = req; - nxt_queue_insert_tail(&req_impl->outgoing_buf, &mmap_buf->link); + nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); - rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port, - size, mmap_buf); + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, size, mmap_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -1366,13 +1667,13 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) pthread_mutex_lock(&lib->mutex); - recv_msg->process = nxt_unit_process_find(ctx, recv_msg->port_msg.pid, 0); + recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0); pthread_mutex_unlock(&lib->mutex); if (recv_msg->process == NULL) { nxt_unit_warn(ctx, "#%"PRIu32": process %d not found", - recv_msg->port_msg.stream, (int) recv_msg->port_msg.pid); + recv_msg->stream, (int) recv_msg->pid); } return recv_msg->process; @@ -1382,23 +1683,21 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) static nxt_unit_mmap_buf_t * nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) { - nxt_queue_link_t *lnk; nxt_unit_mmap_buf_t *mmap_buf; nxt_unit_ctx_impl_t *ctx_impl; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (nxt_queue_is_empty(&ctx_impl->free_buf)) { + if (ctx_impl->free_buf == NULL) { mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); if (nxt_slow_path(mmap_buf == NULL)) { nxt_unit_warn(ctx, "failed to allocate buf"); } } else { - lnk = nxt_queue_first(&ctx_impl->free_buf); - nxt_queue_remove(lnk); + mmap_buf = ctx_impl->free_buf; - mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); + nxt_unit_mmap_buf_remove(mmap_buf); } mmap_buf->ctx_impl = ctx_impl; @@ -1410,9 +1709,91 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) { - nxt_queue_remove(&mmap_buf->link); + nxt_unit_mmap_buf_remove(mmap_buf); + + nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); +} + + +typedef struct { + size_t len; + const char *str; +} nxt_unit_str_t; + + +#define nxt_unit_str(str) { nxt_length(str), str } + + +int +nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) +{ + return req->request->websocket_handshake; +} + + +int +nxt_unit_response_upgrade(nxt_unit_request_info_t *req) +{ + int rc; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + if (nxt_slow_path(req_impl->websocket != 0)) { + nxt_unit_req_debug(req, "upgrade: already upgraded"); + + return NXT_UNIT_OK; + } + + if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { + nxt_unit_req_warn(req, "upgrade: response is not initialized yet"); + + return NXT_UNIT_ERROR; + } + + if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { + nxt_unit_req_warn(req, "upgrade: response already sent"); + + return NXT_UNIT_ERROR; + } + + ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); + + rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); + + return NXT_UNIT_ERROR; + } + + req_impl->websocket = 1; + + req->response->status = 101; + + return NXT_UNIT_OK; +} + + +int +nxt_unit_response_is_websocket(nxt_unit_request_info_t *req) +{ + nxt_unit_request_info_impl_t *req_impl; - nxt_queue_insert_tail(&mmap_buf->ctx_impl->free_buf, &mmap_buf->link); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + return req_impl->websocket; +} + + +nxt_unit_request_info_t * +nxt_unit_get_request_info_from_data(void *data) +{ + nxt_unit_request_info_impl_t *req_impl; + + req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data); + + return &req_impl->req; } @@ -1445,9 +1826,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) } if (nxt_fast_path(buf->free > buf->start)) { - rc = nxt_unit_mmap_buf_send(req->ctx, - req_impl->recv_msg.port_msg.stream, - mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } @@ -1472,10 +1851,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf) req = mmap_buf->req; req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); - rc = nxt_unit_mmap_buf_send(req->ctx, - req_impl->recv_msg.port_msg.stream, - mmap_buf, 1); - + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); if (nxt_slow_path(rc == NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -1506,6 +1882,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); buf = &mmap_buf->buf; + hdr = mmap_buf->hdr; m.mmap_msg.size = buf->free - buf->start; @@ -1514,15 +1891,15 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.msg.reply_port = 0; m.msg.type = _NXT_PORT_MSG_DATA; m.msg.last = last != 0; - m.msg.mmap = m.mmap_msg.size > 0; + m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; m.msg.nf = 0; m.msg.mf = 0; m.msg.tracking = 0; - hdr = mmap_buf->hdr; - - m.mmap_msg.mmap_id = hdr->id; - m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); + if (hdr != NULL) { + m.mmap_msg.mmap_id = hdr->id; + m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); + } nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", stream, @@ -1531,14 +1908,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, (int) m.mmap_msg.size); res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, - m.mmap_msg.size > 0 ? sizeof(m) - : sizeof(m.msg), + m.msg.mmap ? sizeof(m) : sizeof(m.msg), NULL, 0); if (nxt_slow_path(res != sizeof(m))) { return NXT_UNIT_ERROR; } - if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { + if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) { last_used = (u_char *) buf->free - 1; first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; @@ -1557,11 +1933,17 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, void nxt_unit_buf_free(nxt_unit_buf_t *buf) { - nxt_unit_mmap_buf_t *mmap_buf; + nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); +} - mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); - nxt_unit_mmap_release(mmap_buf->hdr, buf->start, buf->end - buf->start); +static void +nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) +{ + if (nxt_fast_path(mmap_buf->hdr != NULL)) { + nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start, + mmap_buf->buf.end - mmap_buf->buf.start); + } nxt_unit_mmap_buf_release(mmap_buf); } @@ -1570,26 +1952,15 @@ nxt_unit_buf_free(nxt_unit_buf_t *buf) nxt_unit_buf_t * nxt_unit_buf_next(nxt_unit_buf_t *buf) { - nxt_queue_link_t *lnk; - nxt_unit_mmap_buf_t *mmap_buf; - nxt_unit_request_info_impl_t *req_impl; + nxt_unit_mmap_buf_t *mmap_buf; mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); - req_impl = nxt_container_of(mmap_buf->req, nxt_unit_request_info_impl_t, - req); - lnk = &mmap_buf->link; - - if (lnk == nxt_queue_last(&req_impl->incoming_buf) - || lnk == nxt_queue_last(&req_impl->outgoing_buf)) - { + if (mmap_buf->next == NULL) { return NULL; } - lnk = nxt_queue_next(lnk); - mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link); - - return &mmap_buf->buf; + return &mmap_buf->next->buf; } @@ -1614,7 +1985,6 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, int rc; uint32_t part_size; const char *part_start; - nxt_unit_process_t *process; nxt_unit_mmap_buf_t mmap_buf; nxt_unit_request_info_impl_t *req_impl; @@ -1641,16 +2011,12 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, part_start += part_size; } - process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg); - if (nxt_slow_path(process == NULL)) { - return NXT_UNIT_ERROR; - } - while (size > 0) { part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); - rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port, - part_size, &mmap_buf); + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, part_size, + &mmap_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } @@ -1658,9 +2024,7 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, part_start, part_size); - rc = nxt_unit_mmap_buf_send(req->ctx, - req_impl->recv_msg.port_msg.stream, - &mmap_buf, 0); + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start, mmap_buf.buf.end - mmap_buf.buf.start); @@ -1766,6 +2130,14 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) { + return nxt_unit_buf_read(&req->content_buf, &req->content_length, + dst, size); +} + + +static ssize_t +nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size) +{ u_char *p; size_t rest, copy, read; nxt_unit_buf_t *buf; @@ -1773,7 +2145,7 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) p = dst; rest = size; - buf = req->content_buf; + buf = *b; while (buf != NULL) { copy = buf->end - buf->free; @@ -1795,11 +2167,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) buf = nxt_unit_buf_next(buf); } - req->content_buf = buf; + *b = buf; read = size - rest; - req->content_length -= read; + *len -= read; return read; } @@ -1852,7 +2224,7 @@ skip_response_send: lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit); - msg.stream = req_impl->recv_msg.port_msg.stream; + msg.stream = req_impl->stream; msg.pid = lib->pid; msg.reply_port = 0; msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA @@ -1874,6 +2246,162 @@ skip_response_send: } +int +nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, + uint8_t last, const void *start, size_t size) +{ + const struct iovec iov = { (void *) start, size }; + + return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1); +} + + +int +nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, + uint8_t last, const struct iovec *iov, int iovcnt) +{ + int i, rc; + size_t l, copy; + uint32_t payload_len, buf_size; + const uint8_t *b; + nxt_unit_buf_t *buf; + nxt_websocket_header_t *wh; + + payload_len = 0; + + for (i = 0; i < iovcnt; i++) { + payload_len += iov[i].iov_len; + } + + buf_size = 10 + payload_len; + + buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, + PORT_MMAP_DATA_SIZE)); + if (nxt_slow_path(buf == NULL)) { + nxt_unit_req_error(req, "Failed to allocate buf for content"); + + return NXT_UNIT_ERROR; + } + + buf->start[0] = 0; + buf->start[1] = 0; + + wh = (void *) buf->free; + + buf->free = nxt_websocket_frame_init(wh, payload_len); + wh->fin = last; + wh->opcode = opcode; + + for (i = 0; i < iovcnt; i++) { + b = iov[i].iov_base; + l = iov[i].iov_len; + + while (l > 0) { + copy = buf->end - buf->free; + copy = nxt_min(l, copy); + + buf->free = nxt_cpymem(buf->free, b, copy); + b += copy; + l -= copy; + + if (l > 0) { + buf_size -= buf->end - buf->start; + + rc = nxt_unit_buf_send(buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_error(req, "Failed to send content"); + + return NXT_UNIT_ERROR; + } + + buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, + PORT_MMAP_DATA_SIZE)); + if (nxt_slow_path(buf == NULL)) { + nxt_unit_req_error(req, + "Failed to allocate buf for content"); + + return NXT_UNIT_ERROR; + } + } + } + } + + if (buf->free > buf->start) { + rc = nxt_unit_buf_send(buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_req_error(req, "Failed to send content"); + } + } + + return rc; +} + + +ssize_t +nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, + size_t size) +{ + ssize_t res; + uint8_t *b; + uint64_t i, d; + + res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length, + dst, size); + + if (ws->mask == NULL) { + return res; + } + + b = dst; + d = (ws->payload_len - ws->content_length - res) % 4; + + for (i = 0; i < (uint64_t) res; i++) { + b[i] ^= ws->mask[ (i + d) % 4 ]; + } + + return res; +} + + +int +nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) +{ + char *b; + size_t size; + nxt_unit_websocket_frame_impl_t *ws_impl; + + ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); + + if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) { + return NXT_UNIT_OK; + } + + size = ws_impl->buf->buf.end - ws_impl->buf->buf.start; + + b = malloc(size); + if (nxt_slow_path(b == NULL)) { + return NXT_UNIT_ERROR; + } + + memcpy(b, ws_impl->buf->buf.start, size); + + ws_impl->buf->buf.start = b; + ws_impl->buf->buf.free = b; + ws_impl->buf->buf.end = b + size; + + ws_impl->retain_buf = b; + + return NXT_UNIT_OK; +} + + +void +nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) +{ + nxt_unit_websocket_frame_release(ws); +} + + static nxt_port_mmap_header_t * nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n) @@ -2355,7 +2883,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", - recv_msg->port_msg.stream, (int) recv_msg->size); + recv_msg->stream, (int) recv_msg->size); return 0; } @@ -2378,18 +2906,18 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " "invalid mmap id %d,%"PRIu32, - recv_msg->port_msg.stream, - (int) process->pid, tracking_msg->mmap_id); + recv_msg->stream, (int) process->pid, + tracking_msg->mmap_id); return 0; } c = tracking_msg->tracking_id; - rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->port_msg.stream, 0); + rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); if (rc == 0) { nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", - recv_msg->port_msg.stream); + recv_msg->stream); nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); } @@ -2401,19 +2929,18 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) static int -nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, - nxt_queue_t *incoming_buf) +nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { void *start; uint32_t size; nxt_unit_process_t *process; - nxt_unit_mmap_buf_t *b; + nxt_unit_mmap_buf_t *b, **incoming_tail; nxt_port_mmap_msg_t *mmap_msg, *end; nxt_port_mmap_header_t *hdr; if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) { nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)", - recv_msg->port_msg.stream, (int) recv_msg->size); + recv_msg->stream, (int) recv_msg->size); return NXT_UNIT_ERROR; } @@ -2426,6 +2953,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, mmap_msg = recv_msg->start; end = nxt_pointer_to(recv_msg->start, recv_msg->size); + incoming_tail = &recv_msg->incoming_buf; + pthread_mutex_lock(&process->incoming.mutex); for (; mmap_msg < end; mmap_msg++) { @@ -2435,8 +2964,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " "invalid mmap id %d,%"PRIu32, - recv_msg->port_msg.stream, - (int) process->pid, mmap_msg->mmap_id); + recv_msg->stream, (int) process->pid, + mmap_msg->mmap_id); return NXT_UNIT_ERROR; } @@ -2453,16 +2982,16 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, if (nxt_slow_path(b == NULL)) { pthread_mutex_unlock(&process->incoming.mutex); - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " - "failed to allocate buf", - recv_msg->port_msg.stream); + nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", + recv_msg->stream); nxt_unit_mmap_release(hdr, start, size); return NXT_UNIT_ERROR; } - nxt_queue_insert_tail(incoming_buf, &b->link); + nxt_unit_mmap_buf_insert(incoming_tail, b); + incoming_tail = &b->next; b->buf.start = start; b->buf.free = start; @@ -2470,7 +2999,7 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, b->hdr = hdr; nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", - recv_msg->port_msg.stream, + recv_msg->stream, start, (int) size, (int) hdr->src_pid, (int) hdr->dst_pid, (int) hdr->id, (int) mmap_msg->chunk_id, @@ -2685,6 +3214,11 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) if (nxt_fast_path(rsize > 0)) { rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, oob, sizeof(oob)); + +#if (NXT_DEBUG) + memset(buf, 0xAC, rsize); +#endif + } else { rc = NXT_UNIT_ERROR; } @@ -2775,10 +3309,11 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) void nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) { - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_mmap_buf_t *mmap_buf; - nxt_unit_request_info_impl_t *req_impl; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_mmap_buf_t *mmap_buf; + nxt_unit_request_info_impl_t *req_impl; + nxt_unit_websocket_frame_impl_t *ws_impl; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -2792,15 +3327,14 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) } nxt_queue_loop; - nxt_queue_remove(&ctx_impl->ctx_buf[0].link); - nxt_queue_remove(&ctx_impl->ctx_buf[1].link); - - nxt_queue_each(mmap_buf, &ctx_impl->free_buf, nxt_unit_mmap_buf_t, link) { + nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]); + nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]); - nxt_queue_remove(&mmap_buf->link); + while (ctx_impl->free_buf != NULL) { + mmap_buf = ctx_impl->free_buf; + nxt_unit_mmap_buf_remove(mmap_buf); free(mmap_buf); - - } nxt_queue_loop; + } nxt_queue_each(req_impl, &ctx_impl->free_req, nxt_unit_request_info_impl_t, link) @@ -2809,6 +3343,13 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) } nxt_queue_loop; + nxt_queue_each(ws_impl, &ctx_impl->free_ws, + nxt_unit_websocket_frame_impl_t, link) + { + nxt_unit_websocket_frame_free(ws_impl); + + } nxt_queue_loop; + nxt_queue_remove(&ctx_impl->link); if (ctx_impl != &lib->main_ctx) { @@ -3454,6 +3995,83 @@ nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id, } +static nxt_int_t +nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + return NXT_OK; +} + + +static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_unit_request_hash_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +static int +nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash, + nxt_unit_request_info_impl_t *req_impl) +{ + uint32_t *stream; + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + + stream = &req_impl->stream; + + lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); + lhq.key.length = sizeof(*stream); + lhq.key.start = (u_char *) stream; + lhq.proto = &lvlhsh_requests_proto; + lhq.pool = NULL; + lhq.replace = 0; + lhq.value = req_impl; + + res = nxt_lvlhsh_insert(request_hash, &lhq); + + switch (res) { + + case NXT_OK: + return NXT_UNIT_OK; + + default: + return NXT_UNIT_ERROR; + } +} + + +static nxt_unit_request_info_impl_t * +nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream, + int remove) +{ + nxt_int_t res; + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream)); + lhq.key.length = sizeof(stream); + lhq.key.start = (u_char *) &stream; + lhq.proto = &lvlhsh_requests_proto; + lhq.pool = NULL; + + if (remove) { + res = nxt_lvlhsh_delete(request_hash, &lhq); + + } else { + res = nxt_lvlhsh_find(request_hash, &lhq); + } + + switch (res) { + + case NXT_OK: + return lhq.value; + + default: + return NULL; + } +} + + void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) { @@ -3526,8 +4144,7 @@ nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...) if (nxt_fast_path(req != NULL)) { req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); - p += snprintf(p, end - p, - "#%"PRIu32": ", req_impl->recv_msg.port_msg.stream); + p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream); } va_start(ap, fmt); |