diff options
author | Max Romanov <max.romanov@nginx.com> | 2019-08-20 16:31:53 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2019-08-20 16:31:53 +0300 |
commit | e501c74ddceab86e48c031ca9b5e154f52dcdae0 (patch) | |
tree | 7bfe94354df516d1ceefc5af3194ba943e443aa2 /src/nxt_h1proto_websocket.c | |
parent | 9bbf54e23e185e94054072fff2673f6f5cd203e9 (diff) | |
download | unit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.gz unit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.bz2 |
Introducing websocket support in router and libunit.
Diffstat (limited to 'src/nxt_h1proto_websocket.c')
-rw-r--r-- | src/nxt_h1proto_websocket.c | 719 |
1 files changed, 719 insertions, 0 deletions
diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c new file mode 100644 index 00000000..dd9b6848 --- /dev/null +++ b/src/nxt_h1proto_websocket.c @@ -0,0 +1,719 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_http.h> +#include <nxt_h1proto.h> +#include <nxt_websocket.h> +#include <nxt_websocket_header.h> + +typedef struct { + uint16_t code; + uint8_t args; + nxt_str_t desc; +} nxt_ws_error_t; + +static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, + void *data); +static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, + nxt_h1proto_t *h1p); +static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, + nxt_h1proto_t *h1p); +static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, + nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh); +static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data); +static ssize_t nxt_h1p_ws_io_read_handler(nxt_conn_t *c); +static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, + void *data); +static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r, + const nxt_ws_error_t *err, ...); +static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data); + +static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state; +static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state; + +static const nxt_ws_error_t nxt_ws_err_out_of_memory = { + NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR, + 0, nxt_string("Out of memory") }; +static const nxt_ws_error_t nxt_ws_err_too_big = { + NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG, + 1, nxt_string("Message too big: %uL bytes") }; +static const nxt_ws_error_t nxt_ws_err_invalid_close_code = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 1, nxt_string("Close code %ud is not valid") }; +static const nxt_ws_error_t nxt_ws_err_going_away = { + NXT_WEBSOCKET_CR_GOING_AWAY, + 0, nxt_string("Remote peer is going away") }; +static const nxt_ws_error_t nxt_ws_err_not_masked = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 0, nxt_string("Not masked client frame") }; +static const nxt_ws_error_t nxt_ws_err_ctrl_fragmented = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 0, nxt_string("Fragmented control frame") }; +static const nxt_ws_error_t nxt_ws_err_ctrl_too_big = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 1, nxt_string("Control frame too big: %uL bytes") }; +static const nxt_ws_error_t nxt_ws_err_invalid_close_len = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 0, nxt_string("Close frame payload length cannot be 1") }; +static const nxt_ws_error_t nxt_ws_err_invalid_opcode = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 1, nxt_string("Unrecognized opcode %ud") }; +static const nxt_ws_error_t nxt_ws_err_cont_expected = { + NXT_WEBSOCKET_CR_PROTOCOL_ERROR, + 1, nxt_string("Continuation expected, but %ud opcode received") }; + +void +nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *ws_frame) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_socket_conf_joint_t *joint; + + nxt_debug(task, "h1p ws first frame start"); + + h1p = r->proto.h1; + c = h1p->conn; + + if (!c->tcp_nodelay) { + nxt_conn_tcp_nodelay_on(task, c); + } + + joint = c->listen->socket.data; + + if (nxt_slow_path(joint != NULL + && joint->socket_conf->websocket_conf.keepalive_interval != 0)) + { + h1p->websocket_timer = nxt_mp_zget(c->mem_pool, + sizeof(nxt_h1p_websocket_timer_t)); + if (nxt_slow_path(h1p->websocket_timer == NULL)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory); + return; + } + + h1p->websocket_timer->keepalive_interval = + joint->socket_conf->websocket_conf.keepalive_interval; + h1p->websocket_timer->h1p = h1p; + + timer = &h1p->websocket_timer->timer; + timer->task = &c->task; + timer->work_queue = &task->thread->engine->fast_work_queue; + timer->log = &c->log; + timer->bias = NXT_TIMER_DEFAULT_BIAS; + timer->handler = nxt_h1p_conn_ws_keepalive; + } + + nxt_h1p_websocket_frame_start(task, r, ws_frame); +} + + +void +nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r, + nxt_buf_t *ws_frame) +{ + size_t size; + nxt_buf_t *in; + nxt_conn_t *c; + nxt_h1proto_t *h1p; + + nxt_debug(task, "h1p ws frame start"); + + h1p = r->proto.h1; + + if (nxt_slow_path(h1p->websocket_closed)) { + return; + } + + c = h1p->conn; + c->read = ws_frame; + + nxt_h1p_complete_buffers(task, h1p); + + in = c->read; + c->read_state = &nxt_h1p_read_ws_frame_header_state; + + if (in == NULL) { + nxt_conn_read(task->thread->engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); + + } else { + size = nxt_buf_mem_used_size(&in->mem); + + nxt_debug(task, "h1p read client ws frame"); + + nxt_memmove(in->mem.start, in->mem.pos, size); + + in->mem.pos = in->mem.start; + in->mem.free = in->mem.start + size; + + nxt_h1p_conn_ws_frame_header_read(task, c, h1p); + } +} + + +static void +nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *out; + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + nxt_websocket_header_t *wsh; + nxt_h1p_websocket_timer_t *ws_timer; + + nxt_debug(task, "h1p conn ws keepalive"); + + timer = obj; + ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); + h1p = ws_timer->h1p; + + r = h1p->request; + if (nxt_slow_path(r == NULL)) { + return; + } + + out = nxt_http_buf_mem(task, r, 2); + if (nxt_slow_path(out == NULL)) { + nxt_http_request_error_handler(task, r, r->proto.any); + return; + } + + out->mem.start[0] = 0; + out->mem.start[1] = 0; + + wsh = (nxt_websocket_header_t *) out->mem.start; + out->mem.free = nxt_websocket_frame_init(wsh, 0); + + wsh->fin = 1; + wsh->opcode = NXT_WEBSOCKET_OP_PING; + + nxt_http_request_send(task, r, out); +} + + +static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_conn_ws_frame_header_read, + .close_handler = nxt_h1p_conn_ws_error, + .error_handler = nxt_h1p_conn_ws_error, + + .io_read_handler = nxt_h1p_ws_io_read_handler, + + .timer_handler = nxt_h1p_conn_ws_timeout, + .timer_value = nxt_h1p_conn_request_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data) +{ + size_t size, hsize, frame_size, max_frame_size; + uint64_t payload_len; + nxt_conn_t *c; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + nxt_websocket_header_t *wsh; + nxt_socket_conf_joint_t *joint; + + c = obj; + h1p = data; + + nxt_h1p_conn_ws_keepalive_disable(task, h1p); + + size = nxt_buf_mem_used_size(&c->read->mem); + + engine = task->thread->engine; + + if (size < 2) { + nxt_debug(task, "h1p conn ws frame header read %z", size); + + nxt_conn_read(engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); + + return; + } + + wsh = (nxt_websocket_header_t *) c->read->mem.pos; + + hsize = nxt_websocket_frame_header_size(wsh); + + if (size < hsize) { + nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize); + + nxt_conn_read(engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); + + return; + } + + r = h1p->request; + if (nxt_slow_path(r == NULL)) { + return; + } + + r->ws_frame = c->read; + + joint = c->listen->socket.data; + + if (nxt_slow_path(joint == NULL)) { + /* + * Listening socket had been closed while + * connection was in keep-alive state. + */ + c->read_state = &nxt_h1p_idle_close_state; + return; + } + + if (nxt_slow_path(wsh->mask == 0)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked); + return; + } + + if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) { + if (nxt_slow_path(wsh->fin == 0)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented); + return; + } + + if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING + && wsh->opcode != NXT_WEBSOCKET_OP_PONG + && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE)) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode, + wsh->opcode); + return; + } + + if (nxt_slow_path(wsh->payload_len > 125)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big, + nxt_websocket_frame_payload_len(wsh)); + return; + } + + if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE + && wsh->payload_len == 1)) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len); + return; + } + + } else { + if (h1p->websocket_cont_expected) { + if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected, + wsh->opcode); + return; + } + + } else { + if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY + && wsh->opcode != NXT_WEBSOCKET_OP_TEXT)) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode, + wsh->opcode); + return; + } + } + + h1p->websocket_cont_expected = !wsh->fin; + } + + max_frame_size = joint->socket_conf->websocket_conf.max_frame_size; + + payload_len = nxt_websocket_frame_payload_len(wsh); + + if (nxt_slow_path(hsize > max_frame_size + || payload_len > (max_frame_size - hsize))) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len); + return; + } + + c->read_state = &nxt_h1p_read_ws_frame_payload_state; + + frame_size = payload_len + hsize; + + nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size); + + if (frame_size <= size) { + nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh); + + return; + } + + if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) { + c->read->mem.end = c->read->mem.start + frame_size; + + } else { + nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0); + + c->read->next = b; + c->read = b; + } + + nxt_conn_read(engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); +} + + +static void +nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p) +{ + nxt_timer_t *timer; + + if (h1p->websocket_timer == NULL) { + return; + } + + timer = &h1p->websocket_timer->timer; + + if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) { + nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown"); + return; + } + + nxt_timer_disable(task->thread->engine, timer); +} + + +static void +nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p) +{ + nxt_timer_t *timer; + + if (h1p->websocket_timer == NULL) { + return; + } + + timer = &h1p->websocket_timer->timer; + + if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) { + nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown"); + return; + } + + nxt_timer_add(task->thread->engine, timer, + h1p->websocket_timer->keepalive_interval); +} + + +static void +nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, + nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh) +{ + size_t hsize; + uint8_t *p, *mask; + uint16_t code; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + + engine = task->thread->engine; + r = h1p->request; + + c->read = NULL; + + if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) { + nxt_work_queue_add(&engine->fast_work_queue, nxt_h1p_conn_ws_pong, + task, r, NULL); + return; + } + + if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) { + if (wsh->payload_len >= 2) { + hsize = nxt_websocket_frame_header_size(wsh); + mask = nxt_pointer_to(wsh, hsize - 4); + p = nxt_pointer_to(wsh, hsize); + + code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]); + + if (nxt_slow_path(code < 1000 || code >= 5000 + || (code > 1003 && code < 1007) + || (code > 1014 && code < 3000))) + { + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code, + code); + return; + } + } + + h1p->websocket_closed = 1; + } + + nxt_work_queue_add(&engine->fast_work_queue, r->state->ready_handler, + task, r, NULL); +} + + +static void +nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + + h1p = data; + + nxt_debug(task, "h1p conn ws error"); + + r = h1p->request; + + h1p->keepalive = 0; + + if (nxt_fast_path(r != NULL)) { + r->state->error_handler(task, r, h1p); + } +} + + +static ssize_t +nxt_h1p_ws_io_read_handler(nxt_conn_t *c) +{ + size_t size; + ssize_t n; + nxt_buf_t *b; + + b = c->read; + + if (b == NULL) { + /* Enough for control frame. */ + size = 10 + 125; + + b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + c->socket.error = NXT_ENOMEM; + return NXT_ERROR; + } + } + + n = c->io->recvbuf(c, b); + + if (n > 0) { + c->read = b; + + } else { + c->read = NULL; + nxt_mp_free(c->mem_pool, b); + } + + return n; +} + + +static void +nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + + timer = obj; + + nxt_debug(task, "h1p conn ws timeout"); + + c = nxt_read_timer_conn(timer); + c->block_read = 1; + /* + * Disable SO_LINGER off during socket closing + * to send "408 Request Timeout" error response. + */ + c->socket.timedout = 0; + + h1p = c->socket.data; + h1p->keepalive = 0; + + r = h1p->request; + if (nxt_slow_path(r == NULL)) { + return; + } + + hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away); +} + + +static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state + nxt_aligned(64) = +{ + .ready_handler = nxt_h1p_conn_ws_frame_payload_read, + .close_handler = nxt_h1p_conn_ws_error, + .error_handler = nxt_h1p_conn_ws_error, + + .timer_handler = nxt_h1p_conn_ws_timeout, + .timer_value = nxt_h1p_conn_request_timer_value, + .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + nxt_websocket_header_t *wsh; + + c = obj; + h1p = data; + + nxt_h1p_conn_ws_keepalive_disable(task, h1p); + + nxt_debug(task, "h1p conn ws frame read"); + + if (nxt_buf_mem_free_size(&c->read->mem) == 0) { + r = h1p->request; + if (nxt_slow_path(r == NULL)) { + return; + } + + wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; + + nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh); + + return; + } + + engine = task->thread->engine; + + nxt_conn_read(engine, c); + nxt_h1p_conn_ws_keepalive_enable(task, h1p); +} + + +static void +hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r, + const nxt_ws_error_t *err, ...) +{ + u_char *p; + va_list args; + nxt_buf_t *out; + nxt_str_t desc; + nxt_websocket_header_t *wsh; + u_char buf[125]; + + if (nxt_slow_path(err->args)) { + va_start(args, err); + p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start, + args); + va_end(args); + + desc.start = buf; + desc.length = p - buf; + + } else { + desc = err->desc; + } + + nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc); + + out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length); + if (nxt_slow_path(out == NULL)) { + nxt_http_request_error_handler(task, r, r->proto.any); + return; + } + + out->mem.start[0] = 0; + out->mem.start[1] = 0; + + wsh = (nxt_websocket_header_t *) out->mem.start; + p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length); + + wsh->fin = 1; + wsh->opcode = NXT_WEBSOCKET_OP_CLOSE; + + *p++ = (err->code >> 8) & 0xFF; + *p++ = err->code & 0xFF; + + out->mem.free = nxt_cpymem(p, desc.start, desc.length); + out->next = nxt_http_buf_last(r); + + if (out->next != NULL) { + out->next->completion_handler = nxt_h1p_conn_ws_error_sent; + } + + nxt_http_request_send(task, r, out); +} + + +static void +nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = data; + + nxt_debug(task, "h1p conn ws error sent"); + + r->state->error_handler(task, r, r->proto.any); +} + + +static void +nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data) +{ + uint8_t payload_len, i; + nxt_buf_t *b, *out, *next; + nxt_http_request_t *r; + nxt_websocket_header_t *wsh; + uint8_t mask[4]; + + nxt_debug(task, "h1p conn ws pong"); + + r = obj; + b = r->ws_frame; + + wsh = (nxt_websocket_header_t *) b->mem.pos; + payload_len = wsh->payload_len; + + b->mem.pos += 2; + + nxt_memcpy(mask, b->mem.pos, 4); + + b->mem.pos += 4; + + out = nxt_http_buf_mem(task, r, 2 + payload_len); + if (nxt_slow_path(out == NULL)) { + nxt_http_request_error_handler(task, r, r->proto.any); + return; + } + + out->mem.start[0] = 0; + out->mem.start[1] = 0; + + wsh = (nxt_websocket_header_t *) out->mem.start; + out->mem.free = nxt_websocket_frame_init(wsh, payload_len); + + wsh->fin = 1; + wsh->opcode = NXT_WEBSOCKET_OP_PONG; + + for (i = 0; i < payload_len; i++) { + while (nxt_buf_mem_used_size(&b->mem) == 0) { + next = b->next; + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); + + b = next; + } + + *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4]; + } + + r->ws_frame = b; + + nxt_http_request_send(task, r, out); + + nxt_http_request_ws_frame_start(task, r, r->ws_frame); +} |