summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_h1proto_websocket.c
blob: cd67a8a667a25b4951eaf3a3390d3dd16103990f (plain) (tree)



























                                                                               
                                                                           





































































































































































































































































































































































































                                                                                
 




                                                              
                                            























                                                                              
                                           























                                                              
                                                           














































































































































































































































                                                                            

/*
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>
#include <nxt_router.h>
#include <nxt_http.h>
#include <nxt_h1proto.h>
#include <nxt_websocket.h>
#include <nxt_websocket_header.h>

typedef struct {
    uint16_t   code;
    uint8_t    args;
    nxt_str_t  desc;
} nxt_ws_error_t;

static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj,
    void *data);
static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task,
    nxt_h1proto_t *h1p);
static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
    nxt_h1proto_t *h1p);
static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
    nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
    void *data);
static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
    const nxt_ws_error_t *err, ...);
static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data);

static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state;
static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state;

static const nxt_ws_error_t  nxt_ws_err_out_of_memory = {
    NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR,
    0, nxt_string("Out of memory") };
static const nxt_ws_error_t  nxt_ws_err_too_big = {
    NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG,
    1, nxt_string("Message too big: %uL bytes") };
static const nxt_ws_error_t  nxt_ws_err_invalid_close_code = {
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
    1, nxt_string("Close code %ud is not valid") };
static const nxt_ws_error_t  nxt_ws_err_going_away = {
    NXT_WEBSOCKET_CR_GOING_AWAY,
    0, nxt_string("Remote peer is going away") };
static const nxt_ws_error_t  nxt_ws_err_not_masked = {
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
    0, nxt_string("Not masked client frame") };
static const nxt_ws_error_t  nxt_ws_err_ctrl_fragmented = {
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
    0, nxt_string("Fragmented control frame") };
static const nxt_ws_error_t  nxt_ws_err_ctrl_too_big = {
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
    1, nxt_string("Control frame too big: %uL bytes") };
static const nxt_ws_error_t  nxt_ws_err_invalid_close_len = {
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
    0, nxt_string("Close frame payload length cannot be 1") };
static const nxt_ws_error_t  nxt_ws_err_invalid_opcode = {
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
    1, nxt_string("Unrecognized opcode %ud") };
static const nxt_ws_error_t  nxt_ws_err_cont_expected = {
    NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
    1, nxt_string("Continuation expected, but %ud opcode received") };

void
nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r,
    nxt_buf_t *ws_frame)
{
    nxt_conn_t               *c;
    nxt_timer_t              *timer;
    nxt_h1proto_t            *h1p;
    nxt_socket_conf_joint_t  *joint;

    nxt_debug(task, "h1p ws first frame start");

    h1p = r->proto.h1;
    c = h1p->conn;

    if (!c->tcp_nodelay) {
        nxt_conn_tcp_nodelay_on(task, c);
    }

    joint = c->listen->socket.data;

    if (nxt_slow_path(joint != NULL
        && joint->socket_conf->websocket_conf.keepalive_interval != 0))
    {
        h1p->websocket_timer = nxt_mp_zget(c->mem_pool,
                                           sizeof(nxt_h1p_websocket_timer_t));
        if (nxt_slow_path(h1p->websocket_timer == NULL)) {
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory);
            return;
        }

        h1p->websocket_timer->keepalive_interval =
            joint->socket_conf->websocket_conf.keepalive_interval;
        h1p->websocket_timer->h1p = h1p;

        timer = &h1p->websocket_timer->timer;
        timer->task = &c->task;
        timer->work_queue = &task->thread->engine->fast_work_queue;
        timer->log = &c->log;
        timer->bias = NXT_TIMER_DEFAULT_BIAS;
        timer->handler = nxt_h1p_conn_ws_keepalive;
    }

    nxt_h1p_websocket_frame_start(task, r, ws_frame);
}


void
nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r,
    nxt_buf_t *ws_frame)
{
    size_t         size;
    nxt_buf_t      *in;
    nxt_conn_t     *c;
    nxt_h1proto_t  *h1p;

    nxt_debug(task, "h1p ws frame start");

    h1p = r->proto.h1;

    if (nxt_slow_path(h1p->websocket_closed)) {
        return;
    }

    c = h1p->conn;
    c->read = ws_frame;

    nxt_h1p_complete_buffers(task, h1p);

    in = c->read;
    c->read_state = &nxt_h1p_read_ws_frame_header_state;

    if (in == NULL) {
        nxt_conn_read(task->thread->engine, c);
        nxt_h1p_conn_ws_keepalive_enable(task, h1p);

    } else {
        size = nxt_buf_mem_used_size(&in->mem);

        nxt_debug(task, "h1p read client ws frame");

        nxt_memmove(in->mem.start, in->mem.pos, size);

        in->mem.pos = in->mem.start;
        in->mem.free = in->mem.start + size;

        nxt_h1p_conn_ws_frame_header_read(task, c, h1p);
    }
}


static void
nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data)
{
    nxt_buf_t                  *out;
    nxt_timer_t                *timer;
    nxt_h1proto_t              *h1p;
    nxt_http_request_t         *r;
    nxt_websocket_header_t     *wsh;
    nxt_h1p_websocket_timer_t  *ws_timer;

    nxt_debug(task, "h1p conn ws keepalive");

    timer = obj;
    ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
    h1p = ws_timer->h1p;

    r = h1p->request;
    if (nxt_slow_path(r == NULL)) {
        return;
    }

    out = nxt_http_buf_mem(task, r, 2);
    if (nxt_slow_path(out == NULL)) {
        nxt_http_request_error_handler(task, r, r->proto.any);
        return;
    }

    out->mem.start[0] = 0;
    out->mem.start[1] = 0;

    wsh = (nxt_websocket_header_t *) out->mem.start;
    out->mem.free = nxt_websocket_frame_init(wsh, 0);

    wsh->fin = 1;
    wsh->opcode = NXT_WEBSOCKET_OP_PING;

    nxt_http_request_send(task, r, out);
}


static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_ws_frame_header_read,
    .close_handler = nxt_h1p_conn_ws_error,
    .error_handler = nxt_h1p_conn_ws_error,

    .io_read_handler = nxt_h1p_ws_io_read_handler,

    .timer_handler = nxt_h1p_conn_ws_timeout,
    .timer_value = nxt_h1p_conn_request_timer_value,
    .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
    .timer_autoreset = 1,
};


static void
nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data)
{
    size_t                   size, hsize, frame_size, max_frame_size;
    uint64_t                 payload_len;
    nxt_conn_t               *c;
    nxt_h1proto_t            *h1p;
    nxt_http_request_t       *r;
    nxt_event_engine_t       *engine;
    nxt_websocket_header_t   *wsh;
    nxt_socket_conf_joint_t  *joint;

    c = obj;
    h1p = data;

    nxt_h1p_conn_ws_keepalive_disable(task, h1p);

    size = nxt_buf_mem_used_size(&c->read->mem);

    engine = task->thread->engine;

    if (size < 2) {
        nxt_debug(task, "h1p conn ws frame header read %z", size);

        nxt_conn_read(engine, c);
        nxt_h1p_conn_ws_keepalive_enable(task, h1p);

        return;
    }

    wsh = (nxt_websocket_header_t *) c->read->mem.pos;

    hsize = nxt_websocket_frame_header_size(wsh);

    if (size < hsize) {
        nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize);

        nxt_conn_read(engine, c);
        nxt_h1p_conn_ws_keepalive_enable(task, h1p);

        return;
    }

    r = h1p->request;
    if (nxt_slow_path(r == NULL)) {
        return;
    }

    r->ws_frame = c->read;

    joint = c->listen->socket.data;

    if (nxt_slow_path(joint == NULL)) {
        /*
         * Listening socket had been closed while
         * connection was in keep-alive state.
         */
        c->read_state = &nxt_h1p_idle_close_state;
        return;
    }

    if (nxt_slow_path(wsh->mask == 0)) {
        hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked);
        return;
    }

    if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) {
        if (nxt_slow_path(wsh->fin == 0)) {
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented);
            return;
        }

        if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING
                          && wsh->opcode != NXT_WEBSOCKET_OP_PONG
                          && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE))
        {
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
                                  wsh->opcode);
            return;
        }

        if (nxt_slow_path(wsh->payload_len > 125)) {
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big,
                                  nxt_websocket_frame_payload_len(wsh));
            return;
        }

        if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE
                          && wsh->payload_len == 1))
        {
            hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len);
            return;
        }

    } else {
        if (h1p->websocket_cont_expected) {
            if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) {
                hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected,
                                      wsh->opcode);
                return;
            }

        } else {
            if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY
                              && wsh->opcode != NXT_WEBSOCKET_OP_TEXT))
            {
                hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
                                      wsh->opcode);
                return;
            }
        }

        h1p->websocket_cont_expected = !wsh->fin;
    }

    max_frame_size = joint->socket_conf->websocket_conf.max_frame_size;

    payload_len = nxt_websocket_frame_payload_len(wsh);

    if (nxt_slow_path(hsize > max_frame_size
                      || payload_len > (max_frame_size - hsize)))
    {
        hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len);
        return;
    }

    c->read_state = &nxt_h1p_read_ws_frame_payload_state;

    frame_size = payload_len + hsize;

    nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size);

    if (frame_size <= size) {
        nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);

        return;
    }

    if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) {
        c->read->mem.end = c->read->mem.start + frame_size;

    } else {
        nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0);

        c->read->next = b;
        c->read = b;
    }

    nxt_conn_read(engine, c);
    nxt_h1p_conn_ws_keepalive_enable(task, h1p);
}


