diff options
-rw-r--r-- | src/nxt_controller.c | 50 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 135 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 1 | ||||
-rw-r--r-- | src/nxt_router.c | 49 |
4 files changed, 152 insertions, 83 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c index a61c127d..8c9d4c53 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -54,7 +54,7 @@ static nxt_int_t nxt_controller_conf_default(void); static void nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static void nxt_controller_flush_requests(nxt_task_t *task); -static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, +static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); @@ -344,7 +344,7 @@ nxt_controller_send_current_conf(nxt_task_t *task) conf = nxt_controller_conf.root; if (conf != NULL) { - rc = nxt_controller_conf_send(task, conf, + rc = nxt_controller_conf_send(task, nxt_controller_conf.pool, conf, nxt_controller_conf_init_handler, NULL); if (nxt_fast_path(rc == NXT_OK)) { @@ -497,11 +497,14 @@ nxt_controller_flush_requests(nxt_task_t *task) static nxt_int_t -nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, +nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data) { + void *mem; + u_char *end; size_t size; uint32_t stream; + nxt_fd_t fd; nxt_int_t rc; nxt_buf_t *b; nxt_port_t *router_port, *controller_port; @@ -518,30 +521,53 @@ nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, size = nxt_conf_json_length(conf, NULL); - b = nxt_port_mmap_get_buf(task, router_port, size); + b = nxt_buf_mem_alloc(mp, sizeof(size_t), 0); if (nxt_slow_path(b == NULL)) { return NXT_ERROR; } - b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); + fd = nxt_shm_open(task, size); + if (nxt_slow_path(fd == -1)) { + return NXT_ERROR; + } + + mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + goto fail; + } + + end = nxt_conf_json_print(mem, conf, NULL); + + nxt_mem_munmap(mem, size); + + size = end - (u_char *) mem; + + b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t)); stream = nxt_port_rpc_register_handler(task, controller_port, handler, handler, router_port->pid, data); - if (nxt_slow_path(stream == 0)) { - return NXT_ERROR; + goto fail; } - rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, - stream, controller_port->id, b); + rc = nxt_port_socket_write(task, router_port, + NXT_PORT_MSG_DATA_LAST | NXT_PORT_MSG_CLOSE_FD, + fd, stream, controller_port->id, b); if (nxt_slow_path(rc != NXT_OK)) { nxt_port_rpc_cancel(task, controller_port, stream); - return NXT_ERROR; + + goto fail; } return NXT_OK; + +fail: + + nxt_fd_close(fd); + + return NXT_ERROR; } @@ -1201,7 +1227,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, goto alloc_fail; } - rc = nxt_controller_conf_send(task, value, + rc = nxt_controller_conf_send(task, mp, value, nxt_controller_conf_handler, req); if (nxt_slow_path(rc != NXT_OK)) { @@ -1282,7 +1308,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, goto alloc_fail; } - rc = nxt_controller_conf_send(task, value, + rc = nxt_controller_conf_send(task, mp, value, nxt_controller_conf_handler, req); if (nxt_slow_path(rc != NXT_OK)) { diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index f4d2125c..fd472cc6 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -286,7 +286,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) { void *mem; - u_char *p, name[64]; nxt_fd_t fd; nxt_int_t i; nxt_free_map_t *free_map; @@ -310,63 +309,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } - p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", - nxt_pid, nxt_random(&task->thread->random)); - *p = '\0'; - -#if (NXT_HAVE_MEMFD_CREATE) - - fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); - - if (nxt_slow_path(fd == -1)) { - nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno); - - goto remove_fail; - } - - nxt_debug(task, "memfd_create(%s): %FD", name, fd); - -#elif (NXT_HAVE_SHM_OPEN_ANON) - - fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); - - nxt_debug(task, "shm_open(SHM_ANON): %FD", fd); - - if (nxt_slow_path(fd == -1)) { - nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno); - - goto remove_fail; - } - -#elif (NXT_HAVE_SHM_OPEN) - - /* Just in case. */ - shm_unlink((char *) name); - - fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); - - nxt_debug(task, "shm_open(%s): %FD", name, fd); - + fd = nxt_shm_open(task, PORT_MMAP_SIZE); if (nxt_slow_path(fd == -1)) { - nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno); - - goto remove_fail; - } - - if (nxt_slow_path(shm_unlink((char *) name) == -1)) { - nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, - nxt_errno); - } - -#else - -#error No working shared memory implementation. - -#endif - - if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { - nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); - goto remove_fail; } @@ -423,6 +367,83 @@ remove_fail: } +nxt_int_t +nxt_shm_open(nxt_task_t *task, size_t size) +{ + nxt_fd_t fd; + +#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) + + u_char *p, name[64]; + + p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", + nxt_pid, nxt_random(&task->thread->random)); + *p = '\0'; + +#endif + +#if (NXT_HAVE_MEMFD_CREATE) + + fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + + if (nxt_slow_path(fd == -1)) { + nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno); + + return -1; + } + + nxt_debug(task, "memfd_create(%s): %FD", name, fd); + +#elif (NXT_HAVE_SHM_OPEN_ANON) + + fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); + + if (nxt_slow_path(fd == -1)) { + nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno); + + return -1; + } + + nxt_debug(task, "shm_open(SHM_ANON): %FD", fd); + +#elif (NXT_HAVE_SHM_OPEN) + + /* Just in case. */ + shm_unlink((char *) name); + + fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); + + if (nxt_slow_path(fd == -1)) { + nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno); + + return -1; + } + + nxt_debug(task, "shm_open(%s): %FD", name, fd); + + if (nxt_slow_path(shm_unlink((char *) name) == -1)) { + nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, + nxt_errno); + } + +#else + +#error No working shared memory implementation. + +#endif + + if (nxt_slow_path(ftruncate(fd, size) == -1)) { + nxt_alert(task, "ftruncate() failed %E", nxt_errno); + + nxt_fd_close(fd); + + return -1; + } + + return fd; +} + + static nxt_port_mmap_handler_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, nxt_int_t n, nxt_bool_t tracking) diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index 748549b1..2cd4bd76 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -71,5 +71,6 @@ typedef enum nxt_port_method_e nxt_port_method_t; nxt_port_method_t nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b); +nxt_int_t nxt_shm_open(nxt_task_t *task, size_t size); #endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */ diff --git a/src/nxt_router.c b/src/nxt_router.c index bf82501c..d4d037e1 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -906,8 +906,9 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + void *p; + size_t size; nxt_int_t ret; - nxt_buf_t *b; nxt_router_temp_conf_t *tmcf; tmcf = nxt_router_temp_conf(task); @@ -915,9 +916,33 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", - nxt_buf_used_size(msg->buf), - (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); + if (nxt_slow_path(msg->fd == -1)) { + nxt_alert(task, "conf_data_handler: invalid file shm fd"); + return; + } + + if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { + nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", + (int) nxt_buf_mem_used_size(&msg->buf->mem)); + + nxt_fd_close(msg->fd); + msg->fd = -1; + + return; + } + + nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); + + p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd, 0); + + nxt_fd_close(msg->fd); + msg->fd = -1; + + if (nxt_slow_path(p == MAP_FAILED)) { + return; + } + + nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); tmcf->router_conf->router = nxt_router; tmcf->stream = msg->port_msg.stream; @@ -928,20 +953,12 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) if (nxt_slow_path(tmcf->port == NULL)) { nxt_alert(task, "reply port not found"); - return; + goto fail; } nxt_port_use(task, tmcf->port, 1); - b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, - msg->buf, msg->size); - if (nxt_slow_path(b == NULL)) { - nxt_router_conf_error(task, tmcf); - - return; - } - - ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); + ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); if (nxt_fast_path(ret == NXT_OK)) { nxt_router_conf_apply(task, tmcf, NULL); @@ -949,6 +966,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } else { nxt_router_conf_error(task, tmcf); } + +fail: + + nxt_mem_munmap(p, size); } |