diff options
-rw-r--r-- | src/nxt_controller.c | 23 | ||||
-rw-r--r-- | src/nxt_main.h | 1 | ||||
-rw-r--r-- | src/nxt_main_process.c | 33 | ||||
-rw-r--r-- | src/nxt_main_process.h | 9 | ||||
-rw-r--r-- | src/nxt_port.c | 6 | ||||
-rw-r--r-- | src/nxt_port.h | 102 | ||||
-rw-r--r-- | src/nxt_process.c | 2 | ||||
-rw-r--r-- | src/nxt_process.h | 2 | ||||
-rw-r--r-- | src/nxt_router.c | 20 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 84 |
10 files changed, 135 insertions, 147 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c index a97f4098..c8bfbad6 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -99,20 +99,15 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state; static const nxt_event_conn_state_t nxt_controller_conn_close_state; -nxt_port_handler_t nxt_controller_process_port_handlers[] = { - nxt_worker_process_quit_handler, - nxt_controller_process_new_port_handler, - nxt_port_change_log_file_handler, - nxt_port_mmap_handler, - nxt_port_data_handler, - nxt_port_remove_pid_handler, - NULL, /* NXT_PORT_MSG_READY */ - NULL, /* NXT_PORT_MSG_START_WORKER */ - NULL, /* NXT_PORT_MSG_SOCKET */ - NULL, /* NXT_PORT_MSG_MODULES */ - NULL, /* NXT_PORT_MSG_CONF_STORE */ - nxt_port_rpc_handler, - nxt_port_rpc_handler, +nxt_port_handlers_t nxt_controller_process_port_handlers = { + .quit = nxt_worker_process_quit_handler, + .new_port = nxt_controller_process_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_data_handler, + .remove_pid = nxt_port_remove_pid_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, }; diff --git a/src/nxt_main.h b/src/nxt_main.h index 15c56727..4cce7e62 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -19,6 +19,7 @@ typedef struct nxt_port_s nxt_port_t; typedef struct nxt_task_s nxt_task_t; typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t; typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg); +typedef struct nxt_port_handlers_s nxt_port_handlers_t; typedef struct nxt_sig_event_s nxt_sig_event_t; typedef struct nxt_runtime_s nxt_runtime_t; diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 2e91628b..7f39cc1d 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -231,20 +231,15 @@ failed: } -static nxt_port_handler_t nxt_main_process_port_handlers[] = { - NULL, /* NXT_PORT_MSG_QUIT */ - NULL, /* NXT_PORT_MSG_NEW_PORT */ - NULL, /* NXT_PORT_MSG_CHANGE_FILE */ - NULL, /* NXT_PORT_MSG_MMAP */ - nxt_port_main_data_handler, - NULL, /* NXT_PORT_MSG_REMOVE_PID */ - nxt_port_ready_handler, - nxt_port_main_start_worker_handler, - nxt_main_port_socket_handler, - nxt_main_port_modules_handler, - nxt_main_port_conf_store_handler, - nxt_port_rpc_handler, - nxt_port_rpc_handler, +static nxt_port_handlers_t nxt_main_process_port_handlers = { + .data = nxt_port_main_data_handler, + .process_ready = nxt_port_process_ready_handler, + .start_worker = nxt_port_main_start_worker_handler, + .socket = nxt_main_port_socket_handler, + .modules = nxt_main_port_modules_handler, + .conf_store = nxt_main_port_conf_store_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, }; @@ -278,7 +273,7 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) * A main process port. A write port is not closed * since it should be inherited by worker processes. */ - nxt_port_enable(task, port, nxt_main_process_port_handlers); + nxt_port_enable(task, port, &nxt_main_process_port_handlers); process->ready = 1; @@ -363,7 +358,7 @@ nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) init->start = nxt_controller_start; init->name = "controller"; init->user_cred = &rt->user_cred; - init->port_handlers = nxt_controller_process_port_handlers; + init->port_handlers = &nxt_controller_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_CONTROLLER; init->data = &conf; @@ -393,7 +388,7 @@ nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) init->start = nxt_discovery_start; init->name = "discovery"; init->user_cred = &rt->user_cred; - init->port_handlers = nxt_discovery_process_port_handlers; + init->port_handlers = &nxt_discovery_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_DISCOVERY; init->data = rt; @@ -417,7 +412,7 @@ nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) init->start = nxt_router_start; init->name = "router"; init->user_cred = &rt->user_cred; - init->port_handlers = nxt_router_process_port_handlers; + init->port_handlers = &nxt_router_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_ROUTER; init->data = rt; @@ -479,7 +474,7 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, init->start = nxt_app_start; init->name = (char *) title; - init->port_handlers = nxt_app_process_port_handlers; + init->port_handlers = &nxt_app_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_WORKER; init->data = app_conf; diff --git a/src/nxt_main_process.h b/src/nxt_main_process.h index d53dabfa..54861a60 100644 --- a/src/nxt_main_process.h +++ b/src/nxt_main_process.h @@ -28,11 +28,10 @@ nxt_int_t nxt_router_start(nxt_task_t *task, void *data); nxt_int_t nxt_discovery_start(nxt_task_t *task, void *data); nxt_int_t nxt_app_start(nxt_task_t *task, void *data); -extern nxt_port_handler_t nxt_controller_process_port_handlers[]; -extern nxt_port_handler_t nxt_worker_process_port_handlers[]; -extern nxt_port_handler_t nxt_discovery_process_port_handlers[]; -extern nxt_port_handler_t nxt_app_process_port_handlers[]; -extern nxt_port_handler_t nxt_router_process_port_handlers[]; +extern nxt_port_handlers_t nxt_controller_process_port_handlers; +extern nxt_port_handlers_t nxt_discovery_process_port_handlers; +extern nxt_port_handlers_t nxt_app_process_port_handlers; +extern nxt_port_handlers_t nxt_router_process_port_handlers; extern const nxt_sig_event_t nxt_main_process_signals[]; extern const nxt_sig_event_t nxt_worker_process_signals[]; diff --git a/src/nxt_port.c b/src/nxt_port.c index eef8873b..bec08477 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -121,11 +121,11 @@ nxt_port_reset_next_id() void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, - nxt_port_handler_t *handlers) + nxt_port_handlers_t *handlers) { port->pid = nxt_pid; port->handler = nxt_port_handler; - port->data = handlers; + port->data = (nxt_port_handler_t *) (handlers); nxt_port_read_enable(task, port); } @@ -271,7 +271,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void -nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_t *port; nxt_process_t *process; diff --git a/src/nxt_port.h b/src/nxt_port.h index 1e501390..c5b2b40e 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -8,6 +8,40 @@ #define _NXT_PORT_H_INCLUDED_ +struct nxt_port_handlers_s { + /* RPC responses. */ + nxt_port_handler_t rpc_ready; + nxt_port_handler_t rpc_error; + + /* Main process RPC requests. */ + nxt_port_handler_t start_worker; + nxt_port_handler_t socket; + nxt_port_handler_t modules; + nxt_port_handler_t conf_store; + + /* File descriptor exchange. */ + nxt_port_handler_t change_file; + nxt_port_handler_t new_port; + nxt_port_handler_t mmap; + + /* New process ready. */ + nxt_port_handler_t process_ready; + + /* Process exit/crash notification. */ + nxt_port_handler_t remove_pid; + + /* Stop process command. */ + nxt_port_handler_t quit; + + /* Various data. */ + nxt_port_handler_t data; +}; + + +#define nxt_port_handler_idx(name) \ + ( &((nxt_port_handlers_t *) 0)->name - (nxt_port_handler_t *) 0) + + typedef enum { NXT_PORT_MSG_LAST = 0x100, NXT_PORT_MSG_CLOSE_FD = 0x200, @@ -15,39 +49,49 @@ typedef enum { NXT_PORT_MSG_MASK = 0xFF, - _NXT_PORT_MSG_QUIT = 0, - _NXT_PORT_MSG_NEW_PORT, - _NXT_PORT_MSG_CHANGE_FILE, - _NXT_PORT_MSG_MMAP, - _NXT_PORT_MSG_DATA, - _NXT_PORT_MSG_REMOVE_PID, - _NXT_PORT_MSG_READY, - _NXT_PORT_MSG_START_WORKER, - _NXT_PORT_MSG_SOCKET, - _NXT_PORT_MSG_MODULES, - _NXT_PORT_MSG_CONF_STORE, - _NXT_PORT_MSG_RPC_READY, - _NXT_PORT_MSG_RPC_ERROR, - - NXT_PORT_MSG_MAX, + _NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready), + _NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error), + + _NXT_PORT_MSG_START_WORKER = nxt_port_handler_idx(start_worker), + _NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket), + _NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules), + _NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store), + + _NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file), + _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port), + _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap), + + _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready), + _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid), + _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit), + + _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), + + NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t) / + sizeof(nxt_port_handler_t), + + NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY, + NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_NEW_PORT = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_CHANGE_FILE = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_MMAP = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST | - NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC, - NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, - NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_READY = _NXT_PORT_MSG_READY | NXT_PORT_MSG_LAST, NXT_PORT_MSG_START_WORKER = _NXT_PORT_MSG_START_WORKER | NXT_PORT_MSG_LAST, NXT_PORT_MSG_SOCKET = _NXT_PORT_MSG_SOCKET | NXT_PORT_MSG_LAST, NXT_PORT_MSG_MODULES = _NXT_PORT_MSG_MODULES | NXT_PORT_MSG_LAST, NXT_PORT_MSG_CONF_STORE = _NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY, - NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST, + + NXT_PORT_MSG_CHANGE_FILE = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_NEW_PORT = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_MMAP = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST | + NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC, + + NXT_PORT_MSG_PROCESS_READY = _NXT_PORT_MSG_PROCESS_READY | + NXT_PORT_MSG_LAST, + NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST, + + NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, + NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, } nxt_port_msg_type_t; @@ -169,7 +213,7 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b); void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, - nxt_port_handler_t *handlers); + nxt_port_handlers_t *handlers); void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *port, uint32_t stream); nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, @@ -179,7 +223,7 @@ void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -void nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_process.c b/src/nxt_process.c index 186e898d..e6653bd7 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -161,7 +161,7 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process) nxt_port_enable(task, port, init->port_handlers); - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_READY, + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY, -1, init->stream, 0, NULL); if (nxt_slow_path(ret != NXT_OK)) { diff --git a/src/nxt_process.h b/src/nxt_process.h index 2d1080cc..63302701 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -30,7 +30,7 @@ struct nxt_process_init_s { const char *name; nxt_user_cred_t *user_cred; - nxt_port_handler_t *port_handlers; + nxt_port_handlers_t *port_handlers; const nxt_sig_event_t *signals; nxt_process_type_t type; diff --git a/src/nxt_router.c b/src/nxt_router.c index 34078511..d9ba2d24 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1783,21 +1783,9 @@ nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) } -static nxt_port_handler_t nxt_router_app_port_handlers[] = { - NULL, /* NXT_PORT_MSG_QUIT */ - NULL, /* NXT_PORT_MSG_NEW_PORT */ - NULL, /* NXT_PORT_MSG_CHANGE_FILE */ - /* TODO: remove mmap_handler from app ports */ - nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */ - nxt_port_rpc_handler, /* NXT_PORT_MSG_DATA */ - NULL, /* NXT_PORT_MSG_REMOVE_PID */ - NULL, /* NXT_PORT_MSG_READY */ - NULL, /* NXT_PORT_MSG_START_WORKER */ - NULL, /* NXT_PORT_MSG_SOCKET */ - NULL, /* NXT_PORT_MSG_MODULES */ - NULL, /* NXT_PORT_MSG_CONF_STORE */ - nxt_port_rpc_handler, - nxt_port_rpc_handler, +static nxt_port_handlers_t nxt_router_app_port_handlers = { + .mmap = nxt_port_mmap_handler, + .data = nxt_port_rpc_handler, }; @@ -1844,7 +1832,7 @@ nxt_router_thread_start(void *data) engine->port = port; - nxt_port_enable(task, port, nxt_router_app_port_handlers); + nxt_port_enable(task, port, &nxt_router_app_port_handlers); nxt_event_engine_start(engine); } diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index 1a7d0c53..c66eb179 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -20,71 +20,37 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); -nxt_port_handler_t nxt_worker_process_port_handlers[] = { - nxt_worker_process_quit_handler, - nxt_port_new_port_handler, - nxt_port_change_log_file_handler, - nxt_port_mmap_handler, - nxt_port_data_handler, - nxt_port_remove_pid_handler, - NULL, /* NXT_PORT_MSG_READY */ - NULL, /* NXT_PORT_MSG_START_WORKER */ - NULL, /* NXT_PORT_MSG_SOCKET */ - NULL, /* NXT_PORT_MSG_MODULES */ - NULL, /* NXT_PORT_MSG_CONF_STORE */ - nxt_port_rpc_handler, - nxt_port_rpc_handler, +nxt_port_handlers_t nxt_app_process_port_handlers = { + .quit = nxt_worker_process_quit_handler, + .new_port = nxt_port_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_app_data_handler, + .remove_pid = nxt_port_remove_pid_handler, }; -nxt_port_handler_t nxt_app_process_port_handlers[] = { - nxt_worker_process_quit_handler, - nxt_port_new_port_handler, - nxt_port_change_log_file_handler, - nxt_port_mmap_handler, - nxt_port_app_data_handler, - nxt_port_remove_pid_handler, - NULL, /* NXT_PORT_MSG_READY */ - NULL, /* NXT_PORT_MSG_START_WORKER */ - NULL, /* NXT_PORT_MSG_SOCKET */ - NULL, /* NXT_PORT_MSG_MODULES */ - NULL, /* NXT_PORT_MSG_CONF_STORE */ - nxt_port_rpc_handler, - nxt_port_rpc_handler, +nxt_port_handlers_t nxt_router_process_port_handlers = { + .quit = nxt_worker_process_quit_handler, + .new_port = nxt_router_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_router_conf_data_handler, + .remove_pid = nxt_router_remove_pid_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, }; -nxt_port_handler_t nxt_router_process_port_handlers[] = { - nxt_worker_process_quit_handler, - nxt_router_new_port_handler, - nxt_port_change_log_file_handler, - nxt_port_mmap_handler, - nxt_router_conf_data_handler, - nxt_router_remove_pid_handler, - NULL, /* NXT_PORT_MSG_READY */ - NULL, /* NXT_PORT_MSG_START_WORKER */ - NULL, /* NXT_PORT_MSG_SOCKET */ - NULL, /* NXT_PORT_MSG_MODULES */ - NULL, /* NXT_PORT_MSG_CONF_STORE */ - nxt_port_rpc_handler, - nxt_port_rpc_handler, -}; - - -nxt_port_handler_t nxt_discovery_process_port_handlers[] = { - nxt_worker_process_quit_handler, - nxt_port_new_port_handler, - nxt_port_change_log_file_handler, - nxt_port_mmap_handler, - nxt_port_data_handler, - nxt_port_remove_pid_handler, - NULL, /* NXT_PORT_MSG_READY */ - NULL, /* NXT_PORT_MSG_START_WORKER */ - NULL, /* NXT_PORT_MSG_SOCKET */ - NULL, /* NXT_PORT_MSG_MODULES */ - NULL, /* NXT_PORT_MSG_CONF_STORE */ - nxt_port_rpc_handler, - nxt_port_rpc_handler, +nxt_port_handlers_t nxt_discovery_process_port_handlers = { + .quit = nxt_worker_process_quit_handler, + .new_port = nxt_port_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_data_handler, + .remove_pid = nxt_port_remove_pid_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, }; |