static void
nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p)
{
    nxt_timer_t  *timer;

    if (h1p->websocket_timer == NULL) {
        return;
    }

    timer = &h1p->websocket_timer->timer;

    if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
        nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown");
        return;
    }

    nxt_timer_disable(task->thread->engine, timer);
}


static void
nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p)
{
    nxt_timer_t  *timer;

    if (h1p->websocket_timer == NULL) {
        return;
    }

    timer = &h1p->websocket_timer->timer;

    if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
        nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown");
        return;
    }

    nxt_timer_add(task->thread->engine, timer,
                  h1p->websocket_timer->keepalive_interval);
}


static void
nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
    nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh)
{
    size_t              hsize;
    uint8_t             *p, *mask;
    uint16_t            code;
    nxt_http_request_t  *r;

    r = h1p->request;

    c->read = NULL;

    if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) {
        nxt_h1p_conn_ws_pong(task, r, NULL);
        return;
    }

    if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) {
        if (wsh->payload_len >= 2) {
            hsize = nxt_websocket_frame_header_size(wsh);
            mask = nxt_pointer_to(wsh, hsize - 4);
            p = nxt_pointer_to(wsh, hsize);

            code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]);

            if (nxt_slow_path(code < 1000 || code >= 5000
                              || (code > 1003 && code < 1007)
                              || (code > 1014 && code < 3000)))
            {
                hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code,
                                      code);
                return;
            }
        }

        h1p->websocket_closed = 1;
    }

    r->state->ready_handler(task, r, NULL);
}


