summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_http_websocket.c
blob: fb888f5d5c55283c6ac59f733fd103c434fa42ca (plain) (tree)

























































































                                                                               
                       






































































                                                                            

/*
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>
#include <nxt_router.h>
#include <nxt_http.h>
#include <nxt_router_request.h>
#include <nxt_port_memory_int.h>
#include <nxt_websocket.h>
#include <nxt_websocket_header.h>


/*
 * Forward declarations of the two handlers defined in this file; both are
 * referenced by the nxt_http_websocket state table.
 */
static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
    void *data);


/*
 * Request state table for websocket handling: its ready_handler is
 * nxt_http_websocket_client() and its error_handler is
 * nxt_http_websocket_error_handler().
 */
const nxt_http_request_state_t  nxt_http_websocket
    nxt_aligned(64) =
{
    .ready_handler = nxt_http_websocket_client,
    .error_handler = nxt_http_websocket_error_handler,
};


/*
 * Ready handler of the nxt_http_websocket state: forwards one client
 * websocket frame to the application.
 *
 * The frame header at the start of r->ws_frame gives the total frame size
 * (header plus payload).  That many bytes are copied from the r->ws_frame
 * buffer chain into buffers obtained with nxt_port_mmap_get_buf(), each
 * requested with at most PORT_MMAP_DATA_SIZE bytes, and the resulting chain
 * is written to the application port as an NXT_PORT_MSG_WEBSOCKET message.
 * Source buffers that become empty are queued to their completion handlers;
 * data left in the first remaining buffer is moved to the buffer start
 * before nxt_http_request_ws_frame_start() is called.
 *
 * obj is the nxt_http_request_t; data is not used.
 */
static void
nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
{
    size_t                  frame_size, used_size, copy_size, buf_free_size;
    size_t                  chunk_copy_size;
    nxt_buf_t               *out, *buf, **out_tail, *b, *next;
    nxt_int_t               res;
    nxt_http_request_t      *r;
    nxt_request_app_link_t  *req_app_link;
    nxt_request_rpc_data_t  *req_rpc_data;
    nxt_websocket_header_t  *wsh;

    r = obj;

    if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL
         || (req_app_link = req_rpc_data->req_app_link) == NULL))
    {
        nxt_debug(task, "websocket client frame for destroyed request");

        return;
    }

    /*
     * Same guard as in nxt_http_websocket_error_handler(): without an
     * application port there is nowhere to forward the frame to.
     */
    if (nxt_slow_path(req_app_link->app_port == NULL)) {
        nxt_debug(task, "websocket client frame: app_port is NULL");

        return;
    }

    nxt_debug(task, "http websocket client frame");

    wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;

    /* Bytes of the current frame that still have to be forwarded. */
    frame_size = nxt_websocket_frame_header_size(wsh)
                  + nxt_websocket_frame_payload_len(wsh);

    buf = NULL;
    buf_free_size = 0;
    out = NULL;
    out_tail = &out;

    b = r->ws_frame;

    while (b != NULL && frame_size > 0) {
        used_size = nxt_buf_mem_used_size(&b->mem);
        copy_size = nxt_min(used_size, frame_size);

        while (copy_size > 0) {
            if (buf == NULL || buf_free_size == 0) {
                /*
                 * frame_size is the remainder of the frame, so the new
                 * output buffer is never requested larger than needed.
                 */
                buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);

                /*
                 * NOTE(review): the returned buffer is used unchecked;
                 * confirm whether nxt_port_mmap_get_buf() can fail and
                 * add recovery if so.
                 */
                buf = nxt_port_mmap_get_buf(task, req_app_link->app_port,
                                            buf_free_size);

                *out_tail = buf;
                out_tail = &buf->next;
            }

            chunk_copy_size = nxt_min(buf_free_size, copy_size);

            buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
                                       chunk_copy_size);

            /*
             * Account for the chunk in the frame remainder here: after
             * this loop copy_size is always 0, so subtracting it from
             * frame_size afterwards would never shrink the remainder and
             * bytes past the end of the frame could be forwarded.
             */
            copy_size -= chunk_copy_size;
            frame_size -= chunk_copy_size;
            b->mem.pos += chunk_copy_size;
            buf_free_size -= chunk_copy_size;
        }

        next = b->next;
        b->next = NULL;

        if (nxt_buf_mem_used_size(&b->mem) == 0) {
            /* The source buffer is drained: pass it to its completion. */
            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            r->ws_frame = next;
        }

        b = next;
    }

    res = nxt_port_socket_twrite(task, req_app_link->app_port,
                                 NXT_PORT_MSG_WEBSOCKET, -1,
                                 req_app_link->stream,
                                 req_app_link->reply_port->id, out, NULL);
    if (nxt_slow_path(res != NXT_OK)) {
        // TODO: handle
    }

    b = r->ws_frame;

    if (b != NULL) {
        used_size = nxt_buf_mem_used_size(&b->mem);

        if (used_size > 0) {
            /* Move the unconsumed tail to the beginning of the buffer. */
            nxt_memmove(b->mem.start, b->mem.pos, used_size);

            b->mem.pos = b->mem.start;
            b->mem.free = b->mem.start + used_size;
        }
    }

    nxt_http_request_ws_frame_start(task, r, r->ws_frame);
}


/*
 * Error handler of the nxt_http_websocket state.
 *
 * When the request still has RPC data, an application link and an
 * application port, an NXT_PORT_MSG_WEBSOCKET_LAST message without a body
 * is written to that port; the result of the write is ignored.  In every
 * case the request is then passed to nxt_http_request_close_handler().
 *
 * obj is the nxt_http_request_t; obj and data are forwarded unchanged to
 * the close handler.
 */
static void
nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_http_request_t      *req;
    nxt_request_app_link_t  *link;
    nxt_request_rpc_data_t  *rpc;

    nxt_debug(task, "http websocket error handler");

    req = obj;
    rpc = req->req_rpc_data;

    if (rpc == NULL) {
        nxt_debug(task, "  req_rpc_data is NULL");

    } else if ((link = rpc->req_app_link) == NULL) {
        nxt_debug(task, "  req_app_link is NULL");

    } else if (link->app_port == NULL) {
        nxt_debug(task, "  app_port is NULL");

    } else {
        /* Everything needed to reach the application is still present. */
        (void) nxt_port_socket_twrite(task, link->app_port,
                                      NXT_PORT_MSG_WEBSOCKET_LAST,
                                      -1, link->stream,
                                      link->reply_port->id, NULL, NULL);
    }

    nxt_http_request_close_handler(task, obj, data);
}