diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
commit | b0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch) | |
tree | 08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src | |
parent | c38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff) | |
download | unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2 |
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master.
Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_application.c | 58 | ||||
-rw-r--r-- | src/nxt_application.h | 60 | ||||
-rw-r--r-- | src/nxt_buf.h | 15 | ||||
-rw-r--r-- | src/nxt_conn.h | 5 | ||||
-rw-r--r-- | src/nxt_controller.c | 13 | ||||
-rw-r--r-- | src/nxt_log.h | 13 | ||||
-rw-r--r-- | src/nxt_master_process.c | 203 | ||||
-rw-r--r-- | src/nxt_master_process.h | 6 | ||||
-rw-r--r-- | src/nxt_mp.c | 6 | ||||
-rw-r--r-- | src/nxt_mp.h | 2 | ||||
-rw-r--r-- | src/nxt_port.c | 95 | ||||
-rw-r--r-- | src/nxt_port.h | 26 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 68 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 2 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 13 | ||||
-rw-r--r-- | src/nxt_process.c | 132 | ||||
-rw-r--r-- | src/nxt_process.h | 28 | ||||
-rw-r--r-- | src/nxt_python_wsgi.c | 115 | ||||
-rw-r--r-- | src/nxt_router.c | 662 | ||||
-rw-r--r-- | src/nxt_router.h | 13 | ||||
-rw-r--r-- | src/nxt_runtime.c | 70 | ||||
-rw-r--r-- | src/nxt_runtime.h | 4 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 2 |
23 files changed, 1226 insertions, 385 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index 37e2577c..50feac2d 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -9,19 +9,27 @@ #include <nxt_main.h> #include <nxt_runtime.h> #include <nxt_application.h> +#include <nxt_master_process.h> -nxt_application_module_t *nxt_app; +nxt_application_module_t *nxt_app_modules[NXT_APP_MAX]; static nxt_thread_mutex_t nxt_app_mutex; static nxt_thread_cond_t nxt_app_cond; static nxt_http_fields_hash_entry_t nxt_app_request_fields[]; -static nxt_http_fields_hash_t *nxt_app_request_fields_hash; +static nxt_http_fields_hash_t *nxt_app_request_fields_hash; + +static nxt_application_module_t *nxt_app; nxt_int_t -nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt) +nxt_app_start(nxt_task_t *task, void *data) { + nxt_int_t ret; + nxt_common_app_conf_t *app_conf; + + app_conf = data; + if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) { return NXT_ERROR; } @@ -30,11 +38,18 @@ nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt) return NXT_ERROR; } - if (nxt_slow_path(nxt_app->init(task) != NXT_OK)) { + nxt_app = nxt_app_modules[app_conf->type_id]; + + ret = nxt_app->init(task, data); + + if (nxt_slow_path(ret != NXT_OK)) { nxt_debug(task, "application init failed"); + + } else { + nxt_debug(task, "application init done"); } - return NXT_OK; + return ret; } @@ -570,3 +585,36 @@ nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, return NXT_OK; } + + +nxt_app_type_t +nxt_app_parse_type(nxt_str_t *str) +{ + if (nxt_str_eq(str, "python", 6)) { + return NXT_APP_PYTHON; + + } else if (nxt_str_eq(str, "python2", 7)) { + return NXT_APP_PYTHON2; + + } else if (nxt_str_eq(str, "python3", 7)) { + return NXT_APP_PYTHON3; + + } else if (nxt_str_eq(str, "php", 3)) { + return NXT_APP_PHP; + + } else if (nxt_str_eq(str, "php5", 4)) { + return NXT_APP_PHP5; + + } else if (nxt_str_eq(str, "php7", 4)) { + return NXT_APP_PHP7; + + } else if (nxt_str_eq(str, "ruby", 4)) { + return NXT_APP_RUBY; + + } else if (nxt_str_eq(str, "go", 2)) { + return NXT_APP_GO; + + } + + return NXT_APP_UNKNOWN; +} diff --git a/src/nxt_application.h b/src/nxt_application.h index 5a585c89..449f1158 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -10,13 +10,57 @@ typedef enum { - NXT_APP_PYTHON = 0, + NXT_APP_UNKNOWN = 0, + NXT_APP_PYTHON, + NXT_APP_PYTHON2, + NXT_APP_PYTHON3, NXT_APP_PHP, + NXT_APP_PHP5, + NXT_APP_PHP7, NXT_APP_RUBY, NXT_APP_GO, + + NXT_APP_MAX, } nxt_app_type_t; +typedef struct nxt_common_app_conf_s nxt_common_app_conf_t; + + +typedef struct { + nxt_str_t path; + nxt_str_t module; +} nxt_python_app_conf_t; + + +typedef struct { + nxt_str_t root; + nxt_str_t script; + nxt_str_t index; +} nxt_php_app_conf_t; + + +typedef struct { + nxt_str_t executable; +} nxt_go_app_conf_t; + + +struct nxt_common_app_conf_s { + nxt_str_t type; + nxt_app_type_t type_id; + nxt_str_t user; + nxt_str_t group; + + uint32_t workers; + + union { + nxt_python_app_conf_t python; + nxt_php_app_conf_t php; + nxt_go_app_conf_t go; + } u; +}; + + typedef struct { nxt_str_t name; nxt_str_t value; @@ -138,18 +182,22 @@ nxt_int_t nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *rmsg, size_t *size); -typedef struct { - nxt_int_t (*init)(nxt_task_t *task); +typedef struct nxt_app_module_s nxt_application_module_t; +typedef struct nxt_app_module_s nxt_app_module_t; + +struct nxt_app_module_s { + nxt_int_t (*init)(nxt_task_t *task, + nxt_common_app_conf_t *conf); nxt_int_t (*prepare_msg)(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); nxt_int_t (*run)(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg); -} nxt_application_module_t; +}; -extern nxt_application_module_t *nxt_app; +extern nxt_application_module_t *nxt_app_modules[NXT_APP_MAX]; @@ -227,4 +275,6 @@ nxt_app_msg_read_length(u_char *src, size_t *length) } +nxt_app_type_t nxt_app_parse_type(nxt_str_t *str); + #endif /* _NXT_APPLICATION_H_INCLIDED_ */ diff --git a/src/nxt_buf.h b/src/nxt_buf.h index ae96b24a..2e896093 100644 --- a/src/nxt_buf.h +++ b/src/nxt_buf.h @@ -258,5 +258,20 @@ nxt_buf_free(mp, b) \ NXT_EXPORT void nxt_buf_chain_add(nxt_buf_t **head, nxt_buf_t *in); NXT_EXPORT size_t nxt_buf_chain_length(nxt_buf_t *b); +nxt_inline nxt_buf_t * +nxt_buf_cpy(nxt_buf_t *b, const void *src, size_t length) +{ + nxt_memcpy(b->mem.free, src, length); + b->mem.free += length; + + return b; +} + +nxt_inline nxt_buf_t * +nxt_buf_cpystr(nxt_buf_t *b, const nxt_str_t *str) +{ + return nxt_buf_cpy(b, str->start, str->length); +} + #endif /* _NXT_BUF_H_INCLIDED_ */ diff --git a/src/nxt_conn.h b/src/nxt_conn.h index f440e04b..fa9f5b9d 100644 --- a/src/nxt_conn.h +++ b/src/nxt_conn.h @@ -187,8 +187,11 @@ typedef uint32_t nxt_req_id_t; typedef struct { nxt_req_id_t req_id; nxt_conn_t *conn; + nxt_port_t *app_port; + nxt_port_t *reply_port; - nxt_queue_link_t link; + nxt_queue_link_t link; /* for nxt_conn_t.requests */ + nxt_queue_link_t app_link; /* for nxt_app_t.requests */ } nxt_req_conn_link_t; diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 4cf6ff95..ec0f99bc 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -89,15 +89,18 @@ static const nxt_event_conn_state_t nxt_controller_conn_close_state; nxt_int_t -nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt) +nxt_controller_start(nxt_task_t *task, void *data) { nxt_mp_t *mp; + nxt_runtime_t *rt; nxt_conf_value_t *conf; nxt_http_fields_hash_t *hash; static const nxt_str_t json = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); + rt = task->thread->runtime; + hash = nxt_http_fields_hash_create(nxt_controller_request_fields, rt->mem_pool); if (nxt_slow_path(hash == NULL)) { @@ -853,13 +856,7 @@ nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf) rt = task->thread->runtime; - nxt_runtime_port_each(rt, port) { - - if (port->type == NXT_PROCESS_ROUTER) { - break; - } - - } nxt_runtime_port_loop; + port = rt->port_by_type[NXT_PROCESS_ROUTER]; size = nxt_conf_json_length(conf, NULL); diff --git a/src/nxt_log.h b/src/nxt_log.h index 74260576..504d2ee8 100644 --- a/src/nxt_log.h +++ b/src/nxt_log.h @@ -162,4 +162,17 @@ NXT_EXPORT extern nxt_log_t nxt_main_log; NXT_EXPORT extern nxt_str_t nxt_log_levels[]; +#define nxt_assert(c) \ + do { \ + if (nxt_fast_path(c)) { \ + break; \ + \ + } else { \ + nxt_thread_log_alert("%s:%d assertion failed: %s", \ + __FILE__, __LINE__, #c ); \ + nxt_abort(); \ + } \ + } while (0) + + #endif /* _NXT_LOG_H_INCLUDED_ */ diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 3b638389..c5699572 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -8,6 +8,8 @@ #include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_master_process.h> +#include <nxt_conf.h> +#include <nxt_application.h> static nxt_int_t nxt_master_process_port_create(nxt_task_t *task, @@ -17,8 +19,8 @@ static nxt_int_t nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt); static nxt_int_t nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt); -static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task, - nxt_runtime_t *rt); +static nxt_int_t nxt_master_start_worker_process(nxt_task_t *task, + nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream); static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_init_t *init); static void nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, @@ -64,15 +66,135 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, return ret; } - ret = nxt_master_start_router_process(task, rt); + return nxt_master_start_router_process(task, rt); +} + + +static nxt_conf_map_t nxt_common_app_conf[] = { + { + nxt_string("type"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, type), + }, + + { + nxt_string("user"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, user), + }, + + { + nxt_string("group"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, group), + }, + + { + nxt_string("workers"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, workers), + }, + + { + nxt_string("path"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.python.path), + }, + + { + nxt_string("module"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.python.module), + }, + + { + nxt_string("root"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.php.root), + }, + + { + nxt_string("script"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.php.script), + }, + + { + nxt_string("index"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.php.index), + }, + + { + nxt_string("executable"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.go.executable), + }, +}; + + +static void +nxt_port_master_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + size_t dump_size; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_buf_t *b; + nxt_conf_value_t *conf; + nxt_common_app_conf_t app_conf; + + static nxt_str_t nobody = nxt_string("nobody"); + + b = msg->buf; + dump_size = b->mem.free - b->mem.pos; + + if (dump_size > 300) { + dump_size = 300; + } + + nxt_debug(task, "master data: %*s", dump_size, b->mem.pos); + + mp = nxt_mp_create(1024, 128, 256, 32); + + conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free); + b->mem.pos = b->mem.free; + + if (conf == NULL) { + nxt_log(task, NXT_LOG_CRIT, "configuration parsing error"); + return; + } + + nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t)); + + app_conf.user = nobody; + + ret = nxt_conf_map_object(conf, nxt_common_app_conf, + nxt_nitems(nxt_common_app_conf), &app_conf); if (ret != NXT_OK) { - return ret; + nxt_log(task, NXT_LOG_CRIT, "root map error"); + return; } - return nxt_master_start_worker_processes(task, rt); + app_conf.type_id = nxt_app_parse_type(&app_conf.type); + + ret = nxt_master_start_worker_process(task, task->thread->runtime, + &app_conf, msg->port_msg.stream); + + nxt_mp_destroy(mp); } +static nxt_port_handler_t nxt_master_process_port_handlers[] = { + NULL, + nxt_port_new_port_handler, + NULL, + nxt_port_mmap_handler, + nxt_port_master_data_handler, + NULL, + nxt_port_ready_handler, +}; + + static nxt_int_t nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) { @@ -85,7 +207,8 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return NXT_ERROR; } - port = nxt_process_port_new(process); + port = nxt_process_port_new(rt, process, nxt_port_get_next_id(), + NXT_PROCESS_MASTER); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } @@ -95,16 +218,15 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return ret; } - port->engine = 0; - port->type = NXT_PROCESS_MASTER; - nxt_runtime_port_add(rt, port); /* * A master process port. A write port is not closed * since it should be inherited by worker processes. */ - nxt_port_read_enable(task, port); + nxt_port_enable(task, port, nxt_master_process_port_handlers); + + process->ready = 1; return NXT_OK; } @@ -148,6 +270,9 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) init->port_handlers = nxt_controller_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_CONTROLLER; + init->data = rt; + init->stream = 0; + init->restart = 1; return nxt_master_create_worker_process(task, rt, init); } @@ -169,41 +294,45 @@ nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) init->port_handlers = nxt_router_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_ROUTER; + init->data = rt; + init->stream = 0; + init->restart = 1; return nxt_master_create_worker_process(task, rt, init); } static nxt_int_t -nxt_master_start_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) +nxt_master_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, + nxt_common_app_conf_t *app_conf, uint32_t stream) { - nxt_int_t ret; - nxt_uint_t n; nxt_process_init_t *init; - init = nxt_mp_get(rt->mem_pool, sizeof(nxt_process_init_t)); + init = nxt_malloc(sizeof(nxt_process_init_t) + + sizeof(nxt_user_cred_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } + init->user_cred = (nxt_user_cred_t *) (init + 1); + init->start = nxt_app_start; init->name = "worker process"; - init->user_cred = &rt->user_cred; + + init->user_cred->user = (char *) app_conf->user.start; + if (nxt_user_cred_get(task, init->user_cred, + (char *) app_conf->group.start) != NXT_OK) { + return NXT_ERROR; + } + init->port_handlers = nxt_app_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_WORKER; + init->data = app_conf; + init->stream = stream; + init->restart = 0; - n = rt->worker_processes; - - while (n-- != 0) { - ret = nxt_master_create_worker_process(task, rt, init); - - if (ret != NXT_OK) { - return ret; - } - } - - return NXT_OK; + return nxt_master_create_worker_process(task, rt, init); } @@ -214,7 +343,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_int_t ret; nxt_pid_t pid; nxt_port_t *port; - nxt_process_t *process, *master_process; + nxt_process_t *process; /* * TODO: remove process, init, ports from array on memory and fork failures. @@ -226,24 +355,20 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, } process->init = init; - master_process = rt->mprocess; - init->master_port = nxt_process_port_first(master_process); - port = nxt_process_port_new(process); + port = nxt_process_port_new(rt, process, 0, init->type); if (nxt_slow_path(port == NULL)) { + nxt_runtime_process_destroy(rt, process); return NXT_ERROR; } - init->port = port; - ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { + nxt_mp_free(rt->mem_pool, port); + nxt_runtime_process_destroy(rt, process); return ret; } - port->engine = 0; - port->type = init->type; - pid = nxt_process_create(task, process); switch (pid) { @@ -261,7 +386,6 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_read_close(port); nxt_port_write_enable(task, port); - nxt_port_send_new_port(task, rt, port); return NXT_OK; } } @@ -503,7 +627,12 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) } } else if (init != NULL) { - (void) nxt_master_create_worker_process(task, rt, init); + if (init->restart != 0) { + (void) nxt_master_create_worker_process(task, rt, init); + + } else { + nxt_free(init); + } } } } diff --git a/src/nxt_master_process.h b/src/nxt_master_process.h index 6f7cc067..bcdf645d 100644 --- a/src/nxt_master_process.h +++ b/src/nxt_master_process.h @@ -12,9 +12,9 @@ nxt_int_t nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *runtime); void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *runtime); -nxt_int_t nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt); -nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt); - +nxt_int_t nxt_controller_start(nxt_task_t *task, void *data); +nxt_int_t nxt_router_start(nxt_task_t *task, void *data); +nxt_int_t nxt_app_start(nxt_task_t *task, void *data); extern nxt_port_handler_t nxt_controller_process_port_handlers[]; extern nxt_port_handler_t nxt_worker_process_port_handlers[]; diff --git a/src/nxt_mp.c b/src/nxt_mp.c index 0b82cf2a..90adcf8d 100644 --- a/src/nxt_mp.c +++ b/src/nxt_mp.c @@ -917,7 +917,7 @@ nxt_mp_retain(nxt_mp_t *mp, size_t size) } -void +uint32_t nxt_mp_release(nxt_mp_t *mp, void *p) { nxt_mp_free(mp, p); @@ -928,7 +928,11 @@ nxt_mp_release(nxt_mp_t *mp, void *p) if (mp->retain == 0) { nxt_mp_destroy(mp); + + return 0; } + + return mp->retain; } diff --git a/src/nxt_mp.h b/src/nxt_mp.h index 78040df2..72404e89 100644 --- a/src/nxt_mp.h +++ b/src/nxt_mp.h @@ -87,7 +87,7 @@ NXT_EXPORT void *nxt_mp_retain(nxt_mp_t *mp, size_t size) * nxt_mp_release() returns freeable memory and decreases memory pool * retention counter. If the counter becomes zero the pool is destroyed. */ -NXT_EXPORT void nxt_mp_release(nxt_mp_t *mp, void *p); +NXT_EXPORT uint32_t nxt_mp_release(nxt_mp_t *mp, void *p); /* nxt_mp_nget() returns non-aligned non-freeable memory. */ diff --git a/src/nxt_port.c b/src/nxt_port.c index 404e6424..3abc4125 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -11,13 +11,28 @@ static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static nxt_atomic_uint_t nxt_port_last_id; + +nxt_port_id_t +nxt_port_get_next_id() +{ + return nxt_atomic_fetch_add(&nxt_port_last_id, 1); +} + void -nxt_port_create(nxt_task_t *task, nxt_port_t *port, +nxt_port_reset_next_id() +{ + nxt_port_last_id = 1; +} + + +void +nxt_port_enable(nxt_task_t *task, nxt_port_t *port, nxt_port_handler_t *handlers) { port->pid = nxt_pid; - port->engine = task->thread->engine->id; + port->engine = task->thread->engine; port->handler = nxt_port_handler; port->data = handlers; @@ -77,12 +92,13 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, - nxt_port_t *new_port) + nxt_port_t *new_port, uint32_t stream) { + nxt_port_t *port; nxt_process_t *process; - nxt_debug(task, "new port %d for process %PI engine %uD", - new_port->pair[1], new_port->pid, new_port->engine); + nxt_debug(task, "new port %d for process %PI", + new_port->pair[1], new_port->pid); nxt_runtime_process_each(rt, process) { @@ -90,15 +106,22 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, continue; } - (void) nxt_port_send_port(task, nxt_process_port_first(process), - new_port); + port = nxt_process_port_first(process); + + if (port->type == NXT_PROCESS_MASTER || + port->type == NXT_PROCESS_CONTROLLER || + port->type == NXT_PROCESS_ROUTER) { + + (void) nxt_port_send_port(task, port, new_port, stream); + } } nxt_runtime_process_loop; } nxt_int_t -nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port) +nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, + uint32_t stream) { nxt_buf_t *b; nxt_port_msg_new_port_t *msg; @@ -116,13 +139,12 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port) msg->id = new_port->id; msg->pid = new_port->pid; - msg->engine = new_port->engine; msg->max_size = port->max_size; msg->max_share = port->max_share; msg->type = new_port->type; return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, - new_port->pair[1], 0, 0, b); + new_port->pair[1], stream, 0, b); } @@ -140,12 +162,26 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; msg->buf->mem.pos = msg->buf->mem.free; + nxt_debug(task, "new port %d received for process %PI:%d", + msg->fd, new_port_msg->pid, new_port_msg->id); + + port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); + if (port != NULL) { + nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, + new_port_msg->id); + + nxt_fd_close(msg->fd); + msg->fd = -1; + return; + } + process = nxt_runtime_process_get(rt, new_port_msg->pid); if (nxt_slow_path(process == NULL)) { return; } - port = nxt_process_port_new(process); + port = nxt_process_port_new(rt, process, new_port_msg->id, + new_port_msg->type); if (nxt_slow_path(port == NULL)) { return; } @@ -157,16 +193,10 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port->mem_pool = mp; - nxt_debug(task, "new port %d received for process %PI engine %uD", - msg->fd, new_port_msg->pid, new_port_msg->engine); - - port->id = new_port_msg->id; - port->engine = new_port_msg->engine; port->pair[0] = -1; port->pair[1] = msg->fd; port->max_size = new_port_msg->max_size; port->max_share = new_port_msg->max_share; - port->type = new_port_msg->type; nxt_queue_init(&port->messages); @@ -175,6 +205,37 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_runtime_port_add(rt, port); nxt_port_write_enable(task, port); + + msg->new_port = port; +} + + +void +nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_t *port; + nxt_process_t *process; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + process = nxt_runtime_process_get(rt, msg->port_msg.pid); + if (nxt_slow_path(process == NULL)) { + return; + } + + process->ready = 1; + + port = nxt_process_port_first(process); + if (nxt_slow_path(port == NULL)) { + return; + } + + nxt_debug(task, "process %PI ready", msg->port_msg.pid); + + if (nxt_runtime_is_master(rt)) { + nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); + } } diff --git a/src/nxt_port.h b/src/nxt_port.h index c7566426..603540e2 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -15,6 +15,7 @@ typedef enum { NXT_PORT_MSG_MMAP, NXT_PORT_MSG_DATA, NXT_PORT_MSG_REMOVE_PID, + NXT_PORT_MSG_READY, NXT_PORT_MSG_MAX, } nxt_port_msg_type_t; @@ -53,13 +54,19 @@ struct nxt_port_recv_msg_s { nxt_port_t *port; nxt_port_msg_t port_msg; size_t size; + nxt_port_t *new_port; }; +typedef struct nxt_app_s nxt_app_t; struct nxt_port_s { nxt_fd_event_t socket; nxt_queue_link_t link; /* for nxt_process_t.ports */ + nxt_process_t *process; + + nxt_queue_link_t app_link; /* for nxt_app_t.ports */ + nxt_app_t *app; nxt_queue_t messages; /* of nxt_port_send_msg_t */ @@ -69,25 +76,24 @@ struct nxt_port_s { uint32_t max_share; nxt_port_handler_t handler; - void *data; + nxt_port_handler_t *data; nxt_mp_t *mem_pool; + nxt_event_engine_t *engine; + nxt_buf_t *free_bufs; nxt_socket_t pair[2]; nxt_port_id_t id; nxt_pid_t pid; - uint32_t engine; - nxt_process_type_t type:8; - nxt_process_t *process; + nxt_process_type_t type; }; typedef struct { nxt_port_id_t id; nxt_pid_t pid; - uint32_t engine; size_t max_size; size_t max_share; nxt_process_type_t type:8; @@ -104,6 +110,9 @@ typedef union { } nxt_port_data_t; +nxt_port_id_t nxt_port_get_next_id(void); +void nxt_port_reset_next_id(void); + nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size); void nxt_port_destroy(nxt_port_t *port); @@ -115,19 +124,20 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b); -void nxt_port_create(nxt_task_t *task, nxt_port_t *port, +void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, nxt_port_handler_t *handlers); void nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, - nxt_port_t *port); + nxt_port_t *port, uint32_t stream); nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, - nxt_port_t *new_port); + nxt_port_t *new_port, uint32_t stream); void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, nxt_fd_t fd); void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 38e7db70..32d4aa5f 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -26,6 +26,56 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) } +static nxt_array_t * +nxt_port_mmaps_create() +{ + nxt_mp_t *mp; + + mp = nxt_mp_create(1024, 128, 256, 32); + + if (nxt_slow_path(mp == NULL)) { + return NULL; + } + + return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t)); +} + + +static nxt_port_mmap_t * +nxt_port_mmap_add(nxt_array_t *port_mmaps) +{ + nxt_mp_thread_adopt(port_mmaps->mem_pool); + + return nxt_array_zero_add(port_mmaps); +} + + +void +nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) +{ + uint32_t i; + nxt_port_mmap_t *port_mmap; + + if (port_mmaps == NULL) { + return; + } + + nxt_mp_thread_adopt(port_mmaps->mem_pool); + + port_mmap = port_mmaps->elts; + + for (i = 0; i < port_mmaps->nelts; i++) { + nxt_port_mmap_destroy(port_mmap); + } + + port_mmaps->nelts = 0; + + if (destroy_pool != 0) { + nxt_mp_destroy(port_mmaps->mem_pool); + } +} + + static void nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) { @@ -63,6 +113,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) p = b->mem.pos - 1; c = nxt_port_mmap_chunk_id(hdr, p) + 1; p = nxt_port_mmap_chunk_start(hdr, c); + } else { p = b->mem.start; c = nxt_port_mmap_chunk_id(hdr, p); @@ -103,9 +154,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_thread_mutex_lock(&process->incoming_mutex); if (process->incoming == NULL) { - process->incoming_mp = nxt_mp_create(1024, 128, 256, 32); - process->incoming = nxt_array_create(process->incoming_mp, 1, - sizeof(nxt_port_mmap_t)); + process->incoming = nxt_port_mmaps_create(); } if (nxt_slow_path(process->incoming == NULL)) { @@ -114,9 +163,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, goto fail; } - nxt_mp_thread_adopt(process->incoming_mp); - - port_mmap = nxt_array_zero_add(process->incoming); + port_mmap = nxt_port_mmap_add(process->incoming); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); @@ -140,6 +187,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", port_mmap->hdr->id, process->incoming->nelts - 1); + nxt_abort(); } fail: @@ -195,9 +243,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, port_mmap = NULL; if (process->outgoing == NULL) { - process->outgoing_mp = nxt_mp_create(1024, 128, 256, 32); - process->outgoing = nxt_array_create(process->outgoing_mp, 1, - sizeof(nxt_port_mmap_t)); + process->outgoing = nxt_port_mmaps_create(); } if (nxt_slow_path(process->outgoing == NULL)) { @@ -206,9 +252,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } - nxt_mp_thread_adopt(process->outgoing_mp); - - port_mmap = nxt_array_zero_add(process->outgoing); + port_mmap = nxt_port_mmap_add(process->outgoing); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add port mmap to outgoing array"); diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index 332bd856..38e17a50 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -15,6 +15,8 @@ typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t; void nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap); +void nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool); + /* * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem' * pointers to first available shared mem bucket(s). 'size' used as a hint to diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 56dfe755..a0c6fea3 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -121,7 +121,9 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) port->socket.log = &nxt_main_log; port->socket.write_ready = 1; - port->socket.write_work_queue = &task->thread->engine->fast_work_queue; + port->engine = task->thread->engine; + + port->socket.write_work_queue = &port->engine->fast_work_queue; port->socket.write_handler = nxt_port_write_handler; port->socket.error_handler = nxt_port_error_handler; } @@ -344,6 +346,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } while (port->socket.write_ready); if (nxt_fd_event_is_disabled(port->socket.write)) { + /* TODO task->thread->engine or port->engine ? */ nxt_fd_event_enable_write(task->thread->engine, &port->socket); } @@ -362,11 +365,13 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) port->socket.fd = port->pair[0]; port->socket.log = &nxt_main_log; - port->socket.read_work_queue = &task->thread->engine->fast_work_queue; + port->engine = task->thread->engine; + + port->socket.read_work_queue = &port->engine->fast_work_queue; port->socket.read_handler = nxt_port_read_handler; port->socket.error_handler = nxt_port_error_handler; - nxt_fd_event_enable_read(task->thread->engine, &port->socket); + nxt_fd_event_enable_read(port->engine, &port->socket); } @@ -389,6 +394,8 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) port = msg.port = nxt_container_of(obj, nxt_port_t, socket); + nxt_assert(port->engine == task->thread->engine); + for ( ;; ) { b = nxt_port_buf_alloc(port); diff --git a/src/nxt_process.c b/src/nxt_process.c index 7d5af92d..5899ce30 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -8,7 +8,7 @@ #include <nxt_master_process.h> -static void nxt_process_start(nxt_task_t *task, nxt_process_init_t *process); +static void nxt_process_start(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc); @@ -23,6 +23,7 @@ nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process) { nxt_pid_t pid; + nxt_process_t *p; nxt_runtime_t *rt; rt = task->thread->runtime; @@ -44,13 +45,31 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) task->thread->tid = 0; process->pid = nxt_pid; - process->init->port->pid = nxt_pid; rt->types = 0; + nxt_port_reset_next_id(); + + /* Remove not ready processes */ + nxt_runtime_process_each(rt, p) { + + if (!p->ready) { + nxt_debug(task, "remove not ready process %PI", p->pid); + + nxt_runtime_process_remove(rt, p); + } else { + nxt_port_mmaps_destroy(p->incoming, 0); + nxt_port_mmaps_destroy(p->outgoing, 0); + } + + } nxt_runtime_process_loop; + nxt_runtime_process_add(rt, process); - nxt_process_start(task, process->init); + nxt_process_start(task, process); + + process->ready = 1; + break; default: @@ -58,7 +77,6 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) nxt_debug(task, "fork(\"%s\"): %PI", process->init->name, pid); process->pid = pid; - process->init->port->pid = pid; nxt_runtime_process_add(rt, process); @@ -70,26 +88,30 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) static void -nxt_process_start(nxt_task_t *task, nxt_process_init_t *process) +nxt_process_start(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; + nxt_port_t *port, *master_port; nxt_thread_t *thread; nxt_runtime_t *rt; + nxt_process_init_t *init; nxt_event_engine_t *engine; const nxt_event_interface_t *interface; - nxt_log(task, NXT_LOG_INFO, "%s started", process->name); + init = process->init; - nxt_process_title(task, "nginext: %s", process->name); + nxt_log(task, NXT_LOG_INFO, "%s started", init->name); + + nxt_process_title(task, "nginext: %s", init->name); thread = task->thread; nxt_random_init(&thread->random); - if (process->user_cred != NULL && getuid() == 0) { + if (init->user_cred != NULL && getuid() == 0) { /* Super-user. */ - ret = nxt_user_cred_set(task, process->user_cred); + ret = nxt_user_cred_set(task, init->user_cred); if (ret != NXT_OK) { goto fail; } @@ -97,15 +119,15 @@ nxt_process_start(nxt_task_t *task, nxt_process_init_t *process) rt = thread->runtime; - rt->types |= (1U << process->type); + rt->types |= (1U << init->type); engine = thread->engine; /* Update inherited master process event engine and signals processing. */ - engine->signals->sigev = process->signals; + engine->signals->sigev = init->signals; interface = nxt_service_get(rt->services, "engine", rt->engine); - if (interface == NULL) { + if (nxt_slow_path(interface == NULL)) { goto fail; } @@ -113,25 +135,40 @@ nxt_process_start(nxt_task_t *task, nxt_process_init_t *process) goto fail; } - nxt_port_read_close(process->master_port); - nxt_port_write_enable(task, process->master_port); - - /* A worker process port. */ - nxt_port_write_close(process->port); - nxt_port_create(task, process->port, process->port_handlers); - ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads, 60000 * 1000000LL); - if (ret != NXT_OK) { + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + master_port = rt->port_by_type[NXT_PROCESS_MASTER]; + + nxt_port_read_close(master_port); + nxt_port_write_enable(task, master_port); + + port = nxt_process_port_first(process); + + nxt_port_write_close(port); + + ret = init->start(task, init->data); + + if (nxt_slow_path(ret != NXT_OK)) { goto fail; } - ret = process->start(task, rt); + nxt_port_enable(task, port, init->port_handlers); - if (ret == NXT_OK) { - return; + ret = nxt_port_socket_write(task, master_port, NXT_PORT_MSG_READY, + -1, init->stream, 0, NULL); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log(task, NXT_LOG_ERR, "failed to send READY message to master"); + + goto fail; } + return; + fail: exit(1); @@ -527,15 +564,24 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) nxt_port_t * -nxt_process_port_new(nxt_process_t *process) +nxt_process_port_new(nxt_runtime_t *rt, nxt_process_t *process, + nxt_port_id_t id, nxt_process_type_t type) { + size_t size; nxt_port_t *port; - port = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_port_t)); + size = sizeof(nxt_port_t); + if (size == NXT_PROCESS_WORKER) { + size += sizeof(nxt_work_t); + } + + port = nxt_mp_zalloc(rt->mem_pool, size); + if (nxt_fast_path(port != NULL)) { - port->id = process->last_port_id++; + port->id = id; port->pid = process->pid; port->process = process; + port->type = type; nxt_process_port_add(process, port); } @@ -547,25 +593,45 @@ nxt_process_port_new(nxt_process_t *process) void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port) { - /* TODO lock ports */ + nxt_thread_mutex_lock(&process->cp_mutex); + + if (process->cp_mem_pool == NULL) { + process->cp_mem_pool = nxt_mp_create(1024, 128, 256, 32); + } - nxt_port_hash_add(&process->connected_ports, process->mem_pool, port); + nxt_mp_thread_adopt(process->cp_mem_pool); + + nxt_port_hash_add(&process->connected_ports, process->cp_mem_pool, port); + + nxt_thread_mutex_unlock(&process->cp_mutex); } void nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) { - /* TODO lock ports */ + nxt_thread_mutex_lock(&process->cp_mutex); + + if (process->cp_mem_pool != NULL) { + nxt_mp_thread_adopt(process->cp_mem_pool); + + nxt_port_hash_remove(&process->connected_ports, process->cp_mem_pool, + port); + } - nxt_port_hash_remove(&process->connected_ports, process->mem_pool, port); + nxt_thread_mutex_unlock(&process->cp_mutex); } nxt_port_t * nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid, nxt_port_id_t port_id) { - /* TODO lock ports */ + nxt_port_t *res; - return nxt_port_hash_find(&process->connected_ports, pid, port_id); -} + nxt_thread_mutex_lock(&process->cp_mutex); + + res = nxt_port_hash_find(&process->connected_ports, pid, port_id); + nxt_thread_mutex_unlock(&process->cp_mutex); + + return res; +} diff --git a/src/nxt_process.h b/src/nxt_process.h index 28824bd0..c49d2be3 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -14,6 +14,8 @@ typedef enum { NXT_PROCESS_CONTROLLER, NXT_PROCESS_ROUTER, NXT_PROCESS_WORKER, + + NXT_PROCESS_MAX, } nxt_process_type_t; @@ -31,38 +33,41 @@ typedef struct { } nxt_user_cred_t; typedef struct nxt_process_init_s nxt_process_init_t; -typedef nxt_int_t (*nxt_process_star_t)(nxt_task_t *task, nxt_runtime_t *rt); +typedef nxt_int_t (*nxt_process_start_t)(nxt_task_t *task, void *data); struct nxt_process_init_s { - nxt_process_star_t start; + nxt_process_start_t start; const char *name; nxt_user_cred_t *user_cred; - nxt_port_t *port; - nxt_port_t *master_port; nxt_port_handler_t *port_handlers; const nxt_sig_event_t *signals; - nxt_process_type_t type:8; /* 3 bits */ + nxt_process_type_t type; + + void *data; + uint32_t stream; + + nxt_bool_t restart; }; typedef struct { - nxt_mp_t *mem_pool; - nxt_pid_t pid; nxt_queue_t ports; /* of nxt_port_t */ - nxt_port_id_t last_port_id; + nxt_bool_t ready; nxt_process_init_t *init; + nxt_thread_mutex_t incoming_mutex; - nxt_mp_t *incoming_mp; nxt_array_t *incoming; /* of nxt_port_mmap_t */ + nxt_thread_mutex_t outgoing_mutex; - nxt_mp_t *outgoing_mp; nxt_array_t *outgoing; /* of nxt_port_mmap_t */ + nxt_thread_mutex_t cp_mutex; + nxt_mp_t *cp_mem_pool; nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ } nxt_process_t; @@ -77,7 +82,8 @@ NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns); NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv, char ***orig_envp); -NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_process_t *process); +NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_runtime_t *rt, + nxt_process_t *process, nxt_port_id_t id, nxt_process_type_t type); #define nxt_process_port_remove(port) \ nxt_queue_remove(&port->link) diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index cf3858a5..aabdc0a1 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -58,7 +58,7 @@ typedef struct { } nxt_py_error_t; -static nxt_int_t nxt_python_init(nxt_task_t *task); +static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf); static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *msg); @@ -167,8 +167,6 @@ static PyTypeObject nxt_py_input_type = { }; -static char *nxt_py_module; - static PyObject *nxt_py_application; static PyObject *nxt_py_start_resp_obj; static PyObject *nxt_py_environ_ptyp; @@ -181,94 +179,67 @@ static nxt_python_run_ctx_t *nxt_python_run_ctx; nxt_int_t nxt_python_wsgi_init(nxt_thread_t *thr, nxt_runtime_t *rt) { - char **argv; - char *p; - - argv = nxt_process_argv; - - while (*argv != NULL) { - p = *argv++; - - if (nxt_strcmp(p, "--py-module") == 0) { - if (*argv == NULL) { - nxt_log_emerg(thr->log, - "no argument for option \"--py-module\""); - return NXT_ERROR; - } - - nxt_py_module = *argv++; - - nxt_log_error(NXT_LOG_INFO, thr->log, "python module: \"%s\"", - nxt_py_module); - - break; - } - } + nxt_app_modules[NXT_APP_PYTHON] = &nxt_python_module; - if (nxt_py_module == NULL) { - return NXT_OK; - } +#if PY_MAJOR_VERSION == 2 + nxt_app_modules[NXT_APP_PYTHON2] = &nxt_python_module; +#endif - nxt_app = &nxt_python_module; +#if PY_MAJOR_VERSION == 3 + nxt_app_modules[NXT_APP_PYTHON3] = &nxt_python_module; +#endif return NXT_OK; } static nxt_int_t -nxt_python_init(nxt_task_t *task) +nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { - char **argv; - char *p, *dir; - PyObject *obj, *pypath, *module; + char *nxt_py_module; + PyObject *obj, *pypath, *module; + nxt_python_app_conf_t *c; + c = &conf->u.python; + + if (c->module.length == 0) { + nxt_log_emerg(task->log, "python module is empty"); + return NXT_ERROR; + } Py_InitializeEx(0); obj = NULL; module = NULL; - argv = nxt_process_argv; - while (*argv != NULL) { - p = *argv++; + if (c->path.length > 0) { + obj = PyString_FromStringAndSize((char *) c->path.start, + c->path.length); - if (nxt_strcmp(p, "--py-path") == 0) { - if (*argv == NULL) { - nxt_log_emerg(task->log, "no argument for option \"--py-path\""); - goto fail; - } - - dir = *argv++; - - nxt_log_error(NXT_LOG_INFO, task->log, "python path \"%s\"", dir); - - obj = PyString_FromString((char *) dir); - - if (nxt_slow_path(obj == NULL)) { - nxt_log_alert(task->log, - "Python failed create string object \"%s\"", dir); - goto fail; - } - - pypath = PySys_GetObject((char *) "path"); - - if (nxt_slow_path(pypath == NULL)) { - nxt_log_alert(task->log, - "Python failed to get \"sys.path\" list"); - goto fail; - } + if (nxt_slow_path(obj == NULL)) { + nxt_log_alert(task->log, + "Python failed create string object \"%V\"", + &c->path); + goto fail; + } - if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) { - nxt_log_alert(task->log, - "Python failed to insert \"%s\" into \"sys.path\"", dir); - goto fail; - } + pypath = PySys_GetObject((char *) "path"); - Py_DECREF(obj); - obj = NULL; + if (nxt_slow_path(pypath == NULL)) { + nxt_log_alert(task->log, + "Python failed to get \"sys.path\" list"); + goto fail; + } - continue; + if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) { + nxt_log_alert(task->log, + "Python failed to insert \"%V\" into \"sys.path\"", + &c->path); + goto fail; } + + Py_DECREF(obj); + obj = NULL; } obj = PyCFunction_New(nxt_py_start_resp_method, NULL); @@ -303,7 +274,9 @@ nxt_python_init(nxt_task_t *task) Py_DECREF(obj); - // PyOS_AfterFork(); + nxt_py_module = nxt_alloca(c->module.length + 1); + nxt_memcpy(nxt_py_module, c->module.start, c->module.length); + nxt_py_module[c->module.length] = '\0'; module = PyImport_ImportModule(nxt_py_module); diff --git a/src/nxt_router.c b/src/nxt_router.c index b54d1cf3..fa43b09f 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -20,6 +20,19 @@ typedef struct { } nxt_router_listener_conf_t; +typedef struct nxt_start_worker_s nxt_start_worker_t; + +struct nxt_start_worker_s { + uint32_t stream; + nxt_app_t *app; + nxt_req_conn_link_t *rc; + nxt_mp_t *mem_pool; + void *joint; + + nxt_work_t work; +}; + + static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static nxt_int_t nxt_router_conf_new(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); @@ -87,11 +100,22 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, static void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint); +static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app); +static void nxt_router_app_release_port(nxt_task_t *task, void *obj, + void *data); + +static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, + nxt_start_worker_t *sw); +static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task, + nxt_router_t *router, uint32_t id); + static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data); static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); +static void nxt_router_process_http_request_mp(nxt_task_t *task, + nxt_req_conn_link_t *rc, nxt_mp_t *mp); static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); @@ -99,13 +123,19 @@ static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); +static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, + const char* fmt, ...); + static nxt_router_t *nxt_router; nxt_int_t -nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) +nxt_router_start(nxt_task_t *task, void *data) { - nxt_int_t ret; - nxt_router_t *router; + nxt_int_t ret; + nxt_router_t *router; + nxt_runtime_t *rt; + + rt = task->thread->runtime; ret = nxt_app_http_init(task, rt); if (nxt_slow_path(ret != NXT_OK)) { @@ -127,6 +157,51 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) } +static void +nxt_router_sw_release(nxt_task_t *task, void *obj, void *data) +{ + nxt_start_worker_t *sw; + nxt_socket_conf_joint_t *joint; + + sw = obj; + joint = sw->joint; + + nxt_debug(task, "sw #%uxD release", sw->stream); + + if (nxt_mp_release(sw->mem_pool, sw) == 0) { + nxt_router_conf_release(task, joint); + } +} + + +void +nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_start_worker_t *sw; + + nxt_port_new_port_handler(task, msg); + + if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) { + return; + } + + sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream); + + if (nxt_fast_path(sw != NULL)) { + msg->new_port->app = sw->app; + + nxt_router_app_release_port(task, msg->new_port, sw->app); + + sw->work.handler = nxt_router_sw_release; + + nxt_debug(task, "post sw #%uxD release to %p", sw->stream, + sw->work.data); + + nxt_event_engine_post(sw->work.data, &sw->work); + } +} + + void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -412,7 +487,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_app_t *app, *prev; nxt_app_type_t type; nxt_sockaddr_t *sa; - nxt_queue_link_t *qlk, *nqlk; nxt_conf_value_t *conf, *http; nxt_conf_value_t *applications, *application; nxt_conf_value_t *listeners, *listener; @@ -496,19 +570,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application workers: %D", apcf.workers); - if (nxt_str_eq(&apcf.type, "python", 6)) { - type = NXT_APP_PYTHON; - - } else if (nxt_str_eq(&apcf.type, "php", 3)) { - type = NXT_APP_PHP; + type = nxt_app_parse_type(&apcf.type); - } else if (nxt_str_eq(&apcf.type, "ruby", 4)) { - type = NXT_APP_RUBY; - - } else if (nxt_str_eq(&apcf.type, "go", 2)) { - type = NXT_APP_GO; + if (type == NXT_APP_UNKNOWN) { + nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", + &apcf.type); + goto app_fail; + } - } else { + if (nxt_app_modules[type] == NULL) { nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"", &apcf.type); goto app_fail; @@ -519,10 +589,14 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, goto app_fail; } + nxt_queue_init(&app->ports); + nxt_queue_init(&app->requests); + app->name = name; app->type = type; app->max_workers = apcf.workers; app->live = 1; + app->module = nxt_app_modules[type]; nxt_queue_insert_tail(&tmcf->apps, &app->link); } @@ -607,16 +681,13 @@ app_fail: fail: - for (qlk = nxt_queue_first(&tmcf->apps); - qlk != nxt_queue_tail(&tmcf->apps); - qlk = nqlk) - { - nqlk = nxt_queue_next(qlk); - app = nxt_queue_link_data(qlk, nxt_app_t, link); + nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { + nxt_queue_remove(&app->link); nxt_thread_mutex_destroy(&app->mutex); nxt_free(app); - } + + } nxt_queue_loop; return NXT_ERROR; } @@ -625,19 +696,15 @@ fail: static nxt_app_t * nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) { - nxt_app_t *app; - nxt_queue_link_t *qlk; + nxt_app_t *app; - for (qlk = nxt_queue_first(queue); - qlk != nxt_queue_tail(queue); - qlk = nxt_queue_next(qlk)) - { - app = nxt_queue_link_data(qlk, nxt_app_t, link); + nxt_queue_each(app, queue, nxt_app_t, link) { if (nxt_strstr_eq(name, &app->name)) { return app; } - } + + } nxt_queue_loop; return NULL; } @@ -1088,8 +1155,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine) { nxt_int_t ret; - nxt_port_t *port; - nxt_process_t *process; nxt_thread_link_t *link; nxt_thread_handle_t handle; @@ -1107,28 +1172,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_queue_insert_tail(&rt->engines, &engine->link); - process = nxt_runtime_process_find(rt, nxt_pid); - if (nxt_slow_path(process == NULL)) { - return NXT_ERROR; - } - - port = nxt_process_port_new(process); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - - ret = nxt_port_socket_init(task, port, 0); - if (nxt_slow_path(ret != NXT_OK)) { - return ret; - } - - port->engine = 0; - port->type = NXT_PROCESS_ROUTER; - - engine->port = port; - - nxt_runtime_port_add(rt, port); - ret = nxt_thread_create(&handle, link); if (nxt_slow_path(ret != NXT_OK)) { @@ -1142,20 +1185,14 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, static void nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; - nxt_queue_link_t *qlk, *nqlk; + nxt_app_t *app; - for (qlk = nxt_queue_first(&router->apps); - qlk != nxt_queue_tail(&router->apps); - qlk = nqlk) - { - nqlk = nxt_queue_next(qlk); - app = nxt_queue_link_data(qlk, nxt_app_t, link); + nxt_queue_each(app, &router->apps, nxt_app_t, link) { nxt_queue_remove(&app->link); - // RELEASE APP - } + // TODO RELEASE APP + } nxt_queue_loop; nxt_queue_add(&router->apps, &tmcf->previous); nxt_queue_add(&router->apps, &tmcf->apps); @@ -1232,6 +1269,8 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = { static void nxt_router_thread_start(void *data) { + nxt_int_t ret; + nxt_port_t *port; nxt_task_t *task; nxt_thread_t *thread; nxt_thread_link_t *link; @@ -1252,12 +1291,26 @@ nxt_router_thread_start(void *data) thread->task = &engine->task; thread->fiber = &engine->fibers->fiber; - nxt_mp_thread_adopt(engine->port->mem_pool); + engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); + + port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t)); + if (nxt_slow_path(port == NULL)) { + return; + } + + port->id = nxt_port_get_next_id(); + port->pid = nxt_pid; + + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + return; + } - engine->port->socket.task = task; - nxt_port_create(task, engine->port, nxt_router_app_port_handlers); + port->type = NXT_PROCESS_ROUTER; - engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); + engine->port = port; + + nxt_port_enable(task, port, nxt_router_app_port_handlers); nxt_event_engine_start(engine); } @@ -1456,6 +1509,9 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_thread_spin_unlock(lock); + /* TODO remove engine->port */ + /* TODO excude from connected ports */ + if (rtcf != NULL) { nxt_debug(task, "old router conf is destroyed"); @@ -1473,6 +1529,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) { + nxt_port_t *port; nxt_thread_link_t *link; nxt_event_engine_t *engine; nxt_thread_handle_t handle; @@ -1486,13 +1543,27 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_remove(&engine->link); + port = engine->port; + + // TODO notify all apps + + if (port->pair[0] != -1) { + nxt_fd_close(port->pair[0]); + } + + if (port->pair[1] != -1) { + nxt_fd_close(port->pair[1]); + } + + if (port->mem_pool) { + nxt_mp_destroy(port->mem_pool); + } + nxt_mp_destroy(engine->mem_pool); nxt_event_engine_free(engine); nxt_free(link); - - // TODO: free port } @@ -1616,28 +1687,265 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } } +nxt_inline const char * +nxt_router_text_by_code(int code) +{ + switch (code) { + case 400: return "Bad request"; + case 404: return "Not found"; + case 403: return "Forbidden"; + case 500: + default: return "Internal server error"; + } +} -nxt_inline nxt_port_t * -nxt_router_app_port(nxt_task_t *task) +static void +nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, + const char* fmt, ...) { - nxt_port_t *port; - nxt_runtime_t *rt; + va_list args; + nxt_buf_t *b, *last; + const char *msg; + + b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0); + if (nxt_slow_path(b == NULL)) { + /* TODO pogorevaTb */ + } + + b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, + "HTTP/1.0 %d %s\r\n" + "Content-Type: text/plain\r\n" + "Connection: close\r\n\r\n", + code, nxt_router_text_by_code(code)); + + msg = (const char *) b->mem.free; + + va_start(args, fmt); + b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); + va_end(args); + + nxt_log_alert(task->log, "error %d: %s", code, msg); + + last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); + if (nxt_slow_path(last == NULL)) { + /* TODO pogorevaTb */ + } + + nxt_buf_chain_add(&b, last); + + if (c->write == NULL) { + c->write = b; + c->write_state = &nxt_router_conn_write_state; + + nxt_conn_write(task->thread->engine, c); + } else { + nxt_debug(task, "router data attach out bufs to existing chain"); + + nxt_buf_chain_add(&c->write, b); + } +} + + +static void +nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_app_t *app; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_start_worker_t *sw; + + sw = obj; + app = sw->app; + + nxt_debug(task, "send sw #%uD", sw->stream); + + nxt_router_sw_add(task, nxt_router, sw); + nxt_queue_insert_tail(&app->requests, &sw->rc->app_link); rt = task->thread->runtime; + port = rt->port_by_type[NXT_PROCESS_MASTER]; - nxt_runtime_port_each(rt, port) { + b = nxt_buf_mem_alloc(port->mem_pool, app->conf.length, 0); - if (nxt_pid == port->pid) { - continue; - } + nxt_buf_cpystr(b, &app->conf); - if (port->type == NXT_PROCESS_WORKER) { - return port; - } + nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b); +} - } nxt_runtime_port_loop; - return NULL; +static nxt_port_t * +nxt_router_app_get_port(nxt_app_t *app) +{ + nxt_port_t *port; + nxt_queue_link_t *lnk; + + port = NULL; + + nxt_thread_mutex_lock(&app->mutex); + + if (!nxt_queue_is_empty(&app->ports)) { + lnk = nxt_queue_first(&app->ports); + nxt_queue_remove(lnk); + + lnk->next = NULL; + + port = nxt_queue_link_data(lnk, nxt_port_t, app_link); + } + + nxt_thread_mutex_unlock(&app->mutex); + + return port; +} + + +static void +nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) +{ + nxt_app_t *app; + nxt_port_t *port; + nxt_work_t *work; + nxt_queue_link_t *lnk; + nxt_req_conn_link_t *rc; + + port = obj; + app = data; + + nxt_assert(app != NULL); + nxt_assert(app == port->app); + nxt_assert(port->app_link.next == NULL); + + + if (task->thread->engine != port->engine) { + work = (nxt_work_t *) (port + 1); + + nxt_debug(task, "post release port to engine %p", port->engine); + + work->next = NULL; + work->handler = nxt_router_app_release_port; + work->task = port->socket.task; + work->obj = port; + work->data = app; + + nxt_event_engine_post(port->engine, work); + + return; + } + + if (!nxt_queue_is_empty(&app->requests)) { + lnk = nxt_queue_first(&app->requests); + nxt_queue_remove(lnk); + + rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link); + + nxt_debug(task, "process request #%uxD", rc->req_id); + + rc->app_port = port; + + nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool); + + return; + } + + nxt_debug(task, "app requests queue is empty"); + + nxt_thread_mutex_lock(&app->mutex); + + nxt_queue_insert_head(&app->ports, &port->app_link); + + nxt_thread_mutex_unlock(&app->mutex); +} + + +void +nxt_router_app_remove_port(nxt_port_t *port) +{ + nxt_app_t *app; + + if (port->app_link.next == NULL) { + return; + } + + app = port->app; + +#if (NXT_DEBUG) + if (nxt_slow_path(app == NULL)) { + nxt_abort(); + } +#endif + + nxt_thread_mutex_lock(&app->mutex); + + nxt_queue_remove(&port->app_link); + port->app_link.next = NULL; + + nxt_thread_mutex_unlock(&app->mutex); +} + + +nxt_inline nxt_int_t +nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc) +{ + nxt_app_t *app; + nxt_conn_t *c; + nxt_port_t *port, *master_port; + nxt_runtime_t *rt; + nxt_start_worker_t *sw; + nxt_socket_conf_joint_t *joint; + + port = NULL; + c = rc->conn; + + joint = c->listen->socket.data; + app = joint->socket_conf->application; + + + if (app == NULL) { + nxt_router_gen_error(task, rc->conn, 500, + "Application is NULL in socket_conf"); + return NXT_ERROR; + } + + + port = nxt_router_app_get_port(app); + + if (port != NULL) { + rc->app_port = port; + return NXT_OK; + } + + + sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t)); + + if (nxt_slow_path(sw == NULL)) { + nxt_router_gen_error(task, rc->conn, 500, + "Failed to allocate start worker struct"); + return NXT_ERROR; + } + + nxt_memzero(sw, sizeof(nxt_start_worker_t)); + + sw->stream = nxt_random(&task->thread->random); + sw->app = app; + sw->rc = rc; + sw->mem_pool = c->mem_pool; + sw->joint = c->listen->socket.data; + + sw->work.handler = nxt_router_send_sw_request; + sw->work.task = task; + sw->work.obj = sw; + sw->work.data = task->thread->engine; + + rt = task->thread->runtime; + + master_port = rt->port_by_type[NXT_PROCESS_MASTER]; + + nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream, + master_port->engine); + + nxt_event_engine_post(master_port->engine, &sw->work); + + return NXT_AGAIN; } @@ -1715,13 +2023,12 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) } size = c->read->mem.free - c->read->mem.pos; - nxt_memcpy(b->mem.pos, c->read->mem.pos, size); - b->mem.free += size; - c->read = b; + c->read = nxt_buf_cpy(b, c->read->mem.pos, size); } else { - // TODO 500 Too long request headers - nxt_log_alert(task->log, "Too long request headers"); + nxt_router_gen_error(task, c, 400, + "Too long request headers"); + return; } } } @@ -1735,15 +2042,12 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0); if (nxt_slow_path(b == NULL)) { - // TODO 500 Failed to allocate buffer for request body - nxt_log_alert(task->log, "Failed to allocate buffer for " - "request body"); + nxt_router_gen_error(task, c, 500, "Failed to allocate " + "buffer for request body"); + return; } - b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, - preread); - - c->read = b; + c->read = nxt_buf_cpy(b, c->read->mem.pos, preread); } nxt_debug(task, "router request body read again, rest: %uz", @@ -1761,26 +2065,11 @@ static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap) { - nxt_mp_t *port_mp; nxt_int_t res; - nxt_port_t *port, *c_port; nxt_req_id_t req_id; - nxt_app_wmsg_t wmsg; nxt_event_engine_t *engine; nxt_req_conn_link_t *rc; - if (nxt_slow_path(nxt_app == NULL)) { - // 500 Application not found - nxt_log_alert(task->log, "application is NULL"); - } - - port = nxt_router_app_port(task); - - if (nxt_slow_path(port == NULL)) { - // 500 Application port not found - nxt_log_alert(task->log, "application port not found"); - } - engine = task->thread->engine; do { @@ -1790,8 +2079,10 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, rc = nxt_conn_request_add(c, req_id); if (nxt_slow_path(rc == NULL)) { - // 500 Failed to allocate req->conn link - nxt_log_alert(task->log, "failed to allocate req->conn link"); + nxt_router_gen_error(task, c, 500, "Failed to allocate " + "req->conn link"); + + return; } nxt_event_engine_request_add(engine, rc); @@ -1799,33 +2090,66 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", req_id, c, engine); + rc->reply_port = engine->port; + + res = nxt_router_app_port(task, rc); + + if (res != NXT_OK) { + return; + } + + nxt_router_process_http_request_mp(task, rc, c->mem_pool); +} + + +static void +nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc, + nxt_mp_t *mp) +{ + nxt_mp_t *port_mp; + nxt_int_t res; + nxt_port_t *port, *c_port, *reply_port; + nxt_app_wmsg_t wmsg; + nxt_app_parse_ctx_t *ap; + + port = rc->app_port; + + if (nxt_slow_path(port == NULL)) { + nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); + return; + } + + reply_port = rc->reply_port; + ap = rc->conn->socket.data; + port_mp = port->mem_pool; - port->mem_pool = c->mem_pool; + port->mem_pool = mp; - c_port = nxt_process_connected_port_find(port->process, - engine->port->pid, - engine->port->id); - if (nxt_slow_path(c_port != engine->port)) { - res = nxt_port_send_port(task, port, engine->port); + c_port = nxt_process_connected_port_find(port->process, reply_port->pid, + reply_port->id); + if (nxt_slow_path(c_port != reply_port)) { + res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - // 500 Failed to send reply port - nxt_log_alert(task->log, "failed to send reply port to application"); + nxt_router_gen_error(task, rc->conn, 500, + "Failed to send reply port to application"); + goto fail; } - nxt_process_connected_port_add(port->process, engine->port); + nxt_process_connected_port_add(port->process, reply_port); } wmsg.port = port; wmsg.write = NULL; wmsg.buf = &wmsg.write; - wmsg.stream = req_id; + wmsg.stream = rc->req_id; - res = nxt_app->prepare_msg(task, &ap->r, &wmsg); + res = rc->app_port->app->module->prepare_msg(task, &ap->r, &wmsg); if (nxt_slow_path(res != NXT_OK)) { - // 500 Failed to prepare message - nxt_log_alert(task->log, "failed to prepare message for application"); + nxt_router_gen_error(task, rc->conn, 500, + "Failed to prepare message for application"); + goto fail; } nxt_debug(task, "about to send %d bytes buffer to worker port %d", @@ -1833,13 +2157,15 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, wmsg.port->socket.fd); res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, - -1, req_id, engine->port->id, wmsg.write); + -1, rc->req_id, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { - // 500 Failed to send message - nxt_log_alert(task->log, "failed to send message to application"); + nxt_router_gen_error(task, rc->conn, 500, + "Failed to send message to application"); + goto fail; } +fail: port->mem_pool = port_mp; } @@ -1934,6 +2260,12 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); + if (rc->app_port != NULL) { + nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); + + rc->app_port = NULL; + } + nxt_event_engine_request_remove(task->thread->engine, rc); } nxt_queue_loop; @@ -1944,9 +2276,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) task = &task->thread->engine->task; - nxt_mp_release(c->mem_pool, c); - - nxt_router_conf_release(task, joint); + if (nxt_mp_release(c->mem_pool, c) == 0) { + nxt_router_conf_release(task, joint); + } } @@ -1992,3 +2324,71 @@ nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) return nxt_value_at(nxt_msec_t, joint->socket_conf, data); } + + +static nxt_int_t +nxt_sw_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + return NXT_OK; +} + + +static const nxt_lvlhsh_proto_t lvlhsh_sw_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_sw_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +static void +nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, + nxt_start_worker_t *sw) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&sw->stream, sizeof(sw->stream)); + lhq.key.length = sizeof(sw->stream); + lhq.key.start = (u_char *) &sw->stream; + lhq.proto = &lvlhsh_sw_proto; + lhq.replace = 0; + lhq.value = sw; + lhq.pool = task->thread->runtime->mem_pool; + + switch (nxt_lvlhsh_insert(&router->start_workers, &lhq)) { + + case NXT_OK: + break; + + default: + nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw add failed", + sw->stream); + break; + } +} + + +static nxt_start_worker_t * +nxt_router_sw_find_remove(nxt_task_t *task, nxt_router_t *router, uint32_t id) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&id, sizeof(id)); + lhq.key.length = sizeof(id); + lhq.key.start = (u_char *) &id; + lhq.proto = &lvlhsh_sw_proto; + lhq.pool = task->thread->runtime->mem_pool; + + switch (nxt_lvlhsh_delete(&router->start_workers, &lhq)) { + + case NXT_OK: + return lhq.value; + + default: + nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw remove failed", + id); + break; + } + + return NULL; +} diff --git a/src/nxt_router.h b/src/nxt_router.h index 99e70559..76977438 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -20,6 +20,8 @@ typedef struct { nxt_queue_t sockets; /* of nxt_socket_conf_t */ nxt_queue_t apps; /* of nxt_app_t */ + + nxt_lvlhsh_t start_workers; /* stream to nxt_start_worker_t */ } nxt_router_t; @@ -61,9 +63,13 @@ typedef struct { } nxt_router_temp_conf_t; -typedef struct { +typedef struct nxt_app_module_s nxt_app_module_t; +typedef struct nxt_app_s nxt_app_t; + +struct nxt_app_s { nxt_thread_mutex_t mutex; nxt_queue_t ports; + nxt_queue_t requests; /* of nxt_req_conn_link_t */ nxt_str_t name; uint32_t workers; @@ -75,7 +81,8 @@ typedef struct { nxt_queue_link_t link; nxt_str_t conf; -} nxt_app_t; + nxt_app_module_t *module; +}; typedef struct { @@ -111,7 +118,9 @@ typedef struct { } nxt_socket_conf_joint_t; +void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_router_app_remove_port(nxt_port_t *port); #endif /* _NXT_ROUTER_H_INCLUDED_ */ diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 91c4ba70..6ddf27c1 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -9,6 +9,7 @@ #include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_master_process.h> +#include <nxt_router.h> static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, @@ -1479,16 +1480,34 @@ nxt_runtime_process_new(nxt_runtime_t *rt) nxt_queue_init(&process->ports); - /* TODO each process should have it's own mem_pool for ports allocation */ - process->mem_pool = rt->mem_pool; - nxt_thread_mutex_create(&process->incoming_mutex); nxt_thread_mutex_create(&process->outgoing_mutex); + nxt_thread_mutex_create(&process->cp_mutex); return process; } +void +nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) +{ + nxt_port_mmaps_destroy(process->incoming, 1); + nxt_port_mmaps_destroy(process->outgoing, 1); + + if (process->cp_mem_pool != NULL) { + nxt_mp_thread_adopt(process->cp_mem_pool); + + nxt_mp_destroy(process->cp_mem_pool); + } + + nxt_thread_mutex_destroy(&process->incoming_mutex); + nxt_thread_mutex_destroy(&process->outgoing_mutex); + nxt_thread_mutex_destroy(&process->cp_mutex); + + nxt_mp_free(rt->mem_pool, process); +} + + static nxt_int_t nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -1606,6 +1625,8 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) nxt_process_port_each(process, port) { + port->pid = process->pid; + nxt_runtime_port_add(rt, port); } nxt_process_port_loop; @@ -1621,9 +1642,7 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) { - uint32_t i; nxt_port_t *port; - nxt_port_mmap_t *port_mmap; nxt_lvlhsh_query_t lhq; lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); @@ -1645,35 +1664,8 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) } nxt_process_port_loop; - if (process->incoming) { - nxt_mp_thread_adopt(process->incoming_mp); - - port_mmap = process->incoming->elts; - - for (i = 0; i < process->incoming->nelts; i++) { - nxt_port_mmap_destroy(port_mmap); - } - - nxt_thread_mutex_destroy(&process->incoming_mutex); - - nxt_mp_destroy(process->incoming_mp); - } - - if (process->outgoing) { - nxt_mp_thread_adopt(process->outgoing_mp); - - port_mmap = process->outgoing->elts; - - for (i = 0; i < process->outgoing->nelts; i++) { - nxt_port_mmap_destroy(port_mmap); - } - - nxt_thread_mutex_destroy(&process->outgoing_mutex); + nxt_runtime_process_destroy(rt, process); - nxt_mp_destroy(process->outgoing_mp); - } - - nxt_mp_free(rt->mem_pool, process); break; default: @@ -1704,6 +1696,8 @@ void nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) { nxt_port_hash_add(&rt->ports, rt->mem_pool, port); + + rt->port_by_type[port->type] = port; } @@ -1712,6 +1706,10 @@ nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) { nxt_port_hash_remove(&rt->ports, rt->mem_pool, port); + if (rt->port_by_type[port->type] == port) { + rt->port_by_type[port->type] = NULL; + } + if (port->pair[0] != -1) { nxt_fd_close(port->pair[0]); } @@ -1720,11 +1718,15 @@ nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) nxt_fd_close(port->pair[1]); } + if (port->type == NXT_PROCESS_WORKER) { + nxt_router_app_remove_port(port); + } + if (port->mem_pool) { nxt_mp_destroy(port->mem_pool); } - nxt_mp_free(port->process->mem_pool, port); + nxt_mp_free(rt->mem_pool, port); } diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 7cbd466a..01087719 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -39,6 +39,7 @@ struct nxt_runtime_s { size_t nprocesses; nxt_lvlhsh_t processes; /* of nxt_process_t */ + nxt_port_t *port_by_type[NXT_PROCESS_MAX]; nxt_lvlhsh_t ports; /* of nxt_port_t */ nxt_list_t *log_files; /* of nxt_file_t */ @@ -100,6 +101,8 @@ nxt_runtime_is_master(nxt_runtime_t *rt) nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); +void nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process); + nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process); @@ -150,7 +153,6 @@ void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log, const char *fmt, ...); void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data); -nxt_int_t nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt); void nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index 86ee6408..a7db8b6b 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -54,7 +54,7 @@ nxt_port_handler_t nxt_app_process_port_handlers[] = { nxt_port_handler_t nxt_router_process_port_handlers[] = { nxt_worker_process_quit_handler, - nxt_port_new_port_handler, + nxt_router_new_port_handler, nxt_port_change_log_file_handler, nxt_port_mmap_handler, nxt_router_conf_data_handler, |