static void
nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
{
    nxt_h1proto_t       *h1p;
    nxt_http_request_t  *r;

    h1p = data;

    nxt_debug(task, "h1p conn ws error");

    r = h1p->request;

    h1p->keepalive = 0;

    if (nxt_fast_path(r != NULL)) {
        r->state->error_handler(task, r, h1p);
    }
}


static ssize_t
nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
{
    size_t     size;
    ssize_t    n;
    nxt_buf_t  *b;

    b = c->read;

    if (b == NULL) {
        /* Enough for control frame. */
        size = 10 + 125;

        b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
        if (nxt_slow_path(b == NULL)) {
            c->socket.error = NXT_ENOMEM;
            return NXT_ERROR;
        }
    }

    n = c->io->recvbuf(c, b);

    if (n > 0) {
        c->read = b;

    } else {
        c->read = NULL;
        nxt_mp_free(c->mem_pool, b);
    }

    return n;
}


static void
nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t          *c;
    nxt_timer_t         *timer;
    nxt_h1proto_t       *h1p;
    nxt_http_request_t  *r;

    timer = obj;

    nxt_debug(task, "h1p conn ws timeout");

    c = nxt_read_timer_conn(timer);
    c->block_read = 1;
    /*
     * Disable SO_LINGER off during socket closing
     * to send "408 Request Timeout" error response.
     */
    c->socket.timedout = 0;

    h1p = c->socket.data;
    h1p->keepalive = 0;

    r = h1p->request;
    if (nxt_slow_path(r == NULL)) {
        return;
    }

    hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away);
}


