/*
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
#include <nxt_router.h>
#include <nxt_http.h>
#include <nxt_h1proto.h>
#include <nxt_websocket.h>
#include <nxt_websocket_header.h>
typedef struct {
uint16_t code;
uint8_t args;
nxt_str_t desc;
} nxt_ws_error_t;
static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj,
void *data);
static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task,
nxt_h1proto_t *h1p);
static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
nxt_h1proto_t *h1p);
static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
void *data);
static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
const nxt_ws_error_t *err, ...);
static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data);
static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state;
static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state;
static const nxt_ws_error_t nxt_ws_err_out_of_memory = {
NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR,
0, nxt_string("Out of memory") };
static const nxt_ws_error_t nxt_ws_err_too_big = {
NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG,
1, nxt_string("Message too big: %uL bytes") };
static const nxt_ws_error_t nxt_ws_err_invalid_close_code = {
NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
1, nxt_string("Close code %ud is not valid") };
static const nxt_ws_error_t nxt_ws_err_going_away = {
NXT_WEBSOCKET_CR_GOING_AWAY,
0, nxt_string("Remote peer is going away") };
static const nxt_ws_error_t nxt_ws_err_not_masked = {
NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
0, nxt_string("Not masked client frame") };
static const nxt_ws_error_t nxt_ws_err_ctrl_fragmented = {
NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
0, nxt_string("Fragmented control frame") };
static const nxt_ws_error_t nxt_ws_err_ctrl_too_big = {
NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
1, nxt_string("Control frame too big: %uL bytes") };
static const nxt_ws_error_t nxt_ws_err_invalid_close_len = {
NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
0, nxt_string("Close frame payload length cannot be 1") };
static const nxt_ws_error_t nxt_ws_err_invalid_opcode = {
NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
1, nxt_string("Unrecognized opcode %ud") };
static const nxt_ws_error_t nxt_ws_err_cont_expected = {
NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
1, nxt_string("Continuation expected, but %ud opcode received") };
void
nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *ws_frame)
{
nxt_conn_t *c;
nxt_timer_t *timer;
nxt_h1proto_t *h1p;
nxt_socket_conf_joint_t *joint;
nxt_debug(task, "h1p ws first frame start");
h1p = r->proto.h1;
c = h1p->conn;
if (!c->tcp_nodelay) {
nxt_conn_tcp_nodelay_on(task, c);
}
joint = c->listen->socket.data;
if (nxt_slow_path(joint != NULL
&& joint->socket_conf->websocket_conf.keepalive_interval != 0))
{
h1p->websocket_timer = nxt_mp_zget(c->mem_pool,
sizeof(nxt_h1p_websocket_timer_t));
if (nxt_slow_path(h1p->websocket_timer == NULL)) {
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory);
return;
}
h1p->websocket_timer->keepalive_interval =
joint->socket_conf->websocket_conf.keepalive_interval;
h1p->websocket_timer->h1p = h1p;
timer = &h1p->websocket_timer->timer;
timer->task = &c->task;
timer->work_queue = &task->thread->engine->fast_work_queue;
timer->log = &c->log;
timer->bias = NXT_TIMER_DEFAULT_BIAS;
timer->handler = nxt_h1p_conn_ws_keepalive;
}
nxt_h1p_websocket_frame_start(task, r, ws_frame);
}
void
nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *ws_frame)
{
size_t size;
nxt_buf_t *in;
nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_debug(task, "h1p ws frame start");
h1p = r->proto.h1;
if (nxt_slow_path(h1p->websocket_closed)) {
return;
}
c = h1p->conn;
c->read = ws_frame;
nxt_h1p_complete_buffers(task, h1p);
in = c->read;
c->read_state = &nxt_h1p_read_ws_frame_header_state;
if (in == NULL) {
nxt_conn_read(task->thread->engine, c);
nxt_h1p_conn_ws_keepalive_enable(task, h1p);
} else {
size = nxt_buf_mem_used_size(&in->mem);
nxt_debug(task, "h1p read client ws frame");
nxt_memmove(in->mem.start, in->mem.pos, size);
in->mem.pos = in->mem.start;
in->mem.free = in->mem.start + size;
nxt_h1p_conn_ws_frame_header_read(task, c, h1p);
}
}
static void
nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *out;
nxt_timer_t *timer;
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
nxt_websocket_header_t *wsh;
nxt_h1p_websocket_timer_t *ws_timer;
nxt_debug(task, "h1p conn ws keepalive");
timer = obj;
ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
h1p = ws_timer->h1p;
r = h1p->request;
if (nxt_slow_path(r == NULL)) {
return;
}
out = nxt_http_buf_mem(task, r, 2);
if (nxt_slow_path(out == NULL)) {
nxt_http_request_error_handler(task, r, r->proto.any);
return;
}
out->mem.start[0] = 0;
out->mem.start[1] = 0;
wsh = (nxt_websocket_header_t *) out->mem.start;
out->mem.free = nxt_websocket_frame_init(wsh, 0);
wsh->fin = 1;
wsh->opcode = NXT_WEBSOCKET_OP_PING;
nxt_http_request_send(task, r, out);
}
static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state
nxt_aligned(64) =
{
.ready_handler = nxt_h1p_conn_ws_frame_header_read,
.close_handler = nxt_h1p_conn_ws_error,
.error_handler = nxt_h1p_conn_ws_error,
.io_read_handler = nxt_h1p_ws_io_read_handler,
.timer_handler = nxt_h1p_conn_ws_timeout,
.timer_value = nxt_h1p_conn_request_timer_value,
.timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
.timer_autoreset = 1,
};
static void
nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data)
{
size_t size, hsize, frame_size, max_frame_size;
uint64_t payload_len;
nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
nxt_event_engine_t *engine;
nxt_websocket_header_t *wsh;
nxt_socket_conf_joint_t *joint;
c = obj;
h1p = data;
nxt_h1p_conn_ws_keepalive_disable(task, h1p);
size = nxt_buf_mem_used_size(&c->read->mem);
engine = task->thread->engine;
if (size < 2) {
nxt_debug(task, "h1p conn ws frame header read %z", size);
nxt_conn_read(engine, c);
nxt_h1p_conn_ws_keepalive_enable(task, h1p);
return;
}
wsh = (nxt_websocket_header_t *) c->read->mem.pos;
hsize = nxt_websocket_frame_header_size(wsh);
if (size < hsize) {
nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize);
nxt_conn_read(engine, c);
nxt_h1p_conn_ws_keepalive_enable(task, h1p);
return;
}
r = h1p->request;
if (nxt_slow_path(r == NULL)) {
return;
}
r->ws_frame = c->read;
joint = c->listen->socket.data;
if (nxt_slow_path(joint == NULL)) {
/*
* Listening socket had been closed while
* connection was in keep-alive state.
*/
c->read_state = &nxt_h1p_idle_close_state;
return;
}
if (nxt_slow_path(wsh->mask == 0)) {
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked);
return;
}
if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) {
if (nxt_slow_path(wsh->fin == 0)) {
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented);
return;
}
if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING
&& wsh->opcode != NXT_WEBSOCKET_OP_PONG
&& wsh->opcode != NXT_WEBSOCKET_OP_CLOSE))
{
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
wsh->opcode);
return;
}
if (nxt_slow_path(wsh->payload_len > 125)) {
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big,
nxt_websocket_frame_payload_len(wsh));
return;
}
if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE
&& wsh->payload_len == 1))
{
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len);
return;
}
} else {
if (h1p->websocket_cont_expected) {
if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) {
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected,
wsh->opcode);
return;
}
} else {
if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY
&& wsh->opcode != NXT_WEBSOCKET_OP_TEXT))
{
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
wsh->opcode);
return;
}
}
h1p->websocket_cont_expected = !wsh->fin;
}
max_frame_size = joint->socket_conf->websocket_conf.max_frame_size;
payload_len = nxt_websocket_frame_payload_len(wsh);
if (nxt_slow_path(hsize > max_frame_size
|| payload_len > (max_frame_size - hsize)))
{
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len);
return;
}
c->read_state = &nxt_h1p_read_ws_frame_payload_state;
frame_size = payload_len + hsize;
nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size);
if (frame_size <= size) {
nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
return;
}
if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) {
c->read->mem.end = c->read->mem.start + frame_size;
} else {
nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0);
c->read->next = b;
c->read = b;
}
nxt_conn_read(engine, c);
nxt_h1p_conn_ws_keepalive_enable(task, h1p);
}
static void
nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p)
{
nxt_timer_t *timer;
if (h1p->websocket_timer == NULL) {
return;
}
timer = &h1p->websocket_timer->timer;
if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown");
return;
}
nxt_timer_disable(task->thread->engine, timer);
}
static void
nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p)
{
nxt_timer_t *timer;
if (h1p->websocket_timer == NULL) {
return;
}
timer = &h1p->websocket_timer->timer;
if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown");
return;
}
nxt_timer_add(task->thread->engine, timer,
h1p->websocket_timer->keepalive_interval);
}
static void
nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh)
{
size_t hsize;
uint8_t *p, *mask;
uint16_t code;
nxt_http_request_t *r;
r = h1p->request;
c->read = NULL;
if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) {
nxt_h1p_conn_ws_pong(task, r, NULL);
return;
}
if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) {
if (wsh->payload_len >= 2) {
hsize = nxt_websocket_frame_header_size(wsh);
mask = nxt_pointer_to(wsh, hsize - 4);
p = nxt_pointer_to(wsh, hsize);
code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]);
if (nxt_slow_path(code < 1000 || code >= 5000
|| (code > 1003 && code < 1007)
|| (code > 1014 && code < 3000)))
{
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code,
code);
return;
}
}
h1p->websocket_closed = 1;
}
r->state->ready_handler(task, r, NULL);
}
static void
nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
{
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
h1p = data;
nxt_debug(task, "h1p conn ws error");
r = h1p->request;
h1p->keepalive = 0;
if (nxt_fast_path(r != NULL)) {
r->state->error_handler(task, r, h1p);
}
}
static ssize_t
nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
size_t size;
ssize_t n;
nxt_buf_t *b;
b = c->read;
if (b == NULL) {
/* Enough for control frame. */
size = 10 + 125;
b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
if (nxt_slow_path(b == NULL)) {
c->socket.error = NXT_ENOMEM;
return NXT_ERROR;
}
}
n = c->io->recvbuf(c, b);
if (n > 0) {
c->read = b;
} else {
c->read = NULL;
nxt_mp_free(c->mem_pool, b);
}
return n;
}
static void
nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
nxt_timer_t *timer;
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
timer = obj;
nxt_debug(task, "h1p conn ws timeout");
c = nxt_read_timer_conn(timer);
c->block_read = 1;
/*
* Disable SO_LINGER off during socket closing
* to send "408 Request Timeout" error response.
*/
c->socket.timedout = 0;
h1p = c->socket.data;
h1p->keepalive = 0;
r = h1p->request;
if (nxt_slow_path(r == NULL)) {
return;
}
hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away);
}
static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state
nxt_aligned(64) =
{
.ready_handler = nxt_h1p_conn_ws_frame_payload_read,
.close_handler = nxt_h1p_conn_ws_error,
.error_handler = nxt_h1p_conn_ws_error,
.timer_handler = nxt_h1p_conn_ws_timeout,
.timer_value = nxt_h1p_conn_request_timer_value,
.timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
.timer_autoreset = 1,
};
static void
nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
nxt_event_engine_t *engine;
nxt_websocket_header_t *wsh;
c = obj;
h1p = data;
nxt_h1p_conn_ws_keepalive_disable(task, h1p);
nxt_debug(task, "h1p conn ws frame read");
if (nxt_buf_mem_free_size(&c->read->mem) == 0) {
r = h1p->request;
if (nxt_slow_path(r == NULL)) {
return;
}
wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
return;
}
engine = task->thread->engine;
nxt_conn_read(engine, c);
nxt_h1p_conn_ws_keepalive_enable(task, h1p);
}
static void
hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
const nxt_ws_error_t *err, ...)
{
u_char *p;
va_list args;
nxt_buf_t *out;
nxt_str_t desc;
nxt_websocket_header_t *wsh;
u_char buf[125];
if (nxt_slow_path(err->args)) {
va_start(args, err);
p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start,
args);
va_end(args);
desc.start = buf;
desc.length = p - buf;
} else {
desc = err->desc;
}
nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc);
out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length);
if (nxt_slow_path(out == NULL)) {
nxt_http_request_error_handler(task, r, r->proto.any);
return;
}
out->mem.start[0] = 0;
out->mem.start[1] = 0;
wsh = (nxt_websocket_header_t *) out->mem.start;
p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length);
wsh->fin = 1;
wsh->opcode = NXT_WEBSOCKET_OP_CLOSE;
*p++ = (err->code >> 8) & 0xFF;
*p++ = err->code & 0xFF;
out->mem.free = nxt_cpymem(p, desc.start, desc.length);
out->next = nxt_http_buf_last(r);
if (out->next != NULL) {
out->next->completion_handler = nxt_h1p_conn_ws_error_sent;
}
nxt_http_request_send(task, r, out);
}
static void
nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
r = data;
nxt_debug(task, "h1p conn ws error sent");
r->state->error_handler(task, r, r->proto.any);
}
static void
nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
{
uint8_t payload_len, i;
nxt_buf_t *b, *out, *next;
nxt_http_request_t *r;
nxt_websocket_header_t *wsh;
uint8_t mask[4];
nxt_debug(task, "h1p conn ws pong");
r = obj;
b = r->ws_frame;
wsh = (nxt_websocket_header_t *) b->mem.pos;
payload_len = wsh->payload_len;
b->mem.pos += 2;
nxt_memcpy(mask, b->mem.pos, 4);
b->mem.pos += 4;
out = nxt_http_buf_mem(task, r, 2 + payload_len);
if (nxt_slow_path(out == NULL)) {
nxt_http_request_error_handler(task, r, r->proto.any);
return;
}
out->mem.start[0] = 0;
out->mem.start[1] = 0;
wsh = (nxt_websocket_header_t *) out->mem.start;
out->mem.free = nxt_websocket_frame_init(wsh, payload_len);
wsh->fin = 1;
wsh->opcode = NXT_WEBSOCKET_OP_PONG;
for (i = 0; i < payload_len; i++) {
while (nxt_buf_mem_used_size(&b->mem) == 0) {
next = b->next;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
b = next;
}
*out->mem.free++ = *b->mem.pos++ ^ mask[i % 4];
}
r->ws_frame = b;
nxt_http_request_send(task, r, out);
nxt_http_request_ws_frame_start(task, r, r->ws_frame);
}