diff options
-rw-r--r-- | src/nxt_conf.c | 2 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 30 | ||||
-rw-r--r-- | src/nxt_conn.c | 32 | ||||
-rw-r--r-- | src/nxt_conn.h | 17 | ||||
-rw-r--r-- | src/nxt_event_engine.c | 117 | ||||
-rw-r--r-- | src/nxt_event_engine.h | 10 | ||||
-rw-r--r-- | src/nxt_main_process.c | 20 | ||||
-rw-r--r-- | src/nxt_port.c | 3 | ||||
-rw-r--r-- | src/nxt_port.h | 3 | ||||
-rw-r--r-- | src/nxt_port_rpc.c | 261 | ||||
-rw-r--r-- | src/nxt_port_rpc.h | 8 | ||||
-rw-r--r-- | src/nxt_router.c | 410 | ||||
-rw-r--r-- | src/nxt_router.h | 2 |
13 files changed, 589 insertions, 326 deletions
diff --git a/src/nxt_conf.c b/src/nxt_conf.c index 904b29e7..d48eade3 100644 --- a/src/nxt_conf.c +++ b/src/nxt_conf.c @@ -495,7 +495,7 @@ nxt_conf_map_object(nxt_mp_t *mp, nxt_conf_value_t *value, nxt_conf_map_t *map, break; case NXT_CONF_MAP_MSEC: - ptr->msec = v->u.integer; + ptr->msec = v->u.integer * 1000; break; default: diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 337498d9..80e1de6f 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -66,6 +66,21 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_listener_members[] = { }; +static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = { + { nxt_string("timeout"), + NXT_CONF_INTEGER, + NULL, + NULL }, + + { nxt_string("requests"), + NXT_CONF_INTEGER, + NULL, + NULL }, + + { nxt_null_string, 0, NULL, NULL } +}; + + static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = { { nxt_string("type"), NXT_CONF_STRING, @@ -77,6 +92,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = { NULL, NULL }, + { nxt_string("limits"), + NXT_CONF_OBJECT, + &nxt_conf_vldt_object, + (void *) &nxt_conf_vldt_app_limits_members }, + { nxt_string("user"), NXT_CONF_STRING, nxt_conf_vldt_system, @@ -117,6 +137,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_php_members[] = { NULL, NULL }, + { nxt_string("limits"), + NXT_CONF_OBJECT, + &nxt_conf_vldt_object, + (void *) &nxt_conf_vldt_app_limits_members }, + { nxt_string("user"), NXT_CONF_STRING, nxt_conf_vldt_system, @@ -162,6 +187,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_go_members[] = { NULL, NULL }, + { nxt_string("limits"), + NXT_CONF_OBJECT, + &nxt_conf_vldt_object, + (void *) &nxt_conf_vldt_app_limits_members }, + { nxt_string("user"), NXT_CONF_STRING, nxt_conf_vldt_system, diff --git a/src/nxt_conn.c b/src/nxt_conn.c index 5d84cbb4..a5549eda 100644 --- a/src/nxt_conn.c +++ b/src/nxt_conn.c @@ -152,35 +152,3 @@ nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq) c->read_timer.work_queue = wq; c->write_timer.work_queue = wq; } - - -nxt_req_conn_link_t * -nxt_conn_request_add(nxt_conn_t *c, nxt_req_id_t req_id) -{ - nxt_req_conn_link_t *rc; - - rc = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_req_conn_link_t)); - if (nxt_slow_path(rc == NULL)) { - nxt_thread_log_error(NXT_LOG_WARN, "failed to allocate req %08uxD " - "to conn", req_id); - return NULL; - } - - rc->req_id = req_id; - rc->conn = c; - - nxt_queue_insert_tail(&c->requests, &rc->link); - - return rc; -} - - -void -nxt_conn_request_remove(nxt_conn_t *c, nxt_req_conn_link_t *rc) -{ - nxt_queue_remove(&rc->link); - - nxt_mp_free(c->mem_pool, rc); -} - - diff --git a/src/nxt_conn.h b/src/nxt_conn.h index 33c6ad28..32cad432 100644 --- a/src/nxt_conn.h +++ b/src/nxt_conn.h @@ -175,17 +175,6 @@ struct nxt_conn_s { }; -typedef uint32_t nxt_req_id_t; - -typedef struct { - nxt_req_id_t req_id; - nxt_conn_t *conn; - nxt_port_t *app_port; - - nxt_queue_link_t link; /* for nxt_conn_t.requests */ -} nxt_req_conn_link_t; - - #define nxt_conn_timer_init(ev, c, wq) \ do { \ (ev)->work_queue = (wq); \ @@ -353,10 +342,4 @@ NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p); #define nxt_event_conn_close nxt_conn_close -NXT_EXPORT nxt_req_conn_link_t *nxt_conn_request_add(nxt_conn_t *c, - nxt_req_id_t req_id); -NXT_EXPORT void nxt_conn_request_remove(nxt_conn_t *c, - nxt_req_conn_link_t *rc); - - #endif /* _NXT_CONN_H_INCLUDED_ */ diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index bdb0a011..ba297329 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -232,6 +232,12 @@ nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) { nxt_debug(&engine->task, "event engine post"); +#if (NXT_DEBUG) + if (nxt_slow_path(work->next != NULL)) { + nxt_debug(&engine->task, "event engine post multiple works"); + } +#endif + nxt_locked_work_queue_add(&engine->locked_work_queue, work); nxt_event_engine_signal(engine, 0); @@ -530,117 +536,6 @@ nxt_event_engine_start(nxt_event_engine_t *engine) } -static nxt_int_t -nxt_req_conn_test(nxt_lvlhsh_query_t *lhq, void *data) -{ - return NXT_OK; -} - -static const nxt_lvlhsh_proto_t lvlhsh_req_conn_proto nxt_aligned(64) = { - NXT_LVLHSH_DEFAULT, - nxt_req_conn_test, - nxt_lvlhsh_alloc, - nxt_lvlhsh_free, -}; - - -void -nxt_event_engine_request_add(nxt_event_engine_t *engine, - nxt_req_conn_link_t *rc) -{ - nxt_lvlhsh_query_t lhq; - - lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id)); - lhq.key.length = sizeof(rc->req_id); - lhq.key.start = (u_char *) &rc->req_id; - lhq.proto = &lvlhsh_req_conn_proto; - lhq.replace = 0; - lhq.value = rc; - lhq.pool = engine->mem_pool; - - switch (nxt_lvlhsh_insert(&engine->requests, &lhq)) { - - case NXT_OK: - break; - - default: - nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn add failed", - rc->req_id); - break; - } -} - - -nxt_req_conn_link_t * -nxt_event_engine_request_find(nxt_event_engine_t *engine, nxt_req_id_t req_id) -{ - nxt_lvlhsh_query_t lhq; - - lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id)); - lhq.key.length = sizeof(req_id); - lhq.key.start = (u_char *) &req_id; - lhq.proto = &lvlhsh_req_conn_proto; - - if (nxt_lvlhsh_find(&engine->requests, &lhq) == NXT_OK) { - return lhq.value; - } - - return NULL; -} - - -void -nxt_event_engine_request_remove(nxt_event_engine_t *engine, - nxt_req_conn_link_t *rc) -{ - nxt_lvlhsh_query_t lhq; - - lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id)); - lhq.key.length = sizeof(rc->req_id); - lhq.key.start = (u_char *) &rc->req_id; - lhq.proto = &lvlhsh_req_conn_proto; - lhq.pool = engine->mem_pool; - - switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) { - - case NXT_OK: - break; - - default: - nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed", - rc->req_id); - break; - } -} - - -nxt_req_conn_link_t * -nxt_event_engine_request_find_remove(nxt_event_engine_t *engine, - nxt_req_id_t req_id) -{ - nxt_lvlhsh_query_t lhq; - - lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id)); - lhq.key.length = sizeof(req_id); - lhq.key.start = (u_char *) &req_id; - lhq.proto = &lvlhsh_req_conn_proto; - lhq.pool = engine->mem_pool; - - switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) { - - case NXT_OK: - return lhq.value; - - default: - nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed", - req_id); - break; - } - - return NULL; -} - - #if (NXT_DEBUG) void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine) diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index 6c0adafd..57776a06 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -491,7 +491,6 @@ struct nxt_event_engine_s { nxt_queue_t joints; nxt_queue_t listen_connections; nxt_queue_t idle_connections; - nxt_lvlhsh_t requests; /* req_id to nxt_req_conn_link_t */ nxt_queue_link_t link; // STUB: router link @@ -512,15 +511,6 @@ NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine, NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo); -NXT_EXPORT void nxt_event_engine_request_add(nxt_event_engine_t *engine, - nxt_req_conn_link_t *rc); -NXT_EXPORT nxt_req_conn_link_t *nxt_event_engine_request_find( - nxt_event_engine_t *engine, nxt_req_id_t req_id); -NXT_EXPORT void nxt_event_engine_request_remove(nxt_event_engine_t *engine, - nxt_req_conn_link_t *rc); -NXT_EXPORT nxt_req_conn_link_t *nxt_event_engine_request_find_remove( - nxt_event_engine_t *engine, nxt_req_id_t req_id); - nxt_inline nxt_event_engine_t * nxt_thread_event_engine(void) diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 7748d07f..2e91628b 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -171,11 +171,14 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_mp_t *mp; nxt_int_t ret; nxt_buf_t *b; + nxt_port_t *port; nxt_conf_value_t *conf; nxt_common_app_conf_t app_conf; static nxt_str_t nobody = nxt_string("nobody"); + ret = NXT_ERROR; + b = msg->buf; nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos, @@ -196,7 +199,8 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) if (conf == NULL) { nxt_log(task, NXT_LOG_CRIT, "configuration parsing error"); - return; + + goto failed; } app_conf.user = nobody; @@ -205,12 +209,24 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_nitems(nxt_common_app_conf), &app_conf); if (ret != NXT_OK) { nxt_log(task, NXT_LOG_CRIT, "root map error"); - return; + + goto failed; } ret = nxt_main_start_worker_process(task, task->thread->runtime, &app_conf, msg->port_msg.stream); +failed: + + if (ret == NXT_ERROR) { + port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_fast_path(port != NULL)) { + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + } + } + nxt_mp_destroy(mp); } diff --git a/src/nxt_port.c b/src/nxt_port.c index 8adbcde4..eef8873b 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -27,7 +27,7 @@ nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) nxt_assert(port->pair[0] == -1); nxt_assert(port->pair[1] == -1); - nxt_assert(port->app_req_id == 0); + nxt_assert(port->app_stream == 0); nxt_assert(port->app_link.next == NULL); nxt_assert(nxt_queue_is_empty(&port->messages)); @@ -58,7 +58,6 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, port->pid = pid; port->type = type; port->mem_pool = mp; - port->next_stream = 1; nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); diff --git a/src/nxt_port.h b/src/nxt_port.h index a338a8c4..1e501390 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -106,7 +106,7 @@ struct nxt_port_s { uint32_t max_size; /* Maximum interleave of message parts. */ uint32_t max_share; - uint32_t app_req_id; + uint32_t app_stream; nxt_port_handler_t handler; nxt_port_handler_t *data; @@ -122,7 +122,6 @@ struct nxt_port_s { nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */ nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */ - uint32_t next_stream; nxt_process_type_t type; nxt_work_t work; diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index c5821439..04de7775 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -8,6 +8,8 @@ #include <nxt_port_rpc.h> +static nxt_atomic_t nxt_stream_ident = 1; + typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t; struct nxt_port_rpc_reg_s { @@ -15,6 +17,7 @@ struct nxt_port_rpc_reg_s { nxt_pid_t peer; nxt_queue_link_t link; + nxt_bool_t link_first; nxt_port_rpc_handler_t ready_handler; nxt_port_rpc_handler_t error_handler; @@ -62,30 +65,57 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, nxt_pid_t peer, void *data) { + void *ex; + nxt_port_rpc_reg_t *reg; + + ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler, + error_handler, 0); + + if (ex == NULL) { + return 0; + } + + if (peer != -1) { + nxt_port_rpc_ex_set_peer(task, port, ex, peer); + } + + reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); + + nxt_assert(reg->data == ex); + + reg->data = data; + + return reg->stream; +} + + +void * +nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, + size_t ex_size) +{ uint32_t stream; - nxt_queue_link_t *peer_link; nxt_port_rpc_reg_t *reg; nxt_lvlhsh_query_t lhq; nxt_assert(port->pair[0] != -1); - stream = port->next_stream++; + stream = + (uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3fffffff; - reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t)); + reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size); if (nxt_slow_path(reg == NULL)) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to allocate " - "reg for stream #%uD", stream); + nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream); - return 0; + return NULL; } reg->stream = stream; - reg->peer = peer; + reg->peer = -1; reg->ready_handler = ready_handler; reg->error_handler = error_handler; - reg->data = data; - + reg->data = reg + 1; nxt_port_rpc_lhq_stream(&lhq, &stream); lhq.replace = 0; @@ -98,39 +128,140 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, break; default: - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add handler " - "for stream #%uD", stream); + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add " + "reg ", stream); nxt_mp_free(port->mem_pool, reg); - return 0; + return NULL; } - if (peer != -1) { - nxt_port_rpc_lhq_peer(&lhq, &peer); - lhq.replace = 0; - lhq.value = ®->link; + nxt_debug(task, "rpc: stream #%uD registered", stream); + + return reg->data; +} + + +uint32_t +nxt_port_rpc_ex_stream(void *ex) +{ + nxt_port_rpc_reg_t *reg; + + reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); + + nxt_assert(reg->data == ex); + + return reg->stream; +} + + +void +nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, + void *ex, nxt_pid_t peer) +{ + nxt_int_t ret; + nxt_queue_link_t *peer_link; + nxt_port_rpc_reg_t *reg; + nxt_lvlhsh_query_t lhq; + + reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); + + nxt_assert(reg->data == ex); + + if (peer == -1 || reg->peer != -1) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to " + "change peer %PI->%PI", reg->stream, reg->peer, peer); + + return; + } + + reg->peer = peer; + + nxt_port_rpc_lhq_peer(&lhq, &peer); + lhq.replace = 0; + lhq.value = ®->link; + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); + + switch (ret) { + + case NXT_OK: + reg->link_first = 1; + nxt_queue_self(®->link); + + nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)", + reg->stream, reg->peer, reg->link.next); + break; + + case NXT_DECLINED: + reg->link_first = 0; + peer_link = lhq.value; + nxt_queue_insert_after(peer_link, ®->link); + + nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)", + reg->stream, reg->peer, reg->link.next); + break; + + default: + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add " + "peer for stream #%uD (%d)", reg->stream, ret); + + reg->peer = -1; + break; + } + +} + + +static void +nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_reg_t *reg) +{ + uint32_t stream; + nxt_int_t ret; + nxt_lvlhsh_query_t lhq; + nxt_port_rpc_reg_t *r; + + stream = reg->stream; + + if (reg->link_first != 0) { + nxt_port_rpc_lhq_peer(&lhq, ®->peer); lhq.pool = port->mem_pool; - switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) { + if (reg->link.next == ®->link) { + nxt_assert(reg->link.prev == ®->link); - case NXT_OK: - nxt_queue_self(®->link); - break; + nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI " + "registration (%p)", stream, reg->peer, reg->link.next); - case NXT_DECLINED: - peer_link = lhq.value; - nxt_queue_insert_before(peer_link, ®->link); - break; + ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); + } else { + nxt_debug(task, "rpc: stream #%uD remove first pid %PI " + "registration (%p)", stream, reg->peer, reg->link.next); + + lhq.replace = 1; + lhq.value = reg->link.next; - default: - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer " - "for stream #%uD", stream); - break; + r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link); + r->link_first = 1; + + nxt_queue_remove(®->link); + + ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); } + } else { + nxt_debug(task, "rpc: stream #%uD remove pid %PI " + "registration (%p)", stream, reg->peer, reg->link.next); + + nxt_queue_remove(®->link); + ret = NXT_OK; } - return stream; + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed" + " to delete peer %PI (%d)", stream, reg->peer, ret); + } } @@ -150,8 +281,6 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) last = msg->port_msg.last; type = msg->port_msg.type; - nxt_debug(task, "rpc: handler for stream #%uD, type %d", stream, type); - nxt_port_rpc_lhq_stream(&lhq, &stream); lhq.pool = port->mem_pool; @@ -163,11 +292,14 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } if (ret != NXT_OK) { - nxt_debug(task, "rpc: no handler found for stream #%uD", stream); + nxt_debug(task, "rpc: stream #%uD no handler found", stream); return; } + nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream, + (last ? "last " : ""), type); + reg = lhq.value; if (reg->peer != -1) { @@ -182,28 +314,15 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } if (last == 0) { - nxt_debug(task, "rpc: keep handler for stream #%uD", stream); - return; } if (reg->peer != -1) { - if (reg->link.next == ®->link) { - nxt_port_rpc_lhq_peer(&lhq, ®->peer); - lhq.pool = port->mem_pool; - - ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " - "peer %PI", reg->peer); - } - - } else { - nxt_queue_remove(®->link); - } + nxt_port_rpc_remove_from_peers(task, port, reg); } + nxt_debug(task, "rpc: stream #%uD free registration", stream); + nxt_mp_free(port->mem_pool, reg); } @@ -215,7 +334,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) uint32_t stream; nxt_int_t ret; nxt_buf_t buf; - nxt_queue_link_t *peer_link; + nxt_queue_link_t *peer_link, *next_link; nxt_port_rpc_reg_t *reg; nxt_lvlhsh_query_t lhq; nxt_port_recv_msg_t msg; @@ -226,7 +345,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); if (nxt_slow_path(ret != NXT_OK)) { - nxt_debug(task, "rpc: no handler found for peer %PI", peer); + nxt_debug(task, "rpc: no reg found for peer %PI", peer); return; } @@ -249,14 +368,15 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link); - nxt_debug(task, "rpc: trigger error for stream #%uD", reg->stream); + nxt_assert(reg->peer == peer); - msg.port_msg.stream = reg->stream; + stream = reg->stream; - reg->error_handler(task, &msg, reg->data); + nxt_debug(task, "rpc: stream #%uD trigger error", stream); + msg.port_msg.stream = stream; - stream = reg->stream; + reg->error_handler(task, &msg, reg->data); nxt_port_rpc_lhq_stream(&lhq, &stream); lhq.pool = port->mem_pool; @@ -265,18 +385,24 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) if (nxt_slow_path(ret != NXT_OK)) { nxt_log_error(NXT_LOG_ERR, task->log, - "rpc: failed to delete handler for stream #%uD", - stream); + "rpc: stream #%uD failed to delete handler", stream); return; } if (peer_link == peer_link->next) { + nxt_assert(peer_link->prev == peer_link); + last = 1; } else { - peer_link = peer_link->next; - nxt_queue_remove(peer_link->prev); + nxt_assert(peer_link->next->prev == peer_link); + nxt_assert(peer_link->prev->next == peer_link); + + next_link = peer_link->next; + nxt_queue_remove(peer_link); + + peer_link = next_link; } nxt_mp_free(port->mem_pool, reg); @@ -297,7 +423,7 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); if (ret != NXT_OK) { - nxt_debug(task, "rpc: no handler found for stream %uxD", stream); + nxt_debug(task, "rpc: stream #%uD no handler found", stream); return; } @@ -305,21 +431,10 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) reg = lhq.value; if (reg->peer != -1) { - if (reg->link.next == ®->link) { - nxt_port_rpc_lhq_peer(&lhq, ®->peer); - lhq.pool = port->mem_pool; - - ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, - "rpc: failed to delete peer %PI", reg->peer); - } - - } else { - nxt_queue_remove(®->link); - } + nxt_port_rpc_remove_from_peers(task, port, reg); } + nxt_debug(task, "rpc: stream #%uD cancel registration", stream); + nxt_mp_free(port->mem_pool, reg); } diff --git a/src/nxt_port_rpc.h b/src/nxt_port_rpc.h index cae20539..2152e68d 100644 --- a/src/nxt_port_rpc.h +++ b/src/nxt_port_rpc.h @@ -14,6 +14,14 @@ typedef void (*nxt_port_rpc_handler_t)(nxt_task_t *task, uint32_t nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, nxt_pid_t peer, void *data); +void *nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, + size_t ex_size); + +uint32_t nxt_port_rpc_ex_stream(void *ex); +void nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, + void *ex, nxt_pid_t peer); + void nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer); diff --git a/src/nxt_router.c b/src/nxt_router.c index ae569267..7357667d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -10,8 +10,11 @@ typedef struct { - nxt_str_t type; - uint32_t workers; + nxt_str_t type; + uint32_t workers; + nxt_msec_t timeout; + uint32_t requests; + nxt_conf_value_t *limits_value; } nxt_router_app_conf_t; @@ -31,9 +34,20 @@ struct nxt_start_worker_s { }; +typedef struct { + uint32_t stream; + nxt_conn_t *conn; + nxt_port_t *app_port; + nxt_req_app_link_t *ra; + + nxt_queue_link_t link; /* for nxt_conn_t.requests */ +} nxt_req_conn_link_t; + + struct nxt_req_app_link_s { - nxt_req_id_t req_id; + uint32_t stream; nxt_port_t *app_port; + nxt_pid_t app_pid; nxt_port_t *reply_port; nxt_app_parse_ctx_t *ap; nxt_req_conn_link_t *rc; @@ -51,6 +65,18 @@ typedef struct { } nxt_socket_rpc_t; +typedef struct { + nxt_mp_t *mem_pool; + nxt_port_recv_msg_t msg; + nxt_work_t work; +} nxt_remove_pid_msg_t; + + +static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, + void *data); + static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); static void nxt_router_conf_ready(nxt_task_t *task, @@ -105,8 +131,6 @@ static void nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf); static void nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs); -static void nxt_router_app_data_handler(nxt_task_t *task, - nxt_port_recv_msg_t *msg); static void nxt_router_thread_start(void *data); static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, @@ -129,7 +153,7 @@ static void nxt_router_conf_release(nxt_task_t *task, static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data); static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app); -static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id); +static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream); static void nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data); @@ -153,6 +177,7 @@ static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, @@ -213,8 +238,8 @@ nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) sw->app = app; sw->ra = ra; - nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw, - ra->req_id, &app->name, app); + nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw, + ra->stream, &app->name, app); rt = task->thread->runtime; main_port = rt->port_by_type[NXT_PROCESS_MAIN]; @@ -248,13 +273,29 @@ nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) } +nxt_inline void +nxt_router_rc_unlink(nxt_req_conn_link_t *rc) +{ + nxt_queue_remove(&rc->link); + + if (rc->ra != NULL) { + rc->ra->rc = NULL; + rc->ra = NULL; + } + + rc->conn = NULL; +} + + static nxt_req_app_link_t * nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) { nxt_mp_t *mp; + nxt_event_engine_t *engine; nxt_req_app_link_t *ra; mp = rc->conn->mem_pool; + engine = task->thread->engine; ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); @@ -262,20 +303,22 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) return NULL; } - nxt_debug(task, "ra #%uxD create", rc->req_id); + nxt_debug(task, "ra stream #%uD create", rc->stream); nxt_memzero(ra, sizeof(nxt_req_app_link_t)); - ra->req_id = rc->req_id; - ra->app_port = NULL; + ra->stream = rc->stream; + ra->app_pid = -1; ra->rc = rc; + rc->ra = ra; + ra->reply_port = engine->port; ra->mem_pool = mp; ra->work.handler = NULL; - ra->work.task = &task->thread->engine->task; + ra->work.task = &engine->task; ra->work.obj = ra; - ra->work.data = task->thread->engine; + ra->work.data = engine; return ra; } @@ -284,39 +327,87 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) static void nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) { + nxt_port_t *app_port; nxt_req_app_link_t *ra; nxt_event_engine_t *engine; ra = obj; engine = data; + if (ra->app_port != NULL) { + + app_port = ra->app_port; + ra->app_port = NULL; + + if (task->thread->engine != engine) { + ra->app_pid = app_port->pid; + } + + nxt_router_app_release_port(task, app_port, app_port->app); + +#if 0 + /* Uncomment to hold app port until complete response received. */ + if (ra->rc != NULL) { + ra->rc->app_port = ra->app_port; + + } else { + nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); + } +#endif + } + if (task->thread->engine != engine) { ra->work.handler = nxt_router_ra_release; ra->work.task = &engine->task; ra->work.next = NULL; - nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine); + nxt_debug(task, "ra stream #%uD post release to %p", + ra->stream, engine); nxt_event_engine_post(engine, &ra->work); return; } - nxt_debug(task, "ra #%uxD release", ra->req_id); + if (ra->rc != NULL && ra->app_pid != -1) { + nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid); + } - if (ra->app_port != NULL) { + nxt_debug(task, "ra stream #%uD release", ra->stream); - nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); + nxt_mp_release(ra->mem_pool, ra); +} -#if 0 - /* Uncomment to hold app port until complete response received. */ - if (ra->rc->conn != NULL) { - ra->rc->app_port = ra->app_port; - } else { - nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); - } -#endif +static void +nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_req_app_link_t *ra; + nxt_event_engine_t *engine; + + ra = obj; + engine = data; + + if (task->thread->engine != engine) { + ra->work.handler = nxt_router_ra_abort; + ra->work.task = &engine->task; + ra->work.next = NULL; + + nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine); + + nxt_event_engine_post(engine, &ra->work); + + return; + } + + nxt_debug(task, "ra stream #%uD abort", ra->stream); + + if (ra->rc != NULL) { + c = ra->rc->conn; + + nxt_router_gen_error(task, c, 500, + "Failed to start application worker"); } nxt_mp_release(ra->mem_pool, ra); @@ -384,18 +475,83 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + nxt_mp_t *mp; + nxt_buf_t *buf; + nxt_event_engine_t *engine; + nxt_remove_pid_msg_t *rp; + nxt_port_remove_pid_handler(task, msg); if (msg->port_msg.stream == 0) { return; } + mp = nxt_mp_create(1024, 128, 256, 32); + + buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0); + buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos, + nxt_buf_used_size(msg->buf)); + + nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) + { + rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t)); + + rp->mem_pool = mp; + + rp->msg.fd = msg->fd; + rp->msg.buf = buf; + rp->msg.port = engine->port; + rp->msg.port_msg = msg->port_msg; + rp->msg.size = msg->size; + rp->msg.new_port = NULL; + + rp->work.handler = nxt_router_worker_remove_pid_handler; + rp->work.task = &engine->task; + rp->work.obj = rp; + rp->work.data = task->thread->engine; + rp->work.next = NULL; + + nxt_event_engine_post(engine, &rp->work); + } + nxt_queue_loop; + msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; nxt_port_rpc_handler(task, msg); } +static void +nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_engine_t *engine; + nxt_remove_pid_msg_t *rp; + + rp = obj; + + nxt_port_remove_pid_handler(task, &rp->msg); + + engine = rp->work.data; + + rp->work.handler = nxt_router_worker_remove_pid_done; + rp->work.task = &engine->task; + rp->work.next = NULL; + + nxt_event_engine_post(engine, &rp->work); +} + + +static void +nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_remove_pid_msg_t *rp; + + rp = obj; + + nxt_mp_release(rp->mem_pool, rp); +} + + static nxt_router_temp_conf_t * nxt_router_temp_conf(nxt_task_t *task) { @@ -607,6 +763,27 @@ static nxt_conf_map_t nxt_router_app_conf[] = { NXT_CONF_MAP_INT32, offsetof(nxt_router_app_conf_t, workers), }, + + { + nxt_string("limits"), + NXT_CONF_MAP_PTR, + offsetof(nxt_router_app_conf_t, limits_value), + }, +}; + + +static nxt_conf_map_t nxt_router_app_limits_conf[] = { + { + nxt_string("timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_router_app_conf_t, timeout), + }, + + { + nxt_string("requests"), + NXT_CONF_MAP_INT32, + offsetof(nxt_router_app_conf_t, requests), + }, }; @@ -754,6 +931,9 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } apcf.workers = 1; + apcf.timeout = 0; + apcf.requests = 0; + apcf.limits_value = NULL; ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, nxt_nitems(nxt_router_app_conf), &apcf); @@ -762,8 +942,27 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, goto app_fail; } + if (apcf.limits_value != NULL) { + + if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { + nxt_log(task, NXT_LOG_CRIT, "application limits is not object"); + goto app_fail; + } + + ret = nxt_conf_map_object(mp, apcf.limits_value, + nxt_router_app_limits_conf, + nxt_nitems(nxt_router_app_limits_conf), + &apcf); + if (ret != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "application limits map error"); + goto app_fail; + } + } + nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application workers: %D", apcf.workers); + nxt_debug(task, "application timeout: %D", apcf.timeout); + nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -802,6 +1001,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->type = type; app->max_workers = apcf.workers; + app->timeout = apcf.timeout; app->live = 1; app->prepare_msg = nxt_app_prepare_msg[type]; @@ -1589,7 +1789,7 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = { NULL, /* NXT_PORT_MSG_CHANGE_FILE */ /* TODO: remove mmap_handler from app ports */ nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */ - nxt_router_app_data_handler, + nxt_port_rpc_handler, /* NXT_PORT_MSG_DATA */ NULL, /* NXT_PORT_MSG_REMOVE_PID */ NULL, /* NXT_PORT_MSG_READY */ NULL, /* NXT_PORT_MSG_START_WORKER */ @@ -2008,23 +2208,16 @@ static const nxt_conn_state_t nxt_router_conn_write_state static void -nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { size_t dump_size; nxt_buf_t *b, *last; nxt_conn_t *c; nxt_req_conn_link_t *rc; - nxt_event_engine_t *engine; b = msg->buf; - engine = task->thread->engine; - - rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); - if (nxt_slow_path(rc == NULL)) { - nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); - - return; - } + rc = data; c = rc->conn; @@ -2058,7 +2251,7 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rc->app_port = NULL; } - rc->conn = NULL; + nxt_router_rc_unlink(rc); } if (b == NULL) { @@ -2084,6 +2277,21 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +static void +nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) +{ + nxt_req_conn_link_t *rc; + + rc = data; + + nxt_router_gen_error(task, rc->conn, 500, + "Application terminated unexpectedly"); + + nxt_router_rc_unlink(rc); +} + + nxt_inline const char * nxt_router_text_by_code(int code) { @@ -2147,20 +2355,21 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, const char* fmt, ...) { va_list args; + nxt_mp_t *mp; nxt_buf_t *b; + /* TODO: fix when called from main thread */ + /* TODO: fix when called in the middle of response */ + + mp = nxt_mp_create(1024, 128, 256, 32); + va_start(args, fmt); - b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); + b = nxt_router_get_error_buf(task, mp, code, fmt, args); va_end(args); - if (c->socket.data != NULL) { - nxt_mp_free(c->mem_pool, c->socket.data); - c->socket.data = NULL; - } - if (c->socket.fd == -1) { - nxt_mp_release(c->mem_pool, b->next); - nxt_mp_release(c->mem_pool, b); + nxt_mp_release(mp, b->next); + nxt_mp_release(mp, b); return; } @@ -2204,17 +2413,35 @@ nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) static void nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { + nxt_app_t *app; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra; nxt_start_worker_t *sw; sw = data; nxt_assert(sw != NULL); + nxt_assert(sw->app != NULL); nxt_assert(sw->app->pending_workers != 0); + app = sw->app; + sw->app->pending_workers--; nxt_debug(task, "sw %p error, failed to start app '%V'", - sw, &sw->app->name); + sw, &app->name); + + if (!nxt_queue_is_empty(&app->requests)) { + lnk = nxt_queue_last(&app->requests); + nxt_queue_remove(lnk); + + ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + + nxt_debug(task, "app '%V' %p abort next stream #%uD", + &app->name, app, ra->stream); + + nxt_router_ra_abort(task, ra, ra->work.data); + } nxt_router_sw_release(task, sw); } @@ -2237,11 +2464,11 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) if (nxt_queue_is_empty(&app->requests)) { ra = sw->ra; - app_port = nxt_router_app_get_port(app, ra->req_id); + app_port = nxt_router_app_get_port(app, ra->stream); if (app_port != NULL) { - nxt_debug(task, "app '%V' %p process request #%uxD", - &app->name, app, ra->req_id); + nxt_debug(task, "app '%V' %p process stream #%uD", + &app->name, app, ra->stream); ra->app_port = app_port; @@ -2330,7 +2557,7 @@ nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) static nxt_port_t * -nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id) +nxt_router_app_get_port(nxt_app_t *app, uint32_t stream) { nxt_port_t *port; nxt_queue_link_t *lnk; @@ -2347,7 +2574,7 @@ nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id) port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - port->app_req_id = req_id; + port->app_stream = stream; } nxt_thread_mutex_unlock(&app->mutex); @@ -2395,11 +2622,11 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); - nxt_debug(task, "app '%V' %p process next request #%uxD", - &app->name, app, ra->req_id); + nxt_debug(task, "app '%V' %p process next stream #%uD", + &app->name, app, ra->stream); ra->app_port = port; - port->app_req_id = ra->req_id; + port->app_stream = ra->stream; nxt_router_process_http_request_mp(task, ra, port); @@ -2408,7 +2635,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) return; } - port->app_req_id = 0; + port->app_stream = 0; if (port->pair[1] == -1) { nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)", @@ -2452,7 +2679,7 @@ nxt_router_app_remove_port(nxt_port_t *port) nxt_bool_t busy; app = port->app; - busy = port->app_req_id != 0; + busy = port->app_stream != 0; if (app == NULL) { nxt_thread_log_debug("port %p app remove, no app", port); @@ -2483,8 +2710,9 @@ nxt_router_app_remove_port(nxt_port_t *port) return 1; } - nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, req #%uxD", - port, &app->name, app, port->app_req_id); + nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, " + "app stream #%uD", port, &app->name, app, + port->app_stream); return 0; } @@ -2496,6 +2724,7 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_app_t *app; nxt_conn_t *c; nxt_port_t *port; + nxt_event_engine_t *engine; nxt_start_worker_t *sw; nxt_socket_conf_joint_t *joint; @@ -2511,8 +2740,16 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) return NXT_ERROR; } + engine = task->thread->engine; - port = nxt_router_app_get_port(app, ra->req_id); + nxt_timer_disable(engine, &c->read_timer); + + if (app->timeout != 0) { + c->read_timer.handler = nxt_router_app_timeout; + nxt_timer_add(engine, &c->read_timer, app->timeout); + } + + port = nxt_router_app_get_port(app, ra->stream); if (port != NULL) { nxt_debug(task, "already have port for app '%V'", &app->name); @@ -2740,18 +2977,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_mp_t *port_mp; nxt_int_t res; nxt_port_t *port; - nxt_req_id_t req_id; nxt_event_engine_t *engine; nxt_req_app_link_t *ra; nxt_req_conn_link_t *rc; engine = task->thread->engine; - do { - req_id = nxt_random(&task->thread->random); - } while (nxt_event_engine_request_find(engine, req_id) != NULL); - - rc = nxt_conn_request_add(c, req_id); + rc = nxt_port_rpc_register_handler_ex(task, engine->port, + nxt_router_response_ready_handler, + nxt_router_response_error_handler, + sizeof(nxt_req_conn_link_t)); if (nxt_slow_path(rc == NULL)) { nxt_router_gen_error(task, c, 500, "Failed to allocate " @@ -2760,17 +2995,19 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, return; } - nxt_event_engine_request_add(engine, rc); + rc->stream = nxt_port_rpc_ex_stream(rc); + rc->conn = c; - nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", - req_id, c, engine); + nxt_queue_insert_tail(&c->requests, &rc->link); + + nxt_debug(task, "stream #%uD linked to conn %p at engine %p", + rc->stream, c, engine); c->socket.data = NULL; ra = nxt_router_ra_create(task, rc); ra->ap = ap; - ra->reply_port = engine->port; res = nxt_router_app_port(task, ra); @@ -2781,10 +3018,12 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, port = ra->app_port; if (nxt_slow_path(port == NULL)) { - nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); + nxt_router_gen_error(task, c, 500, "Application port not found"); return; } + nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); + port_mp = port->mem_pool; port->mem_pool = c->mem_pool; @@ -2792,7 +3031,6 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, port->mem_pool = port_mp; - nxt_router_ra_release(task, ra, ra->work.data); } @@ -2807,6 +3045,10 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t *ap; + /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */ + + nxt_assert(ra->rc != NULL); + reply_port = ra->reply_port; ap = ra->ap; c = ra->rc->conn; @@ -2828,7 +3070,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, wmsg.port = port; wmsg.write = NULL; wmsg.buf = &wmsg.write; - wmsg.stream = ra->req_id; + wmsg.stream = ra->stream; res = port->app->prepare_msg(task, &ap->r, &wmsg); @@ -2843,7 +3085,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, wmsg.port->socket.fd); res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, - -1, ra->req_id, reply_port->id, wmsg.write); + -1, ra->stream, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, @@ -3217,7 +3459,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { - nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); + nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); if (rc->app_port != NULL) { nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); @@ -3225,9 +3467,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) rc->app_port = NULL; } - rc->conn = NULL; + nxt_router_rc_unlink(rc); - nxt_event_engine_request_remove(task->thread->engine, rc); + nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); } nxt_queue_loop; @@ -3281,6 +3523,22 @@ nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) } +static void +nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + nxt_debug(task, "router app timeout"); + + c = nxt_read_timer_conn(timer); + + nxt_router_gen_error(task, c, 408, "Application timeout"); +} + + static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) { diff --git a/src/nxt_router.h b/src/nxt_router.h index b4560a37..f5c5f7aa 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -88,6 +88,8 @@ struct nxt_app_s { uint32_t workers; uint32_t max_workers; + nxt_msec_t timeout; + nxt_app_type_t type:8; uint8_t live; /* 1 bit */ |