summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2019-08-20 16:31:53 +0300
committerMax Romanov <max.romanov@nginx.com>2019-08-20 16:31:53 +0300
commite501c74ddceab86e48c031ca9b5e154f52dcdae0 (patch)
tree7bfe94354df516d1ceefc5af3194ba943e443aa2 /src/nxt_router.c
parent9bbf54e23e185e94054072fff2673f6f5cd203e9 (diff)
downloadunit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.gz
unit-e501c74ddceab86e48c031ca9b5e154f52dcdae0.tar.bz2
Introducing websocket support in router and libunit.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c181
1 files changed, 110 insertions, 71 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 566e0c65..b87f588f 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -14,7 +14,7 @@
#include <nxt_port_memory_int.h>
#include <nxt_unit_request.h>
#include <nxt_unit_response.h>
-
+#include <nxt_router_request.h>
typedef struct {
nxt_str_t type;
@@ -48,64 +48,6 @@ typedef struct {
#endif
-typedef struct nxt_msg_info_s {
- nxt_buf_t *buf;
- nxt_port_mmap_tracking_t tracking;
- nxt_work_handler_t completion_handler;
-} nxt_msg_info_t;
-
-
-typedef struct nxt_request_app_link_s nxt_request_app_link_t;
-
-
-typedef enum {
- NXT_APR_NEW_PORT,
- NXT_APR_REQUEST_FAILED,
- NXT_APR_GOT_RESPONSE,
- NXT_APR_CLOSE,
-} nxt_apr_action_t;
-
-
-typedef struct {
- uint32_t stream;
- nxt_app_t *app;
-
- nxt_port_t *app_port;
- nxt_apr_action_t apr_action;
-
- nxt_http_request_t *request;
- nxt_msg_info_t msg_info;
- nxt_request_app_link_t *req_app_link;
-} nxt_request_rpc_data_t;
-
-
-struct nxt_request_app_link_s {
- uint32_t stream;
- nxt_atomic_t use_count;
-
- nxt_port_t *app_port;
- nxt_apr_action_t apr_action;
-
- nxt_port_t *reply_port;
- nxt_http_request_t *request;
- nxt_msg_info_t msg_info;
- nxt_request_rpc_data_t *req_rpc_data;
-
- nxt_nsec_t res_time;
-
- nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
- /* for nxt_port_t.pending_requests */
- nxt_queue_link_t link_port_pending;
- nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
-
- nxt_mp_t *mem_pool;
- nxt_work_t work;
-
- int err_code;
- const char *err_str;
-};
-
-
typedef struct {
nxt_socket_conf_t *socket_conf;
nxt_router_temp_conf_t *temp_conf;
@@ -305,6 +247,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
+const nxt_http_request_state_t nxt_http_websocket;
+
static nxt_router_t *nxt_router;
static const nxt_str_t http_prefix = nxt_string("HTTP_");
@@ -663,6 +607,7 @@ nxt_request_app_link_release(nxt_task_t *task,
nxt_request_app_link_t *req_app_link)
{
nxt_mp_t *mp;
+ nxt_http_request_t *r;
nxt_request_rpc_data_t *req_rpc_data;
nxt_assert(task->thread->engine == req_app_link->work.data);
@@ -683,10 +628,11 @@ nxt_request_app_link_release(nxt_task_t *task,
req_rpc_data->msg_info = req_app_link->msg_info;
if (req_rpc_data->app->timeout != 0) {
- req_rpc_data->request->timer.handler = nxt_router_app_timeout;
- req_rpc_data->request->timer_data = req_rpc_data;
- nxt_timer_add(task->thread->engine,
- &req_rpc_data->request->timer,
+ r = req_rpc_data->request;
+
+ r->timer.handler = nxt_router_app_timeout;
+ r->timer_data = req_rpc_data;
+ nxt_timer_add(task->thread->engine, &r->timer,
req_rpc_data->app->timeout);
}
@@ -833,14 +779,16 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
if (req_app_link->link_app_requests.next == NULL
&& req_app_link->link_port_pending.next == NULL
- && req_app_link->link_app_pending.next == NULL)
+ && req_app_link->link_app_pending.next == NULL
+ && req_app_link->link_port_websockets.next == NULL)
{
req_app_link = NULL;
} else {
ra_use_delta -=
nxt_queue_chk_remove(&req_app_link->link_app_requests)
- + nxt_queue_chk_remove(&req_app_link->link_port_pending);
+ + nxt_queue_chk_remove(&req_app_link->link_port_pending)
+ + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
nxt_queue_chk_remove(&req_app_link->link_app_pending);
}
@@ -863,6 +811,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
nxt_router_http_request_done(task, req_rpc_data->request);
+ req_rpc_data->request->req_rpc_data = NULL;
req_rpc_data->request = NULL;
}
}
@@ -1412,6 +1361,28 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
};
+static nxt_conf_map_t nxt_router_websocket_conf[] = {
+ {
+ nxt_string("max_frame_size"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_websocket_conf_t, max_frame_size),
+ },
+
+ {
+ nxt_string("read_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_websocket_conf_t, read_timeout),
+ },
+
+ {
+ nxt_string("keepalive_interval"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_websocket_conf_t, keepalive_interval),
+ },
+
+};
+
+
static nxt_int_t
nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
u_char *start, u_char *end)
@@ -1425,7 +1396,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_app_t *app, *prev;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
- nxt_conf_value_t *conf, *http, *value;
+ nxt_conf_value_t *conf, *http, *value, *websocket;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
nxt_conf_value_t *routes_conf;
@@ -1448,6 +1419,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
#if (NXT_TLS)
static nxt_str_t certificate_path = nxt_string("/tls/certificate");
#endif
+ static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
if (conf == NULL) {
@@ -1658,6 +1630,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
#endif
+ websocket = nxt_conf_get_path(conf, &websocket_path);
+
listeners = nxt_conf_get_path(conf, &listeners_path);
if (listeners != NULL) {
@@ -1697,6 +1671,10 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf->body_read_timeout = 30 * 1000;
skcf->send_timeout = 30 * 1000;
+ skcf->websocket_conf.max_frame_size = 1024 * 1024;
+ skcf->websocket_conf.read_timeout = 60 * 1000;
+ skcf->websocket_conf.keepalive_interval = 30 * 1000;
+
if (http != NULL) {
ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
nxt_nitems(nxt_router_http_conf),
@@ -1707,6 +1685,17 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
}
+ if (websocket != NULL) {
+ ret = nxt_conf_map_object(mp, websocket,
+ nxt_router_websocket_conf,
+ nxt_nitems(nxt_router_websocket_conf),
+ &skcf->websocket_conf);
+ if (ret != NXT_OK) {
+ nxt_alert(task, "websocket map error");
+ goto fail;
+ }
+ }
+
#if (NXT_TLS)
value = nxt_conf_get_path(listener, &certificate_path);
@@ -3418,10 +3407,12 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
{
nxt_int_t ret;
nxt_buf_t *b;
+ nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
nxt_http_request_t *r;
nxt_unit_response_t *resp;
+ nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
b = msg->buf;
@@ -3542,7 +3533,48 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_http_request_header_send(task, r);
- r->state = &nxt_http_request_send_state;
+ if (r->websocket_handshake
+ && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
+ {
+ req_app_link = nxt_request_app_link_alloc(task,
+ req_rpc_data->req_app_link,
+ req_rpc_data);
+ if (nxt_slow_path(req_app_link == NULL)) {
+ goto fail;
+ }
+
+ app_port = req_app_link->app_port;
+
+ if (app_port == NULL && req_rpc_data->app_port != NULL) {
+ req_app_link->app_port = req_rpc_data->app_port;
+ app_port = req_app_link->app_port;
+ req_app_link->apr_action = req_rpc_data->apr_action;
+
+ req_rpc_data->app_port = NULL;
+ }
+
+ if (nxt_slow_path(app_port == NULL)) {
+ goto fail;
+ }
+
+ nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
+
+ nxt_queue_insert_tail(&app_port->active_websockets,
+ &req_app_link->link_port_websockets);
+
+ nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
+
+ nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
+ req_app_link->apr_action = NXT_APR_CLOSE;
+
+ nxt_debug(task, "req_app_link stream #%uD upgrade",
+ req_app_link->stream);
+
+ r->state = &nxt_http_websocket;
+
+ } else {
+ r->state = &nxt_http_request_send_state;
+ }
if (r->out) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
@@ -3924,6 +3956,10 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
got_response = 1;
inc_use = -1;
break;
+ case NXT_APR_UPGRADE:
+ dec_pending = 1;
+ got_response = 1;
+ break;
case NXT_APR_CLOSE:
inc_use = -1;
break;
@@ -4046,9 +4082,10 @@ re_ra_cancelled:
adjust_idle_timer = 0;
- if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) {
- nxt_assert(port->idle_link.next == NULL);
-
+ if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
+ && nxt_queue_is_empty(&port->active_websockets)
+ && port->idle_link.next == NULL)
+ {
if (app->idle_processes == app->spare_processes
&& app->adjust_idle_work.data == NULL)
{
@@ -4545,6 +4582,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_router_app_use(task, app, 1);
req_rpc_data->request = r;
+ r->req_rpc_data = req_rpc_data;
req_app_link = &ra_local;
nxt_request_app_link_init(task, req_app_link, req_rpc_data);
@@ -4635,7 +4673,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
goto release_port;
}
- res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA,
+ res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
-1, req_app_link->stream, reply_port->id, buf,
&req_app_link->msg_info.tracking);
@@ -4785,6 +4823,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
*p++ = '\0';
req->tls = (r->tls != NULL);
+ req->websocket_handshake = r->websocket_handshake;
req->server_name_length = r->server_name.length;
nxt_unit_sptr_set(&req->server_name, p);