diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
commit | b0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch) | |
tree | 08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_router.c | |
parent | c38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff) | |
download | unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2 |
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master.
Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 662 |
1 files changed, 531 insertions, 131 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index b54d1cf3..fa43b09f 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -20,6 +20,19 @@ typedef struct { } nxt_router_listener_conf_t; +typedef struct nxt_start_worker_s nxt_start_worker_t; + +struct nxt_start_worker_s { + uint32_t stream; + nxt_app_t *app; + nxt_req_conn_link_t *rc; + nxt_mp_t *mem_pool; + void *joint; + + nxt_work_t work; +}; + + static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static nxt_int_t nxt_router_conf_new(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); @@ -87,11 +100,22 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, static void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint); +static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app); +static void nxt_router_app_release_port(nxt_task_t *task, void *obj, + void *data); + +static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, + nxt_start_worker_t *sw); +static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task, + nxt_router_t *router, uint32_t id); + static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data); static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); +static void nxt_router_process_http_request_mp(nxt_task_t *task, + nxt_req_conn_link_t *rc, nxt_mp_t *mp); static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); @@ -99,13 +123,19 @@ static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); +static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, + const char* fmt, ...); + static nxt_router_t *nxt_router; nxt_int_t -nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) +nxt_router_start(nxt_task_t *task, void *data) { - nxt_int_t ret; - nxt_router_t *router; + nxt_int_t ret; + nxt_router_t *router; + nxt_runtime_t *rt; + + rt = task->thread->runtime; ret = nxt_app_http_init(task, rt); if (nxt_slow_path(ret != NXT_OK)) { @@ -127,6 +157,51 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) } +static void +nxt_router_sw_release(nxt_task_t *task, void *obj, void *data) +{ + nxt_start_worker_t *sw; + nxt_socket_conf_joint_t *joint; + + sw = obj; + joint = sw->joint; + + nxt_debug(task, "sw #%uxD release", sw->stream); + + if (nxt_mp_release(sw->mem_pool, sw) == 0) { + nxt_router_conf_release(task, joint); + } +} + + +void +nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_start_worker_t *sw; + + nxt_port_new_port_handler(task, msg); + + if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) { + return; + } + + sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream); + + if (nxt_fast_path(sw != NULL)) { + msg->new_port->app = sw->app; + + nxt_router_app_release_port(task, msg->new_port, sw->app); + + sw->work.handler = nxt_router_sw_release; + + nxt_debug(task, "post sw #%uxD release to %p", sw->stream, + sw->work.data); + + nxt_event_engine_post(sw->work.data, &sw->work); + } +} + + void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -412,7 +487,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_app_t *app, *prev; nxt_app_type_t type; nxt_sockaddr_t *sa; - nxt_queue_link_t *qlk, *nqlk; nxt_conf_value_t *conf, *http; nxt_conf_value_t *applications, *application; nxt_conf_value_t *listeners, *listener; @@ -496,19 +570,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application workers: %D", apcf.workers); - if (nxt_str_eq(&apcf.type, "python", 6)) { - type = NXT_APP_PYTHON; - - } else if (nxt_str_eq(&apcf.type, "php", 3)) { - type = NXT_APP_PHP; + type = nxt_app_parse_type(&apcf.type); - } else if (nxt_str_eq(&apcf.type, "ruby", 4)) { - type = NXT_APP_RUBY; - - } else if (nxt_str_eq(&apcf.type, "go", 2)) { - type = NXT_APP_GO; + if (type == NXT_APP_UNKNOWN) { + nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", + &apcf.type); + goto app_fail; + } - } else { + if (nxt_app_modules[type] == NULL) { nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"", &apcf.type); goto app_fail; @@ -519,10 +589,14 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, goto app_fail; } + nxt_queue_init(&app->ports); + nxt_queue_init(&app->requests); + app->name = name; app->type = type; app->max_workers = apcf.workers; app->live = 1; + app->module = nxt_app_modules[type]; nxt_queue_insert_tail(&tmcf->apps, &app->link); } @@ -607,16 +681,13 @@ app_fail: fail: - for (qlk = nxt_queue_first(&tmcf->apps); - qlk != nxt_queue_tail(&tmcf->apps); - qlk = nqlk) - { - nqlk = nxt_queue_next(qlk); - app = nxt_queue_link_data(qlk, nxt_app_t, link); + nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { + nxt_queue_remove(&app->link); nxt_thread_mutex_destroy(&app->mutex); nxt_free(app); - } + + } nxt_queue_loop; return NXT_ERROR; } @@ -625,19 +696,15 @@ fail: static nxt_app_t * nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) { - nxt_app_t *app; - nxt_queue_link_t *qlk; + nxt_app_t *app; - for (qlk = nxt_queue_first(queue); - qlk != nxt_queue_tail(queue); - qlk = nxt_queue_next(qlk)) - { - app = nxt_queue_link_data(qlk, nxt_app_t, link); + nxt_queue_each(app, queue, nxt_app_t, link) { if (nxt_strstr_eq(name, &app->name)) { return app; } - } + + } nxt_queue_loop; return NULL; } @@ -1088,8 +1155,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine) { nxt_int_t ret; - nxt_port_t *port; - nxt_process_t *process; nxt_thread_link_t *link; nxt_thread_handle_t handle; @@ -1107,28 +1172,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_queue_insert_tail(&rt->engines, &engine->link); - process = nxt_runtime_process_find(rt, nxt_pid); - if (nxt_slow_path(process == NULL)) { - return NXT_ERROR; - } - - port = nxt_process_port_new(process); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - - ret = nxt_port_socket_init(task, port, 0); - if (nxt_slow_path(ret != NXT_OK)) { - return ret; - } - - port->engine = 0; - port->type = NXT_PROCESS_ROUTER; - - engine->port = port; - - nxt_runtime_port_add(rt, port); - ret = nxt_thread_create(&handle, link); if (nxt_slow_path(ret != NXT_OK)) { @@ -1142,20 +1185,14 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, static void nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; - nxt_queue_link_t *qlk, *nqlk; + nxt_app_t *app; - for (qlk = nxt_queue_first(&router->apps); - qlk != nxt_queue_tail(&router->apps); - qlk = nqlk) - { - nqlk = nxt_queue_next(qlk); - app = nxt_queue_link_data(qlk, nxt_app_t, link); + nxt_queue_each(app, &router->apps, nxt_app_t, link) { nxt_queue_remove(&app->link); - // RELEASE APP - } + // TODO RELEASE APP + } nxt_queue_loop; nxt_queue_add(&router->apps, &tmcf->previous); nxt_queue_add(&router->apps, &tmcf->apps); @@ -1232,6 +1269,8 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = { static void nxt_router_thread_start(void *data) { + nxt_int_t ret; + nxt_port_t *port; nxt_task_t *task; nxt_thread_t *thread; nxt_thread_link_t *link; @@ -1252,12 +1291,26 @@ nxt_router_thread_start(void *data) thread->task = &engine->task; thread->fiber = &engine->fibers->fiber; - nxt_mp_thread_adopt(engine->port->mem_pool); + engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); + + port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t)); + if (nxt_slow_path(port == NULL)) { + return; + } + + port->id = nxt_port_get_next_id(); + port->pid = nxt_pid; + + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + return; + } - engine->port->socket.task = task; - nxt_port_create(task, engine->port, nxt_router_app_port_handlers); + port->type = NXT_PROCESS_ROUTER; - engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); + engine->port = port; + + nxt_port_enable(task, port, nxt_router_app_port_handlers); nxt_event_engine_start(engine); } @@ -1456,6 +1509,9 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_thread_spin_unlock(lock); + /* TODO remove engine->port */ + /* TODO excude from connected ports */ + if (rtcf != NULL) { nxt_debug(task, "old router conf is destroyed"); @@ -1473,6 +1529,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) { + nxt_port_t *port; nxt_thread_link_t *link; nxt_event_engine_t *engine; nxt_thread_handle_t handle; @@ -1486,13 +1543,27 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_remove(&engine->link); + port = engine->port; + + // TODO notify all apps + + if (port->pair[0] != -1) { + nxt_fd_close(port->pair[0]); + } + + if (port->pair[1] != -1) { + nxt_fd_close(port->pair[1]); + } + + if (port->mem_pool) { + nxt_mp_destroy(port->mem_pool); + } + nxt_mp_destroy(engine->mem_pool); nxt_event_engine_free(engine); nxt_free(link); - - // TODO: free port } @@ -1616,28 +1687,265 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } } +nxt_inline const char * +nxt_router_text_by_code(int code) +{ + switch (code) { + case 400: return "Bad request"; + case 404: return "Not found"; + case 403: return "Forbidden"; + case 500: + default: return "Internal server error"; + } +} -nxt_inline nxt_port_t * -nxt_router_app_port(nxt_task_t *task) +static void +nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, + const char* fmt, ...) { - nxt_port_t *port; - nxt_runtime_t *rt; + va_list args; + nxt_buf_t *b, *last; + const char *msg; + + b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0); + if (nxt_slow_path(b == NULL)) { + /* TODO pogorevaTb */ + } + + b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, + "HTTP/1.0 %d %s\r\n" + "Content-Type: text/plain\r\n" + "Connection: close\r\n\r\n", + code, nxt_router_text_by_code(code)); + + msg = (const char *) b->mem.free; + + va_start(args, fmt); + b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); + va_end(args); + + nxt_log_alert(task->log, "error %d: %s", code, msg); + + last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); + if (nxt_slow_path(last == NULL)) { + /* TODO pogorevaTb */ + } + + nxt_buf_chain_add(&b, last); + + if (c->write == NULL) { + c->write = b; + c->write_state = &nxt_router_conn_write_state; + + nxt_conn_write(task->thread->engine, c); + } else { + nxt_debug(task, "router data attach out bufs to existing chain"); + + nxt_buf_chain_add(&c->write, b); + } +} + + +static void +nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_app_t *app; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_start_worker_t *sw; + + sw = obj; + app = sw->app; + + nxt_debug(task, "send sw #%uD", sw->stream); + + nxt_router_sw_add(task, nxt_router, sw); + nxt_queue_insert_tail(&app->requests, &sw->rc->app_link); rt = task->thread->runtime; + port = rt->port_by_type[NXT_PROCESS_MASTER]; - nxt_runtime_port_each(rt, port) { + b = nxt_buf_mem_alloc(port->mem_pool, app->conf.length, 0); - if (nxt_pid == port->pid) { - continue; - } + nxt_buf_cpystr(b, &app->conf); - if (port->type == NXT_PROCESS_WORKER) { - return port; - } + nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b); +} - } nxt_runtime_port_loop; - return NULL; +static nxt_port_t * +nxt_router_app_get_port(nxt_app_t *app) +{ + nxt_port_t *port; + nxt_queue_link_t *lnk; + + port = NULL; + + nxt_thread_mutex_lock(&app->mutex); + + if (!nxt_queue_is_empty(&app->ports)) { + lnk = nxt_queue_first(&app->ports); + nxt_queue_remove(lnk); + + lnk->next = NULL; + + port = nxt_queue_link_data(lnk, nxt_port_t, app_link); + } + + nxt_thread_mutex_unlock(&app->mutex); + + return port; +} + + +static void +nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) +{ + nxt_app_t *app; + nxt_port_t *port; + nxt_work_t *work; + nxt_queue_link_t *lnk; + nxt_req_conn_link_t *rc; + + port = obj; + app = data; + + nxt_assert(app != NULL); + nxt_assert(app == port->app); + nxt_assert(port->app_link.next == NULL); + + + if (task->thread->engine != port->engine) { + work = (nxt_work_t *) (port + 1); + + nxt_debug(task, "post release port to engine %p", port->engine); + + work->next = NULL; + work->handler = nxt_router_app_release_port; + work->task = port->socket.task; + work->obj = port; + work->data = app; + + nxt_event_engine_post(port->engine, work); + + return; + } + + if (!nxt_queue_is_empty(&app->requests)) { + lnk = nxt_queue_first(&app->requests); + nxt_queue_remove(lnk); + + rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link); + + nxt_debug(task, "process request #%uxD", rc->req_id); + + rc->app_port = port; + + nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool); + + return; + } + + nxt_debug(task, "app requests queue is empty"); + + nxt_thread_mutex_lock(&app->mutex); + + nxt_queue_insert_head(&app->ports, &port->app_link); + + nxt_thread_mutex_unlock(&app->mutex); +} + + +void +nxt_router_app_remove_port(nxt_port_t *port) +{ + nxt_app_t *app; + + if (port->app_link.next == NULL) { + return; + } + + app = port->app; + +#if (NXT_DEBUG) + if (nxt_slow_path(app == NULL)) { + nxt_abort(); + } +#endif + + nxt_thread_mutex_lock(&app->mutex); + + nxt_queue_remove(&port->app_link); + port->app_link.next = NULL; + + nxt_thread_mutex_unlock(&app->mutex); +} + + +nxt_inline nxt_int_t +nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc) +{ + nxt_app_t *app; + nxt_conn_t *c; + nxt_port_t *port, *master_port; + nxt_runtime_t *rt; + nxt_start_worker_t *sw; + nxt_socket_conf_joint_t *joint; + + port = NULL; + c = rc->conn; + + joint = c->listen->socket.data; + app = joint->socket_conf->application; + + + if (app == NULL) { + nxt_router_gen_error(task, rc->conn, 500, + "Application is NULL in socket_conf"); + return NXT_ERROR; + } + + + port = nxt_router_app_get_port(app); + + if (port != NULL) { + rc->app_port = port; + return NXT_OK; + } + + + sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t)); + + if (nxt_slow_path(sw == NULL)) { + nxt_router_gen_error(task, rc->conn, 500, + "Failed to allocate start worker struct"); + return NXT_ERROR; + } + + nxt_memzero(sw, sizeof(nxt_start_worker_t)); + + sw->stream = nxt_random(&task->thread->random); + sw->app = app; + sw->rc = rc; + sw->mem_pool = c->mem_pool; + sw->joint = c->listen->socket.data; + + sw->work.handler = nxt_router_send_sw_request; + sw->work.task = task; + sw->work.obj = sw; + sw->work.data = task->thread->engine; + + rt = task->thread->runtime; + + master_port = rt->port_by_type[NXT_PROCESS_MASTER]; + + nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream, + master_port->engine); + + nxt_event_engine_post(master_port->engine, &sw->work); + + return NXT_AGAIN; } @@ -1715,13 +2023,12 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) } size = c->read->mem.free - c->read->mem.pos; - nxt_memcpy(b->mem.pos, c->read->mem.pos, size); - b->mem.free += size; - c->read = b; + c->read = nxt_buf_cpy(b, c->read->mem.pos, size); } else { - // TODO 500 Too long request headers - nxt_log_alert(task->log, "Too long request headers"); + nxt_router_gen_error(task, c, 400, + "Too long request headers"); + return; } } } @@ -1735,15 +2042,12 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0); if (nxt_slow_path(b == NULL)) { - // TODO 500 Failed to allocate buffer for request body - nxt_log_alert(task->log, "Failed to allocate buffer for " - "request body"); + nxt_router_gen_error(task, c, 500, "Failed to allocate " + "buffer for request body"); + return; } - b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, - preread); - - c->read = b; + c->read = nxt_buf_cpy(b, c->read->mem.pos, preread); } nxt_debug(task, "router request body read again, rest: %uz", @@ -1761,26 +2065,11 @@ static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap) { - nxt_mp_t *port_mp; nxt_int_t res; - nxt_port_t *port, *c_port; nxt_req_id_t req_id; - nxt_app_wmsg_t wmsg; nxt_event_engine_t *engine; nxt_req_conn_link_t *rc; - if (nxt_slow_path(nxt_app == NULL)) { - // 500 Application not found - nxt_log_alert(task->log, "application is NULL"); - } - - port = nxt_router_app_port(task); - - if (nxt_slow_path(port == NULL)) { - // 500 Application port not found - nxt_log_alert(task->log, "application port not found"); - } - engine = task->thread->engine; do { @@ -1790,8 +2079,10 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, rc = nxt_conn_request_add(c, req_id); if (nxt_slow_path(rc == NULL)) { - // 500 Failed to allocate req->conn link - nxt_log_alert(task->log, "failed to allocate req->conn link"); + nxt_router_gen_error(task, c, 500, "Failed to allocate " + "req->conn link"); + + return; } nxt_event_engine_request_add(engine, rc); @@ -1799,33 +2090,66 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", req_id, c, engine); + rc->reply_port = engine->port; + + res = nxt_router_app_port(task, rc); + + if (res != NXT_OK) { + return; + } + + nxt_router_process_http_request_mp(task, rc, c->mem_pool); +} + + +static void +nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc, + nxt_mp_t *mp) +{ + nxt_mp_t *port_mp; + nxt_int_t res; + nxt_port_t *port, *c_port, *reply_port; + nxt_app_wmsg_t wmsg; + nxt_app_parse_ctx_t *ap; + + port = rc->app_port; + + if (nxt_slow_path(port == NULL)) { + nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); + return; + } + + reply_port = rc->reply_port; + ap = rc->conn->socket.data; + port_mp = port->mem_pool; - port->mem_pool = c->mem_pool; + port->mem_pool = mp; - c_port = nxt_process_connected_port_find(port->process, - engine->port->pid, - engine->port->id); - if (nxt_slow_path(c_port != engine->port)) { - res = nxt_port_send_port(task, port, engine->port); + c_port = nxt_process_connected_port_find(port->process, reply_port->pid, + reply_port->id); + if (nxt_slow_path(c_port != reply_port)) { + res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - // 500 Failed to send reply port - nxt_log_alert(task->log, "failed to send reply port to application"); + nxt_router_gen_error(task, rc->conn, 500, + "Failed to send reply port to application"); + goto fail; } - nxt_process_connected_port_add(port->process, engine->port); + nxt_process_connected_port_add(port->process, reply_port); } wmsg.port = port; wmsg.write = NULL; wmsg.buf = &wmsg.write; - wmsg.stream = req_id; + wmsg.stream = rc->req_id; - res = nxt_app->prepare_msg(task, &ap->r, &wmsg); + res = rc->app_port->app->module->prepare_msg(task, &ap->r, &wmsg); if (nxt_slow_path(res != NXT_OK)) { - // 500 Failed to prepare message - nxt_log_alert(task->log, "failed to prepare message for application"); + nxt_router_gen_error(task, rc->conn, 500, + "Failed to prepare message for application"); + goto fail; } nxt_debug(task, "about to send %d bytes buffer to worker port %d", @@ -1833,13 +2157,15 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, wmsg.port->socket.fd); res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, - -1, req_id, engine->port->id, wmsg.write); + -1, rc->req_id, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { - // 500 Failed to send message - nxt_log_alert(task->log, "failed to send message to application"); + nxt_router_gen_error(task, rc->conn, 500, + "Failed to send message to application"); + goto fail; } +fail: port->mem_pool = port_mp; } @@ -1934,6 +2260,12 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); + if (rc->app_port != NULL) { + nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); + + rc->app_port = NULL; + } + nxt_event_engine_request_remove(task->thread->engine, rc); } nxt_queue_loop; @@ -1944,9 +2276,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) task = &task->thread->engine->task; - nxt_mp_release(c->mem_pool, c); - - nxt_router_conf_release(task, joint); + if (nxt_mp_release(c->mem_pool, c) == 0) { + nxt_router_conf_release(task, joint); + } } @@ -1992,3 +2324,71 @@ nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) return nxt_value_at(nxt_msec_t, joint->socket_conf, data); } + + +static nxt_int_t +nxt_sw_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + return NXT_OK; +} + + +static const nxt_lvlhsh_proto_t lvlhsh_sw_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_sw_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +static void +nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, + nxt_start_worker_t *sw) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&sw->stream, sizeof(sw->stream)); + lhq.key.length = sizeof(sw->stream); + lhq.key.start = (u_char *) &sw->stream; + lhq.proto = &lvlhsh_sw_proto; + lhq.replace = 0; + lhq.value = sw; + lhq.pool = task->thread->runtime->mem_pool; + + switch (nxt_lvlhsh_insert(&router->start_workers, &lhq)) { + + case NXT_OK: + break; + + default: + nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw add failed", + sw->stream); + break; + } +} + + +static nxt_start_worker_t * +nxt_router_sw_find_remove(nxt_task_t *task, nxt_router_t *router, uint32_t id) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&id, sizeof(id)); + lhq.key.length = sizeof(id); + lhq.key.start = (u_char *) &id; + lhq.proto = &lvlhsh_sw_proto; + lhq.pool = task->thread->runtime->mem_pool; + + switch (nxt_lvlhsh_delete(&router->start_workers, &lhq)) { + + case NXT_OK: + return lhq.value; + + default: + nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw remove failed", + id); + break; + } + + return NULL; +} |