diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-09-15 20:30:24 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-09-15 20:30:24 +0300 |
commit | 0bec14878e99de280046e1d1b1a0195e5478c808 (patch) | |
tree | f5f40c8b9b4d1f3679d6daf38031002b4160d05b /src/nxt_router.c | |
parent | 90ae152ce0ace8f2c41a012b9fa52b2f3283a845 (diff) | |
download | unit-0bec14878e99de280046e1d1b1a0195e5478c808.tar.gz unit-0bec14878e99de280046e1d1b1a0195e5478c808.tar.bz2 |
Introducing application timeout.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 410 |
1 files changed, 334 insertions, 76 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index ae569267..7357667d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -10,8 +10,11 @@ typedef struct { - nxt_str_t type; - uint32_t workers; + nxt_str_t type; + uint32_t workers; + nxt_msec_t timeout; + uint32_t requests; + nxt_conf_value_t *limits_value; } nxt_router_app_conf_t; @@ -31,9 +34,20 @@ struct nxt_start_worker_s { }; +typedef struct { + uint32_t stream; + nxt_conn_t *conn; + nxt_port_t *app_port; + nxt_req_app_link_t *ra; + + nxt_queue_link_t link; /* for nxt_conn_t.requests */ +} nxt_req_conn_link_t; + + struct nxt_req_app_link_s { - nxt_req_id_t req_id; + uint32_t stream; nxt_port_t *app_port; + nxt_pid_t app_pid; nxt_port_t *reply_port; nxt_app_parse_ctx_t *ap; nxt_req_conn_link_t *rc; @@ -51,6 +65,18 @@ typedef struct { } nxt_socket_rpc_t; +typedef struct { + nxt_mp_t *mem_pool; + nxt_port_recv_msg_t msg; + nxt_work_t work; +} nxt_remove_pid_msg_t; + + +static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, + void *data); + static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); static void nxt_router_conf_ready(nxt_task_t *task, @@ -105,8 +131,6 @@ static void nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf); static void nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs); -static void nxt_router_app_data_handler(nxt_task_t *task, - nxt_port_recv_msg_t *msg); static void nxt_router_thread_start(void *data); static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, @@ -129,7 +153,7 @@ static void nxt_router_conf_release(nxt_task_t *task, static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data); static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app); -static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id); +static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream); static void nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data); @@ -153,6 +177,7 @@ static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, @@ -213,8 +238,8 @@ nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) sw->app = app; sw->ra = ra; - nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw, - ra->req_id, &app->name, app); + nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw, + ra->stream, &app->name, app); rt = task->thread->runtime; main_port = rt->port_by_type[NXT_PROCESS_MAIN]; @@ -248,13 +273,29 @@ nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) } +nxt_inline void +nxt_router_rc_unlink(nxt_req_conn_link_t *rc) +{ + nxt_queue_remove(&rc->link); + + if (rc->ra != NULL) { + rc->ra->rc = NULL; + rc->ra = NULL; + } + + rc->conn = NULL; +} + + static nxt_req_app_link_t * nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) { nxt_mp_t *mp; + nxt_event_engine_t *engine; nxt_req_app_link_t *ra; mp = rc->conn->mem_pool; + engine = task->thread->engine; ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); @@ -262,20 +303,22 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) return NULL; } - nxt_debug(task, "ra #%uxD create", rc->req_id); + nxt_debug(task, "ra stream #%uD create", rc->stream); nxt_memzero(ra, sizeof(nxt_req_app_link_t)); - ra->req_id = rc->req_id; - ra->app_port = NULL; + ra->stream = rc->stream; + ra->app_pid = -1; ra->rc = rc; + rc->ra = ra; + ra->reply_port = engine->port; ra->mem_pool = mp; ra->work.handler = NULL; - ra->work.task = &task->thread->engine->task; + ra->work.task = &engine->task; ra->work.obj = ra; - ra->work.data = task->thread->engine; + ra->work.data = engine; return ra; } @@ -284,39 +327,87 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) static void nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) { + nxt_port_t *app_port; nxt_req_app_link_t *ra; nxt_event_engine_t *engine; ra = obj; engine = data; + if (ra->app_port != NULL) { + + app_port = ra->app_port; + ra->app_port = NULL; + + if (task->thread->engine != engine) { + ra->app_pid = app_port->pid; + } + + nxt_router_app_release_port(task, app_port, app_port->app); + +#if 0 + /* Uncomment to hold app port until complete response received. */ + if (ra->rc != NULL) { + ra->rc->app_port = ra->app_port; + + } else { + nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); + } +#endif + } + if (task->thread->engine != engine) { ra->work.handler = nxt_router_ra_release; ra->work.task = &engine->task; ra->work.next = NULL; - nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine); + nxt_debug(task, "ra stream #%uD post release to %p", + ra->stream, engine); nxt_event_engine_post(engine, &ra->work); return; } - nxt_debug(task, "ra #%uxD release", ra->req_id); + if (ra->rc != NULL && ra->app_pid != -1) { + nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid); + } - if (ra->app_port != NULL) { + nxt_debug(task, "ra stream #%uD release", ra->stream); - nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); + nxt_mp_release(ra->mem_pool, ra); +} -#if 0 - /* Uncomment to hold app port until complete response received. */ - if (ra->rc->conn != NULL) { - ra->rc->app_port = ra->app_port; - } else { - nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); - } -#endif +static void +nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_req_app_link_t *ra; + nxt_event_engine_t *engine; + + ra = obj; + engine = data; + + if (task->thread->engine != engine) { + ra->work.handler = nxt_router_ra_abort; + ra->work.task = &engine->task; + ra->work.next = NULL; + + nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine); + + nxt_event_engine_post(engine, &ra->work); + + return; + } + + nxt_debug(task, "ra stream #%uD abort", ra->stream); + + if (ra->rc != NULL) { + c = ra->rc->conn; + + nxt_router_gen_error(task, c, 500, + "Failed to start application worker"); } nxt_mp_release(ra->mem_pool, ra); @@ -384,18 +475,83 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + nxt_mp_t *mp; + nxt_buf_t *buf; + nxt_event_engine_t *engine; + nxt_remove_pid_msg_t *rp; + nxt_port_remove_pid_handler(task, msg); if (msg->port_msg.stream == 0) { return; } + mp = nxt_mp_create(1024, 128, 256, 32); + + buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0); + buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos, + nxt_buf_used_size(msg->buf)); + + nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) + { + rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t)); + + rp->mem_pool = mp; + + rp->msg.fd = msg->fd; + rp->msg.buf = buf; + rp->msg.port = engine->port; + rp->msg.port_msg = msg->port_msg; + rp->msg.size = msg->size; + rp->msg.new_port = NULL; + + rp->work.handler = nxt_router_worker_remove_pid_handler; + rp->work.task = &engine->task; + rp->work.obj = rp; + rp->work.data = task->thread->engine; + rp->work.next = NULL; + + nxt_event_engine_post(engine, &rp->work); + } + nxt_queue_loop; + msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; nxt_port_rpc_handler(task, msg); } +static void +nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_engine_t *engine; + nxt_remove_pid_msg_t *rp; + + rp = obj; + + nxt_port_remove_pid_handler(task, &rp->msg); + + engine = rp->work.data; + + rp->work.handler = nxt_router_worker_remove_pid_done; + rp->work.task = &engine->task; + rp->work.next = NULL; + + nxt_event_engine_post(engine, &rp->work); +} + + +static void +nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_remove_pid_msg_t *rp; + + rp = obj; + + nxt_mp_release(rp->mem_pool, rp); +} + + static nxt_router_temp_conf_t * nxt_router_temp_conf(nxt_task_t *task) { @@ -607,6 +763,27 @@ static nxt_conf_map_t nxt_router_app_conf[] = { NXT_CONF_MAP_INT32, offsetof(nxt_router_app_conf_t, workers), }, + + { + nxt_string("limits"), + NXT_CONF_MAP_PTR, + offsetof(nxt_router_app_conf_t, limits_value), + }, +}; + + +static nxt_conf_map_t nxt_router_app_limits_conf[] = { + { + nxt_string("timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_router_app_conf_t, timeout), + }, + + { + nxt_string("requests"), + NXT_CONF_MAP_INT32, + offsetof(nxt_router_app_conf_t, requests), + }, }; @@ -754,6 +931,9 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } apcf.workers = 1; + apcf.timeout = 0; + apcf.requests = 0; + apcf.limits_value = NULL; ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, nxt_nitems(nxt_router_app_conf), &apcf); @@ -762,8 +942,27 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, goto app_fail; } + if (apcf.limits_value != NULL) { + + if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { + nxt_log(task, NXT_LOG_CRIT, "application limits is not object"); + goto app_fail; + } + + ret = nxt_conf_map_object(mp, apcf.limits_value, + nxt_router_app_limits_conf, + nxt_nitems(nxt_router_app_limits_conf), + &apcf); + if (ret != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "application limits map error"); + goto app_fail; + } + } + nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application workers: %D", apcf.workers); + nxt_debug(task, "application timeout: %D", apcf.timeout); + nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -802,6 +1001,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->type = type; app->max_workers = apcf.workers; + app->timeout = apcf.timeout; app->live = 1; app->prepare_msg = nxt_app_prepare_msg[type]; @@ -1589,7 +1789,7 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = { NULL, /* NXT_PORT_MSG_CHANGE_FILE */ /* TODO: remove mmap_handler from app ports */ nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */ - nxt_router_app_data_handler, + nxt_port_rpc_handler, /* NXT_PORT_MSG_DATA */ NULL, /* NXT_PORT_MSG_REMOVE_PID */ NULL, /* NXT_PORT_MSG_READY */ NULL, /* NXT_PORT_MSG_START_WORKER */ @@ -2008,23 +2208,16 @@ static const nxt_conn_state_t nxt_router_conn_write_state static void -nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { size_t dump_size; nxt_buf_t *b, *last; nxt_conn_t *c; nxt_req_conn_link_t *rc; - nxt_event_engine_t *engine; b = msg->buf; - engine = task->thread->engine; - - rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); - if (nxt_slow_path(rc == NULL)) { - nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); - - return; - } + rc = data; c = rc->conn; @@ -2058,7 +2251,7 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rc->app_port = NULL; } - rc->conn = NULL; + nxt_router_rc_unlink(rc); } if (b == NULL) { @@ -2084,6 +2277,21 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +static void +nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) +{ + nxt_req_conn_link_t *rc; + + rc = data; + + nxt_router_gen_error(task, rc->conn, 500, + "Application terminated unexpectedly"); + + nxt_router_rc_unlink(rc); +} + + nxt_inline const char * nxt_router_text_by_code(int code) { @@ -2147,20 +2355,21 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, const char* fmt, ...) { va_list args; + nxt_mp_t *mp; nxt_buf_t *b; + /* TODO: fix when called from main thread */ + /* TODO: fix when called in the middle of response */ + + mp = nxt_mp_create(1024, 128, 256, 32); + va_start(args, fmt); - b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); + b = nxt_router_get_error_buf(task, mp, code, fmt, args); va_end(args); - if (c->socket.data != NULL) { - nxt_mp_free(c->mem_pool, c->socket.data); - c->socket.data = NULL; - } - if (c->socket.fd == -1) { - nxt_mp_release(c->mem_pool, b->next); - nxt_mp_release(c->mem_pool, b); + nxt_mp_release(mp, b->next); + nxt_mp_release(mp, b); return; } @@ -2204,17 +2413,35 @@ nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) static void nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { + nxt_app_t *app; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra; nxt_start_worker_t *sw; sw = data; nxt_assert(sw != NULL); + nxt_assert(sw->app != NULL); nxt_assert(sw->app->pending_workers != 0); + app = sw->app; + sw->app->pending_workers--; nxt_debug(task, "sw %p error, failed to start app '%V'", - sw, &sw->app->name); + sw, &app->name); + + if (!nxt_queue_is_empty(&app->requests)) { + lnk = nxt_queue_last(&app->requests); + nxt_queue_remove(lnk); + + ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + + nxt_debug(task, "app '%V' %p abort next stream #%uD", + &app->name, app, ra->stream); + + nxt_router_ra_abort(task, ra, ra->work.data); + } nxt_router_sw_release(task, sw); } @@ -2237,11 +2464,11 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) if (nxt_queue_is_empty(&app->requests)) { ra = sw->ra; - app_port = nxt_router_app_get_port(app, ra->req_id); + app_port = nxt_router_app_get_port(app, ra->stream); if (app_port != NULL) { - nxt_debug(task, "app '%V' %p process request #%uxD", - &app->name, app, ra->req_id); + nxt_debug(task, "app '%V' %p process stream #%uD", + &app->name, app, ra->stream); ra->app_port = app_port; @@ -2330,7 +2557,7 @@ nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) static nxt_port_t * -nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id) +nxt_router_app_get_port(nxt_app_t *app, uint32_t stream) { nxt_port_t *port; nxt_queue_link_t *lnk; @@ -2347,7 +2574,7 @@ nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id) port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - port->app_req_id = req_id; + port->app_stream = stream; } nxt_thread_mutex_unlock(&app->mutex); @@ -2395,11 +2622,11 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); - nxt_debug(task, "app '%V' %p process next request #%uxD", - &app->name, app, ra->req_id); + nxt_debug(task, "app '%V' %p process next stream #%uD", + &app->name, app, ra->stream); ra->app_port = port; - port->app_req_id = ra->req_id; + port->app_stream = ra->stream; nxt_router_process_http_request_mp(task, ra, port); @@ -2408,7 +2635,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) return; } - port->app_req_id = 0; + port->app_stream = 0; if (port->pair[1] == -1) { nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)", @@ -2452,7 +2679,7 @@ nxt_router_app_remove_port(nxt_port_t *port) nxt_bool_t busy; app = port->app; - busy = port->app_req_id != 0; + busy = port->app_stream != 0; if (app == NULL) { nxt_thread_log_debug("port %p app remove, no app", port); @@ -2483,8 +2710,9 @@ nxt_router_app_remove_port(nxt_port_t *port) return 1; } - nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, req #%uxD", - port, &app->name, app, port->app_req_id); + nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, " + "app stream #%uD", port, &app->name, app, + port->app_stream); return 0; } @@ -2496,6 +2724,7 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_app_t *app; nxt_conn_t *c; nxt_port_t *port; + nxt_event_engine_t *engine; nxt_start_worker_t *sw; nxt_socket_conf_joint_t *joint; @@ -2511,8 +2740,16 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) return NXT_ERROR; } + engine = task->thread->engine; - port = nxt_router_app_get_port(app, ra->req_id); + nxt_timer_disable(engine, &c->read_timer); + + if (app->timeout != 0) { + c->read_timer.handler = nxt_router_app_timeout; + nxt_timer_add(engine, &c->read_timer, app->timeout); + } + + port = nxt_router_app_get_port(app, ra->stream); if (port != NULL) { nxt_debug(task, "already have port for app '%V'", &app->name); @@ -2740,18 +2977,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_mp_t *port_mp; nxt_int_t res; nxt_port_t *port; - nxt_req_id_t req_id; nxt_event_engine_t *engine; nxt_req_app_link_t *ra; nxt_req_conn_link_t *rc; engine = task->thread->engine; - do { - req_id = nxt_random(&task->thread->random); - } while (nxt_event_engine_request_find(engine, req_id) != NULL); - - rc = nxt_conn_request_add(c, req_id); + rc = nxt_port_rpc_register_handler_ex(task, engine->port, + nxt_router_response_ready_handler, + nxt_router_response_error_handler, + sizeof(nxt_req_conn_link_t)); if (nxt_slow_path(rc == NULL)) { nxt_router_gen_error(task, c, 500, "Failed to allocate " @@ -2760,17 +2995,19 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, return; } - nxt_event_engine_request_add(engine, rc); + rc->stream = nxt_port_rpc_ex_stream(rc); + rc->conn = c; - nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", - req_id, c, engine); + nxt_queue_insert_tail(&c->requests, &rc->link); + + nxt_debug(task, "stream #%uD linked to conn %p at engine %p", + rc->stream, c, engine); c->socket.data = NULL; ra = nxt_router_ra_create(task, rc); ra->ap = ap; - ra->reply_port = engine->port; res = nxt_router_app_port(task, ra); @@ -2781,10 +3018,12 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, port = ra->app_port; if (nxt_slow_path(port == NULL)) { - nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); + nxt_router_gen_error(task, c, 500, "Application port not found"); return; } + nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); + port_mp = port->mem_pool; port->mem_pool = c->mem_pool; @@ -2792,7 +3031,6 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, port->mem_pool = port_mp; - nxt_router_ra_release(task, ra, ra->work.data); } @@ -2807,6 +3045,10 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t *ap; + /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */ + + nxt_assert(ra->rc != NULL); + reply_port = ra->reply_port; ap = ra->ap; c = ra->rc->conn; @@ -2828,7 +3070,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, wmsg.port = port; wmsg.write = NULL; wmsg.buf = &wmsg.write; - wmsg.stream = ra->req_id; + wmsg.stream = ra->stream; res = port->app->prepare_msg(task, &ap->r, &wmsg); @@ -2843,7 +3085,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, wmsg.port->socket.fd); res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, - -1, ra->req_id, reply_port->id, wmsg.write); + -1, ra->stream, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, @@ -3217,7 +3459,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { - nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); + nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); if (rc->app_port != NULL) { nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); @@ -3225,9 +3467,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) rc->app_port = NULL; } - rc->conn = NULL; + nxt_router_rc_unlink(rc); - nxt_event_engine_request_remove(task->thread->engine, rc); + nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); } nxt_queue_loop; @@ -3281,6 +3523,22 @@ nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) } +static void +nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + nxt_debug(task, "router app timeout"); + + c = nxt_read_timer_conn(timer); + + nxt_router_gen_error(task, c, 408, "Application timeout"); +} + + static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) { |