/* * Copyright (C) NGINX, Inc. */ #include #include #include #include #include #include #include static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data); static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data); const nxt_http_request_state_t nxt_http_websocket nxt_aligned(64) = { .ready_handler = nxt_http_websocket_client, .error_handler = nxt_http_websocket_error_handler, }; static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) { size_t frame_size, used_size, copy_size, buf_free_size; size_t chunk_copy_size; nxt_buf_t *out, *buf, **out_tail, *b, *next; nxt_int_t res; nxt_http_request_t *r; nxt_request_rpc_data_t *req_rpc_data; nxt_websocket_header_t *wsh; r = obj; req_rpc_data = r->req_rpc_data; if (nxt_slow_path(req_rpc_data == NULL)) { nxt_debug(task, "websocket client frame for destroyed request"); return; } nxt_debug(task, "http websocket client frame"); wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; frame_size = nxt_websocket_frame_header_size(wsh) + nxt_websocket_frame_payload_len(wsh); buf = NULL; buf_free_size = 0; out = NULL; out_tail = &out; b = r->ws_frame; while (b != NULL && frame_size > 0) { used_size = nxt_buf_mem_used_size(&b->mem); copy_size = nxt_min(used_size, frame_size); while (copy_size > 0) { if (buf == NULL || buf_free_size == 0) { buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing, buf_free_size); *out_tail = buf; out_tail = &buf->next; } chunk_copy_size = nxt_min(buf_free_size, copy_size); buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos, chunk_copy_size); copy_size -= chunk_copy_size; b->mem.pos += chunk_copy_size; buf_free_size -= chunk_copy_size; } frame_size -= copy_size; next = b->next; b->next = NULL; if (nxt_buf_mem_used_size(&b->mem) == 0) { nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); r->ws_frame = next; } b = next; } res = nxt_port_socket_write(task, req_rpc_data->app_port, NXT_PORT_MSG_WEBSOCKET, -1, req_rpc_data->stream, task->thread->engine->port->id, out); if (nxt_slow_path(res != NXT_OK)) { // TODO: handle } b = r->ws_frame; if (b != NULL) { used_size = nxt_buf_mem_used_size(&b->mem); if (used_size > 0) { nxt_memmove(b->mem.start, b->mem.pos, used_size); b->mem.pos = b->mem.start; b->mem.free = b->mem.start + used_size; } } nxt_http_request_ws_frame_start(task, r, r->ws_frame); } static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; nxt_request_rpc_data_t *req_rpc_data; nxt_debug(task, "http websocket error handler"); r = obj; req_rpc_data = r->req_rpc_data; if (req_rpc_data == NULL) { nxt_debug(task, " req_rpc_data is NULL"); goto close_handler; } if (req_rpc_data->app_port == NULL) { nxt_debug(task, " app_port is NULL"); goto close_handler; } (void) nxt_port_socket_write(task, req_rpc_data->app_port, NXT_PORT_MSG_WEBSOCKET_LAST, -1, req_rpc_data->stream, task->thread->engine->port->id, NULL); close_handler: nxt_http_request_close_handler(task, obj, data); }