summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2019-08-20 16:31:53 +0300
committerMax Romanov <max.romanov@nginx.com>2019-08-20 16:31:53 +0300
commite501c74ddceab86e48c031ca9b5e154f52dcdae0 (patch)
tree7bfe94354df516d1ceefc5af3194ba943e443aa2 /src/nxt_unit.c
parent9bbf54e23e185e94054072fff2673f6f5cd203e9 (diff)
downloadunit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.gz
unit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.bz2
Introducing websocket support in router and libunit.
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c1159
1 files changed, 888 insertions, 271 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 88c7fa6a..28a0de20 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -11,38 +11,60 @@
#include "nxt_unit.h"
#include "nxt_unit_request.h"
#include "nxt_unit_response.h"
+#include "nxt_unit_websocket.h"
+
+#include "nxt_websocket.h"
#if (NXT_HAVE_MEMFD_CREATE)
#include <linux/memfd.h>
#endif
-typedef struct nxt_unit_impl_s nxt_unit_impl_t;
-typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
-typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
-typedef struct nxt_unit_process_s nxt_unit_process_t;
-typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
-typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
-typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
-typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
-typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
+typedef struct nxt_unit_impl_s nxt_unit_impl_t;
+typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
+typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
+typedef struct nxt_unit_process_s nxt_unit_process_t;
+typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
+typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
+typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
+typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
+typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
+typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data);
+nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
+ nxt_unit_mmap_buf_t *mmap_buf);
+nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
+ nxt_unit_mmap_buf_t *mmap_buf);
+nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
uint32_t stream);
+static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
+static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
+ nxt_unit_ctx_t *ctx);
+static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
+static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_mmap_buf_t *mmap_buf, int last);
+static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
+static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
+ size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
nxt_chunk_id_t *c, int n);
@@ -65,7 +87,7 @@ static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg, nxt_queue_t *incoming_buf);
+ nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
uint32_t size);
@@ -98,14 +120,22 @@ static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
nxt_unit_port_id_t *port_id, int remove);
+static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
+ nxt_unit_request_info_impl_t *req_impl);
+static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
+ nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
+
static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
struct nxt_unit_mmap_buf_s {
nxt_unit_buf_t buf;
+ nxt_unit_mmap_buf_t *next;
+ nxt_unit_mmap_buf_t **prev;
+
nxt_port_mmap_header_t *hdr;
- nxt_queue_link_t link;
+// nxt_queue_link_t link;
nxt_unit_port_id_t port_id;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
@@ -113,12 +143,20 @@ struct nxt_unit_mmap_buf_s {
struct nxt_unit_recv_msg_s {
- nxt_port_msg_t port_msg;
+ uint32_t stream;
+ nxt_pid_t pid;
+ nxt_port_id_t reply_port;
+
+ uint8_t last; /* 1 bit */
+ uint8_t mmap; /* 1 bit */
void *start;
uint32_t size;
+ int fd;
nxt_unit_process_t *process;
+
+ nxt_unit_mmap_buf_t *incoming_buf;
};
@@ -127,18 +165,22 @@ typedef enum {
NXT_UNIT_RS_RESPONSE_INIT,
NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
NXT_UNIT_RS_RESPONSE_SENT,
- NXT_UNIT_RS_DONE,
+ NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
struct nxt_unit_request_info_impl_s {
nxt_unit_request_info_t req;
- nxt_unit_recv_msg_t recv_msg;
- nxt_queue_t outgoing_buf; /* of nxt_unit_mmap_buf_t */
- nxt_queue_t incoming_buf; /* of nxt_unit_mmap_buf_t */
+ uint32_t stream;
+
+ nxt_unit_process_t *process;
+
+ nxt_unit_mmap_buf_t *outgoing_buf;
+ nxt_unit_mmap_buf_t *incoming_buf;
nxt_unit_req_state_t state;
+ uint8_t websocket;
nxt_queue_link_t link;
@@ -146,6 +188,19 @@ struct nxt_unit_request_info_impl_s {
};
+struct nxt_unit_websocket_frame_impl_s {
+ nxt_unit_websocket_frame_t ws;
+
+ nxt_unit_mmap_buf_t *buf;
+
+ nxt_queue_link_t link;
+
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ void *retain_buf;
+};
+
+
struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
@@ -154,14 +209,20 @@ struct nxt_unit_ctx_impl_s {
nxt_queue_link_t link;
- nxt_queue_t free_buf; /* of nxt_unit_mmap_buf_t */
+ nxt_unit_mmap_buf_t *free_buf;
/* of nxt_unit_request_info_impl_t */
nxt_queue_t free_req;
+ /* of nxt_unit_websocket_frame_impl_t */
+ nxt_queue_t free_ws;
+
/* of nxt_unit_request_info_impl_t */
nxt_queue_t active_req;
+ /* of nxt_unit_request_info_impl_t */
+ nxt_lvlhsh_t requests;
+
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_request_info_impl_t req;
@@ -394,18 +455,65 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
- nxt_queue_init(&ctx_impl->free_buf);
nxt_queue_init(&ctx_impl->free_req);
+ nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
- nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0].link);
- nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1].link);
+ ctx_impl->free_buf = NULL;
+ nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
+ nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
+
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
ctx_impl->read_port_fd = -1;
+ ctx_impl->requests.slot = 0;
+}
+
+
+nxt_inline void
+nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
+ nxt_unit_mmap_buf_t *mmap_buf)
+{
+ mmap_buf->next = *head;
+
+ if (mmap_buf->next != NULL) {
+ mmap_buf->next->prev = &mmap_buf->next;
+ }
+
+ *head = mmap_buf;
+ mmap_buf->prev = head;
+}
+
+
+nxt_inline void
+nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
+ nxt_unit_mmap_buf_t *mmap_buf)
+{
+ while (*prev != NULL) {
+ prev = &(*prev)->next;
+ }
+
+ nxt_unit_mmap_buf_insert(prev, mmap_buf);
+}
+
+
+nxt_inline void
+nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
+{
+ nxt_unit_mmap_buf_t **prev;
+
+ prev = mmap_buf->prev;
+
+ if (mmap_buf->next != NULL) {
+ mmap_buf->next->prev = prev;
+ }
+
+ if (prev != NULL) {
+ *prev = mmap_buf->next;
+ }
}
@@ -509,26 +617,18 @@ int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
void *buf, size_t buf_size, void *oob, size_t oob_size)
{
- int fd, rc, nb;
- pid_t pid;
- nxt_queue_t incoming_buf;
- struct cmsghdr *cm;
- nxt_port_msg_t *port_msg;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t new_port;
- nxt_queue_link_t *lnk;
- nxt_unit_request_t *r;
- nxt_unit_mmap_buf_t *b;
- nxt_unit_recv_msg_t recv_msg;
- nxt_unit_callbacks_t *cb;
- nxt_port_msg_new_port_t *new_port_msg;
- nxt_unit_request_info_t *req;
- nxt_unit_request_info_impl_t *req_impl;
+ int rc;
+ pid_t pid;
+ struct cmsghdr *cm;
+ nxt_port_msg_t *port_msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_recv_msg_t recv_msg;
+ nxt_unit_callbacks_t *cb;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_ERROR;
- fd = -1;
+ recv_msg.fd = -1;
recv_msg.process = NULL;
port_msg = buf;
cm = oob;
@@ -538,17 +638,22 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
&& cm->cmsg_level == SOL_SOCKET
&& cm->cmsg_type == SCM_RIGHTS)
{
- memcpy(&fd, CMSG_DATA(cm), sizeof(int));
+ memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
}
- nxt_queue_init(&incoming_buf);
+ recv_msg.incoming_buf = NULL;
if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
goto fail;
}
- recv_msg.port_msg = *port_msg;
+ recv_msg.stream = port_msg->stream;
+ recv_msg.pid = port_msg->pid;
+ recv_msg.reply_port = port_msg->reply_port;
+ recv_msg.last = port_msg->last;
+ recv_msg.mmap = port_msg->mmap;
+
recv_msg.start = port_msg + 1;
recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
@@ -572,7 +677,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
}
if (port_msg->mmap) {
- if (nxt_unit_mmap_read(ctx, &recv_msg, &incoming_buf) != NXT_UNIT_OK) {
+ if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
goto fail;
}
}
@@ -589,187 +694,326 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
break;
case _NXT_PORT_MSG_NEW_PORT:
- if (nxt_slow_path(recv_msg.size != sizeof(nxt_port_msg_new_port_t))) {
- nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
- "invalid message size (%d)",
- port_msg->stream, (int) recv_msg.size);
+ rc = nxt_unit_process_new_port(ctx, &recv_msg);
+ break;
- goto fail;
- }
+ case _NXT_PORT_MSG_CHANGE_FILE:
+ nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
+ port_msg->stream, recv_msg.fd);
+ break;
- if (nxt_slow_path(fd < 0)) {
- nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
- port_msg->stream, fd);
+ case _NXT_PORT_MSG_MMAP:
+ if (nxt_slow_path(recv_msg.fd < 0)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
+ port_msg->stream, recv_msg.fd);
goto fail;
}
- new_port_msg = recv_msg.start;
+ rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
+ break;
- nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
- port_msg->stream, (int) new_port_msg->pid,
- (int) new_port_msg->id, fd);
+ case _NXT_PORT_MSG_REQ_HEADERS:
+ rc = nxt_unit_process_req_headers(ctx, &recv_msg);
+ break;
- nb = 0;
+ case _NXT_PORT_MSG_WEBSOCKET:
+ rc = nxt_unit_process_websocket(ctx, &recv_msg);
+ break;
- if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
- nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
- "failed: %s (%d)", fd, strerror(errno), errno);
+ case _NXT_PORT_MSG_REMOVE_PID:
+ if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
+ nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
+ "(%d != %d)", port_msg->stream, (int) recv_msg.size,
+ (int) sizeof(pid));
goto fail;
}
- nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
- new_port_msg->id);
+ memcpy(&pid, recv_msg.start, sizeof(pid));
- new_port.in_fd = -1;
- new_port.out_fd = fd;
- new_port.data = NULL;
+ nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
+ port_msg->stream, (int) pid);
- fd = -1;
+ cb->remove_pid(ctx, pid);
- rc = cb->add_port(ctx, &new_port);
+ rc = NXT_UNIT_OK;
break;
- case _NXT_PORT_MSG_CHANGE_FILE:
- nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
- port_msg->stream, fd);
- break;
+ default:
+ nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
+ port_msg->stream, (int) port_msg->type);
- case _NXT_PORT_MSG_MMAP:
- if (nxt_slow_path(fd < 0)) {
- nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
- port_msg->stream, fd);
+ goto fail;
+ }
- goto fail;
- }
+fail:
- rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, fd);
- break;
+ if (recv_msg.fd != -1) {
+ close(recv_msg.fd);
+ }
- case _NXT_PORT_MSG_DATA:
- if (nxt_slow_path(port_msg->mmap == 0)) {
- nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
- port_msg->stream);
+ while (recv_msg.incoming_buf != NULL) {
+ nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
+ }
- goto fail;
- }
+ if (recv_msg.process != NULL) {
+ nxt_unit_process_use(ctx, recv_msg.process, -1);
+ }
- if (nxt_slow_path(recv_msg.size < sizeof(nxt_unit_request_t))) {
- nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
- "%d expected", port_msg->stream, (int) recv_msg.size,
- (int) sizeof(nxt_unit_request_t));
+ return rc;
+}
- goto fail;
- }
- req_impl = nxt_unit_request_info_get(ctx);
- if (nxt_slow_path(req_impl == NULL)) {
- nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
- port_msg->stream);
+static int
+nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ int nb;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t new_port;
+ nxt_port_msg_new_port_t *new_port_msg;
- goto fail;
- }
+ if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
+ nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
+ "invalid message size (%d)",
+ recv_msg->stream, (int) recv_msg->size);
- req = &req_impl->req;
+ return NXT_UNIT_ERROR;
+ }
- req->request_port = *port_id;
+ if (nxt_slow_path(recv_msg->fd < 0)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
+ recv_msg->stream, recv_msg->fd);
- nxt_unit_port_id_init(&req->response_port, port_msg->pid,
- port_msg->reply_port);
+ return NXT_UNIT_ERROR;
+ }
- req->request = recv_msg.start;
+ new_port_msg = recv_msg->start;
- lnk = nxt_queue_first(&incoming_buf);
- b = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
+ nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
+ recv_msg->stream, (int) new_port_msg->pid,
+ (int) new_port_msg->id, recv_msg->fd);
- req->request_buf = &b->buf;
- req->response = NULL;
- req->response_buf = NULL;
+ nb = 0;
- r = req->request;
+ if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
+ "failed: %s (%d)", recv_msg->fd, strerror(errno), errno);
- req->content_length = r->content_length;
+ return NXT_UNIT_ERROR;
+ }
- req->content_buf = req->request_buf;
- req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
+ nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
+ new_port_msg->id);
- /* Move process to req_impl. */
- req_impl->recv_msg = recv_msg;
+ new_port.in_fd = -1;
+ new_port.out_fd = recv_msg->fd;
+ new_port.data = NULL;
- recv_msg.process = NULL;
+ recv_msg->fd = -1;
- nxt_queue_init(&req_impl->outgoing_buf);
- nxt_queue_init(&req_impl->incoming_buf);
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
- {
- b->req = req;
- } nxt_queue_loop;
+ return lib->callbacks.add_port(ctx, &new_port);
+}
- nxt_queue_add(&req_impl->incoming_buf, &incoming_buf);
- nxt_queue_init(&incoming_buf);
- req->response_max_fields = 0;
- req_impl->state = NXT_UNIT_RS_START;
+static int
+nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_request_t *r;
+ nxt_unit_mmap_buf_t *b;
+ nxt_unit_request_info_t *req;
+ nxt_unit_request_info_impl_t *req_impl;
- nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", port_msg->stream,
- (int) r->method_length, nxt_unit_sptr_get(&r->method),
- (int) r->target_length, nxt_unit_sptr_get(&r->target),
- (int) r->content_length);
+ if (nxt_slow_path(recv_msg->mmap == 0)) {
+ nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
+ recv_msg->stream);
- cb->request_handler(req);
+ return NXT_UNIT_ERROR;
+ }
- rc = NXT_UNIT_OK;
- break;
+ if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
+ nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
+ "%d expected", recv_msg->stream, (int) recv_msg->size,
+ (int) sizeof(nxt_unit_request_t));
- case _NXT_PORT_MSG_REMOVE_PID:
- if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
- nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
- "(%d != %d)", port_msg->stream, (int) recv_msg.size,
- (int) sizeof(pid));
+ return NXT_UNIT_ERROR;
+ }
- goto fail;
- }
+ req_impl = nxt_unit_request_info_get(ctx);
+ if (nxt_slow_path(req_impl == NULL)) {
+ nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
+ recv_msg->stream);
- memcpy(&pid, recv_msg.start, sizeof(pid));
+ return NXT_UNIT_ERROR;
+ }
- nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
- port_msg->stream, (int) pid);
+ req = &req_impl->req;
- cb->remove_pid(ctx, pid);
+ nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
+ recv_msg->reply_port);
- rc = NXT_UNIT_OK;
- break;
+ req->request = recv_msg->start;
- default:
- nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
- port_msg->stream, (int) port_msg->type);
+ b = recv_msg->incoming_buf;
- goto fail;
+ req->request_buf = &b->buf;
+ req->response = NULL;
+ req->response_buf = NULL;
+
+ r = req->request;
+
+ req->content_length = r->content_length;
+
+ req->content_buf = req->request_buf;
+ req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
+
+ /* "Move" process reference to req_impl. */
+ req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
+ if (nxt_slow_path(req_impl->process == NULL)) {
+ return NXT_UNIT_ERROR;
}
-fail:
+ recv_msg->process = NULL;
- if (fd != -1) {
- close(fd);
+ req_impl->stream = recv_msg->stream;
+
+ req_impl->outgoing_buf = NULL;
+
+ for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
+ b->req = req;
}
- if (port_msg->mmap) {
- nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
- {
- nxt_unit_mmap_release(b->hdr, b->buf.start,
- b->buf.end - b->buf.start);
+ /* "Move" incoming buffer list to req_impl. */
+ req_impl->incoming_buf = recv_msg->incoming_buf;
+ req_impl->incoming_buf->prev = &req_impl->incoming_buf;
+ recv_msg->incoming_buf = NULL;
+
+ req->response_max_fields = 0;
+ req_impl->state = NXT_UNIT_RS_START;
+ req_impl->websocket = 0;
+
+ nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
+ (int) r->method_length, nxt_unit_sptr_get(&r->method),
+ (int) r->target_length, nxt_unit_sptr_get(&r->target),
+ (int) r->content_length);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ lib->callbacks.request_handler(req);
- nxt_unit_mmap_buf_release(b);
- } nxt_queue_loop;
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+{
+ size_t hsize;
+ nxt_unit_impl_t *lib;
+ nxt_unit_mmap_buf_t *b;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_callbacks_t *cb;
+ nxt_unit_request_info_t *req;
+ nxt_unit_request_info_impl_t *req_impl;
+ nxt_unit_websocket_frame_impl_t *ws_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
+ recv_msg->last);
+ if (req_impl == NULL) {
+ return NXT_UNIT_OK;
}
- if (recv_msg.process != NULL) {
- nxt_unit_process_use(ctx, recv_msg.process, -1);
+ req = &req_impl->req;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ cb = &lib->callbacks;
+
+ if (cb->websocket_handler && recv_msg->size >= 2) {
+ ws_impl = nxt_unit_websocket_frame_get(ctx);
+ if (nxt_slow_path(ws_impl == NULL)) {
+ nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
+ req_impl->stream);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ ws_impl->ws.req = req;
+
+ ws_impl->buf = NULL;
+ ws_impl->retain_buf = NULL;
+
+ if (recv_msg->mmap) {
+ for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
+ b->req = req;
+ }
+
+ /* "Move" incoming buffer list to ws_impl. */
+ ws_impl->buf = recv_msg->incoming_buf;
+ ws_impl->buf->prev = &ws_impl->buf;
+ recv_msg->incoming_buf = NULL;
+
+ b = ws_impl->buf;
+
+ } else {
+ b = nxt_unit_mmap_buf_get(ctx);
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ b->hdr = NULL;
+ b->req = req;
+ b->buf.start = recv_msg->start;
+ b->buf.free = b->buf.start;
+ b->buf.end = b->buf.start + recv_msg->size;
+
+ nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
+ }
+
+ ws_impl->ws.header = (void *) b->buf.start;
+ ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
+ ws_impl->ws.header);
+
+ hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
+
+ if (ws_impl->ws.header->mask) {
+ ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
+
+ } else {
+ ws_impl->ws.mask = NULL;
+ }
+
+ b->buf.free += hsize;
+
+ ws_impl->ws.content_buf = &b->buf;
+ ws_impl->ws.content_length = ws_impl->ws.payload_len;
+
+ nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
+ "payload_len=%"PRIu64,
+ ws_impl->ws.header->opcode,
+ ws_impl->ws.payload_len);
+
+ cb->websocket_handler(&ws_impl->ws);
}
- return rc;
+ if (recv_msg->last) {
+ req_impl->websocket = 0;
+
+ if (cb->close_handler) {
+ nxt_unit_req_debug(req, "close_handler");
+
+ cb->close_handler(req);
+
+ } else {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ }
+ }
+
+ return NXT_UNIT_OK;
}
@@ -815,9 +1059,7 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
- nxt_unit_mmap_buf_t *b;
nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_recv_msg_t *recv_msg;
nxt_unit_request_info_impl_t *req_impl;
ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
@@ -826,30 +1068,31 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
req->response = NULL;
req->response_buf = NULL;
- recv_msg = &req_impl->recv_msg;
+ if (req_impl->process != NULL) {
+ nxt_unit_process_use(req->ctx, req_impl->process, -1);
- if (recv_msg->process != NULL) {
- nxt_unit_process_use(req->ctx, recv_msg->process, -1);
-
- recv_msg->process = NULL;
+ req_impl->process = NULL;
}
- nxt_queue_each(b, &req_impl->outgoing_buf, nxt_unit_mmap_buf_t, link) {
-
- nxt_unit_buf_free(&b->buf);
+ if (req_impl->websocket) {
+ nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
- } nxt_queue_loop;
-
- nxt_queue_each(b, &req_impl->incoming_buf, nxt_unit_mmap_buf_t, link) {
+ req_impl->websocket = 0;
+ }
- nxt_unit_mmap_release(b->hdr, b->buf.start, b->buf.end - b->buf.start);
- nxt_unit_mmap_buf_release(b);
+ while (req_impl->outgoing_buf != NULL) {
+ nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
+ }
- } nxt_queue_loop;
+ while (req_impl->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_free(req_impl->incoming_buf);
+ }
nxt_queue_remove(&req_impl->link);
nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
+
+ req_impl->state = NXT_UNIT_RS_RELEASED;
}
@@ -868,6 +1111,68 @@ nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
}
+static nxt_unit_websocket_frame_impl_t *
+nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
+{
+ nxt_queue_link_t *lnk;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_websocket_frame_impl_t *ws_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
+ ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
+ if (nxt_slow_path(ws_impl == NULL)) {
+ nxt_unit_warn(ctx, "websocket frame allocation failed");
+
+ return NULL;
+ }
+
+ } else {
+ lnk = nxt_queue_first(&ctx_impl->free_ws);
+ nxt_queue_remove(lnk);
+
+ ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
+ }
+
+ ws_impl->ctx_impl = ctx_impl;
+
+ return ws_impl;
+}
+
+
+static void
+nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
+{
+ nxt_unit_websocket_frame_impl_t *ws_impl;
+
+ ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
+
+ while (ws_impl->buf != NULL) {
+ nxt_unit_mmap_buf_free(ws_impl->buf);
+ }
+
+ ws->req = NULL;
+
+ if (ws_impl->retain_buf != NULL) {
+ free(ws_impl->retain_buf);
+
+ ws_impl->retain_buf = NULL;
+ }
+
+ nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
+}
+
+
+static void
+nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
+{
+ nxt_queue_remove(&ws_impl->link);
+
+ free(ws_impl);
+}
+
+
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
@@ -1275,6 +1580,10 @@ nxt_unit_response_send(nxt_unit_request_info_t *req)
return NXT_UNIT_ERROR;
}
+ if (req->request->websocket_handshake && req->response->status == 101) {
+ nxt_unit_response_upgrade(req);
+ }
+
nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
req->response->fields_count,
(int) (req->response_buf->free
@@ -1282,9 +1591,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req)
mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
- rc = nxt_unit_mmap_buf_send(req->ctx,
- req_impl->recv_msg.port_msg.stream,
- mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
if (nxt_fast_path(rc == NXT_UNIT_OK)) {
req->response = NULL;
req->response_buf = NULL;
@@ -1312,7 +1619,6 @@ nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
int rc;
- nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
@@ -1327,11 +1633,6 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
- process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return NULL;
- }
-
mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
if (nxt_slow_path(mmap_buf == NULL)) {
return NULL;
@@ -1339,10 +1640,10 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
mmap_buf->req = req;
- nxt_queue_insert_tail(&req_impl->outgoing_buf, &mmap_buf->link);
+ nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
- rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
- size, mmap_buf);
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port, size, mmap_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -1366,13 +1667,13 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
pthread_mutex_lock(&lib->mutex);
- recv_msg->process = nxt_unit_process_find(ctx, recv_msg->port_msg.pid, 0);
+ recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
pthread_mutex_unlock(&lib->mutex);
if (recv_msg->process == NULL) {
nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
- recv_msg->port_msg.stream, (int) recv_msg->port_msg.pid);
+ recv_msg->stream, (int) recv_msg->pid);
}
return recv_msg->process;
@@ -1382,23 +1683,21 @@ nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
- nxt_queue_link_t *lnk;
nxt_unit_mmap_buf_t *mmap_buf;
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- if (nxt_queue_is_empty(&ctx_impl->free_buf)) {
+ if (ctx_impl->free_buf == NULL) {
mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
if (nxt_slow_path(mmap_buf == NULL)) {
nxt_unit_warn(ctx, "failed to allocate buf");
}
} else {
- lnk = nxt_queue_first(&ctx_impl->free_buf);
- nxt_queue_remove(lnk);
+ mmap_buf = ctx_impl->free_buf;
- mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
+ nxt_unit_mmap_buf_remove(mmap_buf);
}
mmap_buf->ctx_impl = ctx_impl;
@@ -1410,9 +1709,91 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
- nxt_queue_remove(&mmap_buf->link);
+ nxt_unit_mmap_buf_remove(mmap_buf);
+
+ nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
+}
+
+
+typedef struct {
+ size_t len;
+ const char *str;
+} nxt_unit_str_t;
+
+
+#define nxt_unit_str(str) { nxt_length(str), str }
+
+
+int
+nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
+{
+ return req->request->websocket_handshake;
+}
+
+
+int
+nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
+{
+ int rc;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ if (nxt_slow_path(req_impl->websocket != 0)) {
+ nxt_unit_req_debug(req, "upgrade: already upgraded");
+
+ return NXT_UNIT_OK;
+ }
+
+ if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+ nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
+ nxt_unit_req_warn(req, "upgrade: response already sent");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
+
+ rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ req_impl->websocket = 1;
+
+ req->response->status = 101;
+
+ return NXT_UNIT_OK;
+}
+
+
+int
+nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
+{
+ nxt_unit_request_info_impl_t *req_impl;
- nxt_queue_insert_tail(&mmap_buf->ctx_impl->free_buf, &mmap_buf->link);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ return req_impl->websocket;
+}
+
+
+nxt_unit_request_info_t *
+nxt_unit_get_request_info_from_data(void *data)
+{
+ nxt_unit_request_info_impl_t *req_impl;
+
+ req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
+
+ return &req_impl->req;
}
@@ -1445,9 +1826,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf)
}
if (nxt_fast_path(buf->free > buf->start)) {
- rc = nxt_unit_mmap_buf_send(req->ctx,
- req_impl->recv_msg.port_msg.stream,
- mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
@@ -1472,10 +1851,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
req = mmap_buf->req;
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
- rc = nxt_unit_mmap_buf_send(req->ctx,
- req_impl->recv_msg.port_msg.stream,
- mmap_buf, 1);
-
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
if (nxt_slow_path(rc == NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -1506,6 +1882,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
buf = &mmap_buf->buf;
+ hdr = mmap_buf->hdr;
m.mmap_msg.size = buf->free - buf->start;
@@ -1514,15 +1891,15 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.msg.reply_port = 0;
m.msg.type = _NXT_PORT_MSG_DATA;
m.msg.last = last != 0;
- m.msg.mmap = m.mmap_msg.size > 0;
+ m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
m.msg.nf = 0;
m.msg.mf = 0;
m.msg.tracking = 0;
- hdr = mmap_buf->hdr;
-
- m.mmap_msg.mmap_id = hdr->id;
- m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
+ if (hdr != NULL) {
+ m.mmap_msg.mmap_id = hdr->id;
+ m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
+ }
nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
stream,
@@ -1531,14 +1908,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
(int) m.mmap_msg.size);
res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
- m.mmap_msg.size > 0 ? sizeof(m)
- : sizeof(m.msg),
+ m.msg.mmap ? sizeof(m) : sizeof(m.msg),
NULL, 0);
if (nxt_slow_path(res != sizeof(m))) {
return NXT_UNIT_ERROR;
}
- if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
+ if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
last_used = (u_char *) buf->free - 1;
first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
@@ -1557,11 +1933,17 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
void
nxt_unit_buf_free(nxt_unit_buf_t *buf)
{
- nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
+}
- mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
- nxt_unit_mmap_release(mmap_buf->hdr, buf->start, buf->end - buf->start);
+static void
+nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
+{
+ if (nxt_fast_path(mmap_buf->hdr != NULL)) {
+ nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
+ mmap_buf->buf.end - mmap_buf->buf.start);
+ }
nxt_unit_mmap_buf_release(mmap_buf);
}
@@ -1570,26 +1952,15 @@ nxt_unit_buf_free(nxt_unit_buf_t *buf)
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
- nxt_queue_link_t *lnk;
- nxt_unit_mmap_buf_t *mmap_buf;
- nxt_unit_request_info_impl_t *req_impl;
+ nxt_unit_mmap_buf_t *mmap_buf;
mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
- req_impl = nxt_container_of(mmap_buf->req, nxt_unit_request_info_impl_t,
- req);
- lnk = &mmap_buf->link;
-
- if (lnk == nxt_queue_last(&req_impl->incoming_buf)
- || lnk == nxt_queue_last(&req_impl->outgoing_buf))
- {
+ if (mmap_buf->next == NULL) {
return NULL;
}
- lnk = nxt_queue_next(lnk);
- mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
-
- return &mmap_buf->buf;
+ return &mmap_buf->next->buf;
}
@@ -1614,7 +1985,6 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
int rc;
uint32_t part_size;
const char *part_start;
- nxt_unit_process_t *process;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
@@ -1641,16 +2011,12 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
part_start += part_size;
}
- process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
- if (nxt_slow_path(process == NULL)) {
- return NXT_UNIT_ERROR;
- }
-
while (size > 0) {
part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
- rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
- part_size, &mmap_buf);
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port, part_size,
+ &mmap_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
@@ -1658,9 +2024,7 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
part_start, part_size);
- rc = nxt_unit_mmap_buf_send(req->ctx,
- req_impl->recv_msg.port_msg.stream,
- &mmap_buf, 0);
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
mmap_buf.buf.end - mmap_buf.buf.start);
@@ -1766,6 +2130,14 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
+ return nxt_unit_buf_read(&req->content_buf, &req->content_length,
+ dst, size);
+}
+
+
+static ssize_t
+nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
+{
u_char *p;
size_t rest, copy, read;
nxt_unit_buf_t *buf;
@@ -1773,7 +2145,7 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
p = dst;
rest = size;
- buf = req->content_buf;
+ buf = *b;
while (buf != NULL) {
copy = buf->end - buf->free;
@@ -1795,11 +2167,11 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
buf = nxt_unit_buf_next(buf);
}
- req->content_buf = buf;
+ *b = buf;
read = size - rest;
- req->content_length -= read;
+ *len -= read;
return read;
}
@@ -1852,7 +2224,7 @@ skip_response_send:
lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
- msg.stream = req_impl->recv_msg.port_msg.stream;
+ msg.stream = req_impl->stream;
msg.pid = lib->pid;
msg.reply_port = 0;
msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
@@ -1874,6 +2246,162 @@ skip_response_send:
}
+int
+nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
+ uint8_t last, const void *start, size_t size)
+{
+ const struct iovec iov = { (void *) start, size };
+
+ return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
+}
+
+
+int
+nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
+ uint8_t last, const struct iovec *iov, int iovcnt)
+{
+ int i, rc;
+ size_t l, copy;
+ uint32_t payload_len, buf_size;
+ const uint8_t *b;
+ nxt_unit_buf_t *buf;
+ nxt_websocket_header_t *wh;
+
+ payload_len = 0;
+
+ for (i = 0; i < iovcnt; i++) {
+ payload_len += iov[i].iov_len;
+ }
+
+ buf_size = 10 + payload_len;
+
+ buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
+ PORT_MMAP_DATA_SIZE));
+ if (nxt_slow_path(buf == NULL)) {
+ nxt_unit_req_error(req, "Failed to allocate buf for content");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ buf->start[0] = 0;
+ buf->start[1] = 0;
+
+ wh = (void *) buf->free;
+
+ buf->free = nxt_websocket_frame_init(wh, payload_len);
+ wh->fin = last;
+ wh->opcode = opcode;
+
+ for (i = 0; i < iovcnt; i++) {
+ b = iov[i].iov_base;
+ l = iov[i].iov_len;
+
+ while (l > 0) {
+ copy = buf->end - buf->free;
+ copy = nxt_min(l, copy);
+
+ buf->free = nxt_cpymem(buf->free, b, copy);
+ b += copy;
+ l -= copy;
+
+ if (l > 0) {
+ buf_size -= buf->end - buf->start;
+
+ rc = nxt_unit_buf_send(buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_error(req, "Failed to send content");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
+ PORT_MMAP_DATA_SIZE));
+ if (nxt_slow_path(buf == NULL)) {
+ nxt_unit_req_error(req,
+ "Failed to allocate buf for content");
+
+ return NXT_UNIT_ERROR;
+ }
+ }
+ }
+ }
+
+ if (buf->free > buf->start) {
+ rc = nxt_unit_buf_send(buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_req_error(req, "Failed to send content");
+ }
+ }
+
+ return rc;
+}
+
+
+ssize_t
+nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
+ size_t size)
+{
+ ssize_t res;
+ uint8_t *b;
+ uint64_t i, d;
+
+ res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
+ dst, size);
+
+ if (ws->mask == NULL) {
+ return res;
+ }
+
+ b = dst;
+ d = (ws->payload_len - ws->content_length - res) % 4;
+
+ for (i = 0; i < (uint64_t) res; i++) {
+ b[i] ^= ws->mask[ (i + d) % 4 ];
+ }
+
+ return res;
+}
+
+
+int
+nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
+{
+ char *b;
+ size_t size;
+ nxt_unit_websocket_frame_impl_t *ws_impl;
+
+ ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
+
+ if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) {
+ return NXT_UNIT_OK;
+ }
+
+ size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
+
+ b = malloc(size);
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ memcpy(b, ws_impl->buf->buf.start, size);
+
+ ws_impl->buf->buf.start = b;
+ ws_impl->buf->buf.free = b;
+ ws_impl->buf->buf.end = b + size;
+
+ ws_impl->retain_buf = b;
+
+ return NXT_UNIT_OK;
+}
+
+
+void
+nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
+{
+ nxt_unit_websocket_frame_release(ws);
+}
+
+
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
@@ -2355,7 +2883,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
- recv_msg->port_msg.stream, (int) recv_msg->size);
+ recv_msg->stream, (int) recv_msg->size);
return 0;
}
@@ -2378,18 +2906,18 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
"invalid mmap id %d,%"PRIu32,
- recv_msg->port_msg.stream,
- (int) process->pid, tracking_msg->mmap_id);
+ recv_msg->stream, (int) process->pid,
+ tracking_msg->mmap_id);
return 0;
}
c = tracking_msg->tracking_id;
- rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->port_msg.stream, 0);
+ rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
if (rc == 0) {
nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
- recv_msg->port_msg.stream);
+ recv_msg->stream);
nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
}
@@ -2401,19 +2929,18 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
static int
-nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
- nxt_queue_t *incoming_buf)
+nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
void *start;
uint32_t size;
nxt_unit_process_t *process;
- nxt_unit_mmap_buf_t *b;
+ nxt_unit_mmap_buf_t *b, **incoming_tail;
nxt_port_mmap_msg_t *mmap_msg, *end;
nxt_port_mmap_header_t *hdr;
if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
- recv_msg->port_msg.stream, (int) recv_msg->size);
+ recv_msg->stream, (int) recv_msg->size);
return NXT_UNIT_ERROR;
}
@@ -2426,6 +2953,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
mmap_msg = recv_msg->start;
end = nxt_pointer_to(recv_msg->start, recv_msg->size);
+ incoming_tail = &recv_msg->incoming_buf;
+
pthread_mutex_lock(&process->incoming.mutex);
for (; mmap_msg < end; mmap_msg++) {
@@ -2435,8 +2964,8 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
"invalid mmap id %d,%"PRIu32,
- recv_msg->port_msg.stream,
- (int) process->pid, mmap_msg->mmap_id);
+ recv_msg->stream, (int) process->pid,
+ mmap_msg->mmap_id);
return NXT_UNIT_ERROR;
}
@@ -2453,16 +2982,16 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
if (nxt_slow_path(b == NULL)) {
pthread_mutex_unlock(&process->incoming.mutex);
- nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
- "failed to allocate buf",
- recv_msg->port_msg.stream);
+ nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
+ recv_msg->stream);
nxt_unit_mmap_release(hdr, start, size);
return NXT_UNIT_ERROR;
}
- nxt_queue_insert_tail(incoming_buf, &b->link);
+ nxt_unit_mmap_buf_insert(incoming_tail, b);
+ incoming_tail = &b->next;
b->buf.start = start;
b->buf.free = start;
@@ -2470,7 +2999,7 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
b->hdr = hdr;
nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
- recv_msg->port_msg.stream,
+ recv_msg->stream,
start, (int) size,
(int) hdr->src_pid, (int) hdr->dst_pid,
(int) hdr->id, (int) mmap_msg->chunk_id,
@@ -2685,6 +3214,11 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
if (nxt_fast_path(rsize > 0)) {
rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
oob, sizeof(oob));
+
+#if (NXT_DEBUG)
+ memset(buf, 0xAC, rsize);
+#endif
+
} else {
rc = NXT_UNIT_ERROR;
}
@@ -2775,10 +3309,11 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
void
nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
{
- nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_mmap_buf_t *mmap_buf;
- nxt_unit_request_info_impl_t *req_impl;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_mmap_buf_t *mmap_buf;
+ nxt_unit_request_info_impl_t *req_impl;
+ nxt_unit_websocket_frame_impl_t *ws_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -2792,15 +3327,14 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
} nxt_queue_loop;
- nxt_queue_remove(&ctx_impl->ctx_buf[0].link);
- nxt_queue_remove(&ctx_impl->ctx_buf[1].link);
-
- nxt_queue_each(mmap_buf, &ctx_impl->free_buf, nxt_unit_mmap_buf_t, link) {
+ nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]);
+ nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]);
- nxt_queue_remove(&mmap_buf->link);
+ while (ctx_impl->free_buf != NULL) {
+ mmap_buf = ctx_impl->free_buf;
+ nxt_unit_mmap_buf_remove(mmap_buf);
free(mmap_buf);
-
- } nxt_queue_loop;
+ }
nxt_queue_each(req_impl, &ctx_impl->free_req,
nxt_unit_request_info_impl_t, link)
@@ -2809,6 +3343,13 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
} nxt_queue_loop;
+ nxt_queue_each(ws_impl, &ctx_impl->free_ws,
+ nxt_unit_websocket_frame_impl_t, link)
+ {
+ nxt_unit_websocket_frame_free(ws_impl);
+
+ } nxt_queue_loop;
+
nxt_queue_remove(&ctx_impl->link);
if (ctx_impl != &lib->main_ctx) {
@@ -3454,6 +3995,83 @@ nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
}
+static nxt_int_t
+nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ return NXT_OK;
+}
+
+
+static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ nxt_unit_request_hash_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+};
+
+
+static int
+nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
+ nxt_unit_request_info_impl_t *req_impl)
+{
+ uint32_t *stream;
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+
+ stream = &req_impl->stream;
+
+ lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
+ lhq.key.length = sizeof(*stream);
+ lhq.key.start = (u_char *) stream;
+ lhq.proto = &lvlhsh_requests_proto;
+ lhq.pool = NULL;
+ lhq.replace = 0;
+ lhq.value = req_impl;
+
+ res = nxt_lvlhsh_insert(request_hash, &lhq);
+
+ switch (res) {
+
+ case NXT_OK:
+ return NXT_UNIT_OK;
+
+ default:
+ return NXT_UNIT_ERROR;
+ }
+}
+
+
+static nxt_unit_request_info_impl_t *
+nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
+ int remove)
+{
+ nxt_int_t res;
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
+ lhq.key.length = sizeof(stream);
+ lhq.key.start = (u_char *) &stream;
+ lhq.proto = &lvlhsh_requests_proto;
+ lhq.pool = NULL;
+
+ if (remove) {
+ res = nxt_lvlhsh_delete(request_hash, &lhq);
+
+ } else {
+ res = nxt_lvlhsh_find(request_hash, &lhq);
+ }
+
+ switch (res) {
+
+ case NXT_OK:
+ return lhq.value;
+
+ default:
+ return NULL;
+ }
+}
+
+
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
@@ -3526,8 +4144,7 @@ nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
if (nxt_fast_path(req != NULL)) {
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
- p += snprintf(p, end - p,
- "#%"PRIu32": ", req_impl->recv_msg.port_msg.stream);
+ p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
}
va_start(ap, fmt);