static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_ws_frame_payload_read,
    .close_handler = nxt_h1p_conn_ws_error,
    .error_handler = nxt_h1p_conn_ws_error,

    .timer_handler = nxt_h1p_conn_ws_timeout,
    .timer_value = nxt_h1p_conn_request_timer_value,
    .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
    .timer_autoreset = 1,
};


static void
nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t              *c;
    nxt_h1proto_t           *h1p;
    nxt_http_request_t      *r;
    nxt_event_engine_t      *engine;
    nxt_websocket_header_t  *wsh;

    c = obj;
    h1p = data;

    nxt_h1p_conn_ws_keepalive_disable(task, h1p);

    nxt_debug(task, "h1p conn ws frame read");

    if (nxt_buf_mem_free_size(&c->read->mem) == 0) {
        r = h1p->request;
        if (nxt_slow_path(r == NULL)) {
            return;
        }

        wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;

        nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);

        return;
    }

    engine = task->thread->engine;

    nxt_conn_read(engine, c);
    nxt_h1p_conn_ws_keepalive_enable(task, h1p);
}


static void
hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
    const nxt_ws_error_t *err, ...)
{
    u_char                  *p;
    va_list                 args;
    nxt_buf_t               *out;
    nxt_str_t               desc;
    nxt_websocket_header_t  *wsh;
    u_char                  buf[125];

    if (nxt_slow_path(err->args)) {
        va_start(args, err);
        p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start,
                         args);
        va_end(args);

        desc.start = buf;
        desc.length = p - buf;

    } else {
        desc = err->desc;
    }

    nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc);

    out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length);
    if (nxt_slow_path(out == NULL)) {
        nxt_http_request_error_handler(task, r, r->proto.any);
        return;
    }

    out->mem.start[0] = 0;
    out->mem.start[1] = 0;

    wsh = (nxt_websocket_header_t *) out->mem.start;
    p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length);

    wsh->fin = 1;
    wsh->opcode = NXT_WEBSOCKET_OP_CLOSE;

    *p++ = (err->code >> 8) & 0xFF;
    *p++ = err->code & 0xFF;

    out->mem.free = nxt_cpymem(p, desc.start, desc.length);
    out->next = nxt_http_buf_last(r);

    if (out->next != NULL) {
        out->next->completion_handler = nxt_h1p_conn_ws_error_sent;
    }

    nxt_http_request_send(task, r, out);
}


static void
nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data)
{
    nxt_http_request_t  *r;

    r = data;

    nxt_debug(task, "h1p conn ws error sent");

    r->state->error_handler(task, r, r->proto.any);
}


static void
nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
{
    uint8_t                 payload_len, i;
    nxt_buf_t               *b, *out, *next;
    nxt_http_request_t      *r;
    nxt_websocket_header_t  *wsh;
    uint8_t                 mask[4];

    nxt_debug(task, "h1p conn ws pong");

    r = obj;
    b = r->ws_frame;

    wsh = (nxt_websocket_header_t *) b->mem.pos;
    payload_len = wsh->payload_len;

    b->mem.pos += 2;

    nxt_memcpy(mask, b->mem.pos, 4);

    b->mem.pos += 4;

    out = nxt_http_buf_mem(task, r, 2 + payload_len);
    if (nxt_slow_path(out == NULL)) {
        nxt_http_request_error_handler(task, r, r->proto.any);
        return;
    }

    out->mem.start[0] = 0;
    out->mem.start[1] = 0;

    wsh = (nxt_websocket_header_t *) out->mem.start;
    out->mem.free = nxt_websocket_frame_init(wsh, payload_len);

    wsh->fin = 1;
    wsh->opcode = NXT_WEBSOCKET_OP_PONG;

    for (i = 0; i < payload_len; i++) {
        while (nxt_buf_mem_used_size(&b->mem) == 0) {
            next = b->next;

            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            b = next;
        }

        *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4];
    }

    r->ws_frame = b;

    nxt_http_request_send(task, r, out);

    nxt_http_request_ws_frame_start(task, r, r->ws_frame);
}