diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 418 |
1 files changed, 296 insertions, 122 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 083a093d..fbd3462e 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -20,18 +20,33 @@ typedef struct { } nxt_router_listener_conf_t; +typedef struct nxt_req_app_link_s nxt_req_app_link_t; typedef struct nxt_start_worker_s nxt_start_worker_t; struct nxt_start_worker_s { uint32_t stream; nxt_app_t *app; - nxt_req_conn_link_t *rc; + nxt_req_app_link_t *ra; nxt_mp_t *mem_pool; nxt_work_t work; }; +struct nxt_req_app_link_s { + nxt_req_id_t req_id; + nxt_port_t *app_port; + nxt_port_t *reply_port; + nxt_app_parse_ctx_t *ap; + nxt_req_conn_link_t *rc; + + nxt_queue_link_t link; /* for nxt_app_t.requests */ + + nxt_mp_t *mem_pool; + nxt_work_t work; +}; + + static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static nxt_int_t nxt_router_conf_new(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); @@ -98,8 +113,10 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, static void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint); -static nxt_bool_t nxt_router_app_free(nxt_app_t *app); -static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app); +static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, + void *data); +static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app); +static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id); static void nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data); @@ -114,7 +131,7 @@ static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); static void nxt_router_process_http_request_mp(nxt_task_t *task, - nxt_req_conn_link_t *rc, nxt_mp_t *mp); + nxt_req_app_link_t *ra, nxt_port_t *port); static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); @@ -156,12 +173,74 @@ nxt_router_start(nxt_task_t *task, void *data) } +static nxt_start_worker_t * +nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app, + nxt_req_app_link_t *ra) +{ + nxt_port_t *master_port; + nxt_runtime_t *rt; + nxt_start_worker_t *sw; + + sw = nxt_mp_retain(mp, sizeof(nxt_start_worker_t)); + + if (nxt_slow_path(sw == NULL)) { + return NULL; + } + + nxt_memzero(sw, sizeof(nxt_start_worker_t)); + + sw->stream = nxt_random(&task->thread->random); + sw->mem_pool = mp; + + sw->app = app; + sw->ra = ra; + + nxt_debug(task, "sw #%uxD create, request #%uD, app '%V' %p", sw->stream, + ra->req_id, &app->name, app); + + rt = task->thread->runtime; + master_port = rt->port_by_type[NXT_PROCESS_MASTER]; + + sw->work.handler = nxt_router_send_sw_request; + sw->work.task = &master_port->engine->task; + sw->work.obj = sw; + sw->work.data = task->thread->engine; + sw->work.next = NULL; + + if (task->thread->engine != master_port->engine) { + nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream, + master_port->engine); + + nxt_event_engine_post(master_port->engine, &sw->work); + + } else { + nxt_router_send_sw_request(task, sw, sw->work.data); + } + + return sw; +} + + static void nxt_router_sw_release(nxt_task_t *task, void *obj, void *data) { + nxt_event_engine_t *engine; nxt_start_worker_t *sw; sw = obj; + engine = data; + + if (task->thread->engine != engine) { + sw->work.handler = nxt_router_sw_release; + sw->work.task = &engine->task; + sw->work.next = NULL; + + nxt_debug(task, "sw #%uxD post release to %p", sw->stream, engine); + + nxt_event_engine_post(engine, &sw->work); + + return; + } nxt_debug(task, "sw #%uxD release", sw->stream); @@ -169,11 +248,80 @@ nxt_router_sw_release(nxt_task_t *task, void *obj, void *data) } +static nxt_req_app_link_t * +nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) +{ + nxt_mp_t *mp; + nxt_req_app_link_t *ra; + + mp = rc->conn->mem_pool; + + ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); + + if (nxt_slow_path(ra == NULL)) { + return NULL; + } + + nxt_debug(task, "ra #%uxD create", ra->req_id); + + nxt_memzero(ra, sizeof(nxt_req_app_link_t)); + + ra->req_id = rc->req_id; + ra->app_port = NULL; + ra->rc = rc; + + ra->mem_pool = mp; + + ra->work.handler = NULL; + ra->work.task = &task->thread->engine->task; + ra->work.obj = ra; + ra->work.data = task->thread->engine; + + return ra; +} + + +static void +nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) +{ + nxt_req_app_link_t *ra; + nxt_event_engine_t *engine; + + ra = obj; + engine = data; + + if (task->thread->engine != engine) { + ra->work.handler = nxt_router_ra_release; + ra->work.task = &engine->task; + ra->work.next = NULL; + + nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine); + + nxt_event_engine_post(engine, &ra->work); + + return; + } + + nxt_debug(task, "ra #%uxD release", ra->req_id); + + if (ra->app_port != NULL) { + + if (ra->rc->conn != NULL) { + ra->rc->app_port = ra->app_port; + + } else { + nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); + } + } + + nxt_mp_release(ra->mem_pool, ra); +} + + void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_start_worker_t *sw; - nxt_event_engine_t *engine; nxt_port_new_port_handler(task, msg); @@ -186,23 +334,16 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) if (nxt_fast_path(sw != NULL)) { msg->new_port->app = sw->app; - sw->app->workers++; - nxt_assert(sw->app->pending_workers != 0); sw->app->pending_workers--; + sw->app->workers++; - nxt_router_app_release_port(task, msg->new_port, sw->app); - - engine = sw->work.data; - - sw->work.handler = nxt_router_sw_release; - sw->work.task = &engine->task; + nxt_debug(task, "sw #%uxD got port %p", sw->stream, msg->new_port); - nxt_debug(task, "post sw #%uxD release to %p", sw->stream, - sw->work.data); + nxt_router_app_release_port(task, msg->new_port, sw->app); - nxt_event_engine_post(engine, &sw->work); + nxt_router_sw_release(task, sw, sw->work.data); } } @@ -1185,32 +1326,39 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, static void nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; - nxt_port_t *port; + nxt_app_t *app; + nxt_port_t *port; nxt_queue_each(app, &router->apps, nxt_app_t, link) { nxt_queue_remove(&app->link); + nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app); + app->live = 0; - if (nxt_router_app_free(app) != 0) { + if (nxt_router_app_free(NULL, app) != 0) { continue; } - if (nxt_queue_is_empty(&app->requests)) { + if (!nxt_queue_is_empty(&app->requests)) { - do { - port = nxt_router_app_get_port(app); - if (port == NULL) { - break; - } + nxt_thread_log_debug("app '%V' %p pending requests found", + &app->name, app); + continue; + } + + do { + port = nxt_router_app_get_port(app, 0); + if (port == NULL) { + break; + } - nxt_port_socket_write(&port->engine->task, port, - NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - } while (1); + nxt_thread_log_debug("port %p send quit", port); - } + nxt_port_socket_write(&port->engine->task, port, + NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + } while (1); } nxt_queue_loop; @@ -1677,6 +1825,12 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } nxt_buf_chain_add(&b, last); + + if (rc->app_port != NULL) { + nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); + + rc->app_port = NULL; + } } if (b == NULL) { @@ -1781,31 +1935,28 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) nxt_port_t *port; nxt_runtime_t *rt; nxt_start_worker_t *sw; - nxt_event_engine_t *engine; sw = obj; app = sw->app; - if (app->workers + app->pending_workers >= app->max_workers) { - engine = sw->work.data; - - sw->work.handler = nxt_router_sw_release; - sw->work.task = &engine->task; + nxt_queue_insert_tail(&app->requests, &sw->ra->link); - nxt_debug(task, "%uD/%uD running/penging workers, post sw #%uxD " - "release to %p", sw->stream, sw->work.data); + if (app->workers + app->pending_workers >= app->max_workers) { + nxt_debug(task, "app '%V' %p %uD/%uD running/penging workers, " + "post sw #%uxD release to %p", &app->name, app, + app->workers, app->pending_workers, sw->stream, + sw->work.data); - nxt_event_engine_post(engine, &sw->work); + nxt_router_sw_release(task, sw, sw->work.data); return; } app->pending_workers++; - nxt_debug(task, "send sw #%uD", sw->stream); + nxt_debug(task, "sw #%uxD send", sw->stream); nxt_router_sw_add(task, nxt_router, sw); - nxt_queue_insert_tail(&app->requests, &sw->rc->app_link); rt = task->thread->runtime; port = rt->port_by_type[NXT_PROCESS_MASTER]; @@ -1819,8 +1970,17 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) static nxt_bool_t -nxt_router_app_free(nxt_app_t *app) +nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) { + nxt_port_t *master_port; + nxt_runtime_t *rt; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra; + + nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app, + app->live, app->workers, app->pending_workers, + nxt_queue_is_empty(&app->requests)); + if (app->live == 0 && app->workers == 0 && app->pending_workers == 0 && nxt_queue_is_empty(&app->requests)) { @@ -1831,12 +1991,26 @@ nxt_router_app_free(nxt_app_t *app) return 1; } + if (app->live == 1 && nxt_queue_is_empty(&app->requests) == 0 && + (app->workers + app->pending_workers < app->max_workers)) { + + lnk = nxt_queue_first(&app->requests); + nxt_queue_remove(lnk); + + ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + + rt = task->thread->runtime; + master_port = rt->port_by_type[NXT_PROCESS_MASTER]; + + nxt_router_sw_create(task, master_port->mem_pool, app, ra); + } + return 0; } static nxt_port_t * -nxt_router_app_get_port(nxt_app_t *app) +nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id) { nxt_port_t *port; nxt_queue_link_t *lnk; @@ -1852,6 +2026,8 @@ nxt_router_app_get_port(nxt_app_t *app) lnk->next = NULL; port = nxt_queue_link_data(lnk, nxt_port_t, app_link); + + port->app_req_id = req_id; } nxt_thread_mutex_unlock(&app->mutex); @@ -1867,7 +2043,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) nxt_port_t *port; nxt_work_t *work; nxt_queue_link_t *lnk; - nxt_req_conn_link_t *rc; + nxt_req_app_link_t *ra; port = obj; app = data; @@ -1897,24 +2073,28 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) lnk = nxt_queue_first(&app->requests); nxt_queue_remove(lnk); - rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link); + ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + + nxt_debug(task, "app '%V' %p process next request #%uxD", + &app->name, app, ra->req_id); - nxt_debug(task, "app '%V' process next request #%uxD", - &app->name, rc->req_id); + ra->app_port = port; - rc->app_port = port; + nxt_router_process_http_request_mp(task, ra, port); - nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool); + nxt_router_ra_release(task, ra, ra->work.data); return; } + port->app_req_id = 0; + if (port->pair[1] == -1) { - nxt_debug(task, "app '%V' port already closed (pid %PI dead?)", - &app->name, port->pid); + nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)", + &app->name, app, port->pid); app->workers--; - nxt_router_app_free(app); + nxt_router_app_free(task, app); port->app = NULL; @@ -1924,8 +2104,8 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) } if (!app->live) { - nxt_debug(task, "app '%V' is not alive, send QUIT to port", - &app->name); + nxt_debug(task, "app '%V' %p is not alive, send QUIT to port", + &app->name, app); nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); @@ -1933,8 +2113,8 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) return; } - nxt_debug(task, "app '%V' requests queue is empty, keep the port", - &app->name); + nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", + &app->name, app); nxt_thread_mutex_lock(&app->mutex); @@ -1951,9 +2131,11 @@ nxt_router_app_remove_port(nxt_port_t *port) nxt_bool_t busy; app = port->app; - busy = 1; + busy = port->app_req_id != 0; if (app == NULL) { + nxt_thread_log_debug("port %p app remove, no app", port); + nxt_assert(port->app_link.next == NULL); return 1; @@ -1966,84 +2148,66 @@ nxt_router_app_remove_port(nxt_port_t *port) nxt_queue_remove(&port->app_link); port->app_link.next = NULL; - busy = 0; } nxt_thread_mutex_unlock(&app->mutex); if (busy == 0) { + nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port, + &app->name, app); app->workers--; - nxt_router_app_free(app); + nxt_router_app_free(&port->engine->task, app); return 1; } + nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, req #%uxD", + port, &app->name, app, port->app_req_id); + return 0; } -nxt_inline nxt_int_t -nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc) +static nxt_int_t +nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) { nxt_app_t *app; nxt_conn_t *c; - nxt_port_t *port, *master_port; - nxt_runtime_t *rt; + nxt_port_t *port; nxt_start_worker_t *sw; nxt_socket_conf_joint_t *joint; port = NULL; - c = rc->conn; + c = ra->rc->conn; joint = c->listen->socket.data; app = joint->socket_conf->application; - if (app == NULL) { - nxt_router_gen_error(task, rc->conn, 500, + nxt_router_gen_error(task, c, 500, "Application is NULL in socket_conf"); return NXT_ERROR; } - port = nxt_router_app_get_port(app); + port = nxt_router_app_get_port(app, ra->req_id); if (port != NULL) { nxt_debug(task, "already have port for app '%V'", &app->name); - rc->app_port = port; + ra->app_port = port; return NXT_OK; } - sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t)); + sw = nxt_router_sw_create(task, c->mem_pool, app, ra); if (nxt_slow_path(sw == NULL)) { - nxt_router_gen_error(task, rc->conn, 500, + nxt_router_gen_error(task, c, 500, "Failed to allocate start worker struct"); return NXT_ERROR; } - nxt_memzero(sw, sizeof(nxt_start_worker_t)); - - sw->stream = nxt_random(&task->thread->random); - sw->app = app; - sw->rc = rc; - sw->mem_pool = c->mem_pool; - - rt = task->thread->runtime; - master_port = rt->port_by_type[NXT_PROCESS_MASTER]; - - sw->work.handler = nxt_router_send_sw_request; - sw->work.task = &master_port->engine->task; - sw->work.obj = sw; - sw->work.data = task->thread->engine; - - nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream, - master_port->engine); - - nxt_event_engine_post(master_port->engine, &sw->work); - return NXT_AGAIN; } @@ -2164,9 +2328,12 @@ static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap) { + nxt_mp_t *port_mp; nxt_int_t res; + nxt_port_t *port; nxt_req_id_t req_id; nxt_event_engine_t *engine; + nxt_req_app_link_t *ra; nxt_req_conn_link_t *rc; engine = task->thread->engine; @@ -2184,47 +2351,55 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, return; } - rc->ap = ap; - nxt_event_engine_request_add(engine, rc); nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", req_id, c, engine); - rc->reply_port = engine->port; - res = nxt_router_app_port(task, rc); + ra = nxt_router_ra_create(task, rc); + + ra->ap = ap; + ra->reply_port = engine->port; + + res = nxt_router_app_port(task, ra); if (res != NXT_OK) { return; } - nxt_router_process_http_request_mp(task, rc, c->mem_pool); + port = ra->app_port; + + if (nxt_slow_path(port == NULL)) { + nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); + return; + } + + port_mp = port->mem_pool; + port->mem_pool = c->mem_pool; + + nxt_router_process_http_request_mp(task, ra, port); + + port->mem_pool = port_mp; + + + nxt_router_ra_release(task, ra, ra->work.data); } static void -nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc, - nxt_mp_t *mp) +nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, + nxt_port_t *port) { - nxt_mp_t *port_mp; nxt_int_t res; - nxt_port_t *port, *c_port, *reply_port; + nxt_port_t *c_port, *reply_port; + nxt_conn_t *c; nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t *ap; - port = rc->app_port; - - if (nxt_slow_path(port == NULL)) { - nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); - return; - } - - reply_port = rc->reply_port; - ap = rc->ap; - - port_mp = port->mem_pool; - port->mem_pool = mp; + reply_port = ra->reply_port; + ap = ra->ap; + c = ra->rc->conn; c_port = nxt_process_connected_port_find(port->process, reply_port->pid, reply_port->id); @@ -2232,9 +2407,9 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc, res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, rc->conn, 500, + nxt_router_gen_error(task, c, 500, "Failed to send reply port to application"); - goto fail; + return; } nxt_process_connected_port_add(port->process, reply_port); @@ -2243,14 +2418,14 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc, wmsg.port = port; wmsg.write = NULL; wmsg.buf = &wmsg.write; - wmsg.stream = rc->req_id; + wmsg.stream = ra->req_id; - res = rc->app_port->app->module->prepare_msg(task, &ap->r, &wmsg); + res = port->app->module->prepare_msg(task, &ap->r, &wmsg); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, rc->conn, 500, + nxt_router_gen_error(task, c, 500, "Failed to prepare message for application"); - goto fail; + return; } nxt_debug(task, "about to send %d bytes buffer to worker port %d", @@ -2258,16 +2433,13 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc, wmsg.port->socket.fd); res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, - -1, rc->req_id, reply_port->id, wmsg.write); + -1, ra->req_id, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, rc->conn, 500, + nxt_router_gen_error(task, c, 500, "Failed to send message to application"); - goto fail; + return; } - -fail: - port->mem_pool = port_mp; } @@ -2378,6 +2550,8 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) rc->app_port = NULL; } + rc->conn = NULL; + nxt_event_engine_request_remove(task->thread->engine, rc); } nxt_queue_loop; |