From e501c74ddceab86e48c031ca9b5e154f52dcdae0 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 20 Aug 2019 16:31:53 +0300 Subject: Introducing websocket support in router and libunit. --- src/nxt_http_websocket.c | 161 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 src/nxt_http_websocket.c (limited to 'src/nxt_http_websocket.c') diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c new file mode 100644 index 00000000..d58d615c --- /dev/null +++ b/src/nxt_http_websocket.c @@ -0,0 +1,161 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include +#include +#include +#include +#include +#include +#include + + +static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data); +static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, + void *data); + + +const nxt_http_request_state_t nxt_http_websocket + nxt_aligned(64) = +{ + .ready_handler = nxt_http_websocket_client, + .error_handler = nxt_http_websocket_error_handler, +}; + + +static void +nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) +{ + size_t frame_size, used_size, copy_size, buf_free_size; + size_t chunk_copy_size; + nxt_buf_t *out, *buf, **out_tail, *b, *next; + nxt_int_t res; + nxt_http_request_t *r; + nxt_request_app_link_t *req_app_link; + nxt_request_rpc_data_t *req_rpc_data; + nxt_websocket_header_t *wsh; + + r = obj; + + if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL + || (req_app_link = req_rpc_data->req_app_link) == NULL)) + { + nxt_debug(task, "websocket client frame for destroyed request"); + + return; + } + + nxt_debug(task, "http websocket client frame"); + + wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; + + frame_size = nxt_websocket_frame_header_size(wsh) + + nxt_websocket_frame_payload_len(wsh); + + buf = NULL; + buf_free_size = 0; + out = NULL; + out_tail = &out; + + b = r->ws_frame; + + while (b != NULL && frame_size > 0) { + used_size = nxt_buf_mem_used_size(&b->mem); + copy_size = nxt_min(used_size, frame_size); + + while (copy_size > 0) { + if (buf == NULL || buf_free_size == 0) { + buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); + + buf = nxt_port_mmap_get_buf(task, req_app_link->app_port, + buf_free_size); + + *out_tail = buf; + out_tail = &buf->next; + } + + chunk_copy_size = nxt_min(buf_free_size, copy_size); + + buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos, + chunk_copy_size); + + copy_size -= chunk_copy_size; + b->mem.pos += chunk_copy_size; + buf_free_size -= chunk_copy_size; + } + + frame_size -= copy_size; + next = b->next; + + if (nxt_buf_mem_used_size(&b->mem) == 0) { + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); + + r->ws_frame = next; + } + + b = next; + } + + res = nxt_port_socket_twrite(task, req_app_link->app_port, + NXT_PORT_MSG_WEBSOCKET, -1, + req_app_link->stream, + req_app_link->reply_port->id, out, NULL); + if (nxt_slow_path(res != NXT_OK)) { + // TODO: handle + } + + b = r->ws_frame; + + if (b != NULL) { + used_size = nxt_buf_mem_used_size(&b->mem); + + if (used_size > 0) { + nxt_memmove(b->mem.start, b->mem.pos, used_size); + + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start + used_size; + } + } + + nxt_http_request_ws_frame_start(task, r, r->ws_frame); +} + + +static void +nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + nxt_request_app_link_t *req_app_link; + nxt_request_rpc_data_t *req_rpc_data; + + nxt_debug(task, "http websocket error handler"); + + r = obj; + + if ((req_rpc_data = r->req_rpc_data) == NULL) { + nxt_debug(task, " req_rpc_data is NULL"); + goto close_handler; + } + + if ((req_app_link = req_rpc_data->req_app_link) == NULL) { + nxt_debug(task, " req_app_link is NULL"); + goto close_handler; + } + + if (req_app_link->app_port == NULL) { + nxt_debug(task, " app_port is NULL"); + goto close_handler; + } + + (void) nxt_port_socket_twrite(task, req_app_link->app_port, + NXT_PORT_MSG_WEBSOCKET_LAST, + -1, req_app_link->stream, + req_app_link->reply_port->id, NULL, NULL); + +close_handler: + + nxt_http_request_close_handler(task, obj, data); +} -- cgit