summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_controller.c50
-rw-r--r--src/nxt_port_memory.c135
-rw-r--r--src/nxt_port_memory.h1
-rw-r--r--src/nxt_router.c49
4 files changed, 152 insertions, 83 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index a61c127d..8c9d4c53 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -54,7 +54,7 @@ static nxt_int_t nxt_controller_conf_default(void);
static void nxt_controller_conf_init_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_flush_requests(nxt_task_t *task);
-static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
+static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp,
nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
@@ -344,7 +344,7 @@ nxt_controller_send_current_conf(nxt_task_t *task)
conf = nxt_controller_conf.root;
if (conf != NULL) {
- rc = nxt_controller_conf_send(task, conf,
+ rc = nxt_controller_conf_send(task, nxt_controller_conf.pool, conf,
nxt_controller_conf_init_handler, NULL);
if (nxt_fast_path(rc == NXT_OK)) {
@@ -497,11 +497,14 @@ nxt_controller_flush_requests(nxt_task_t *task)
static nxt_int_t
-nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
+nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf,
nxt_port_rpc_handler_t handler, void *data)
{
+ void *mem;
+ u_char *end;
size_t size;
uint32_t stream;
+ nxt_fd_t fd;
nxt_int_t rc;
nxt_buf_t *b;
nxt_port_t *router_port, *controller_port;
@@ -518,30 +521,53 @@ nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
size = nxt_conf_json_length(conf, NULL);
- b = nxt_port_mmap_get_buf(task, router_port, size);
+ b = nxt_buf_mem_alloc(mp, sizeof(size_t), 0);
if (nxt_slow_path(b == NULL)) {
return NXT_ERROR;
}
- b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
+ fd = nxt_shm_open(task, size);
+ if (nxt_slow_path(fd == -1)) {
+ return NXT_ERROR;
+ }
+
+ mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ goto fail;
+ }
+
+ end = nxt_conf_json_print(mem, conf, NULL);
+
+ nxt_mem_munmap(mem, size);
+
+ size = end - (u_char *) mem;
+
+ b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t));
stream = nxt_port_rpc_register_handler(task, controller_port,
handler, handler,
router_port->pid, data);
-
if (nxt_slow_path(stream == 0)) {
- return NXT_ERROR;
+ goto fail;
}
- rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
- stream, controller_port->id, b);
+ rc = nxt_port_socket_write(task, router_port,
+ NXT_PORT_MSG_DATA_LAST | NXT_PORT_MSG_CLOSE_FD,
+ fd, stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_port_rpc_cancel(task, controller_port, stream);
- return NXT_ERROR;
+
+ goto fail;
}
return NXT_OK;
+
+fail:
+
+ nxt_fd_close(fd);
+
+ return NXT_ERROR;
}
@@ -1201,7 +1227,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
goto alloc_fail;
}
- rc = nxt_controller_conf_send(task, value,
+ rc = nxt_controller_conf_send(task, mp, value,
nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) {
@@ -1282,7 +1308,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
goto alloc_fail;
}
- rc = nxt_controller_conf_send(task, value,
+ rc = nxt_controller_conf_send(task, mp, value,
nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) {
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index f4d2125c..fd472cc6 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -286,7 +286,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
{
void *mem;
- u_char *p, name[64];
nxt_fd_t fd;
nxt_int_t i;
nxt_free_map_t *free_map;
@@ -310,63 +309,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
return NULL;
}
- p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
- nxt_pid, nxt_random(&task->thread->random));
- *p = '\0';
-
-#if (NXT_HAVE_MEMFD_CREATE)
-
- fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
-
- if (nxt_slow_path(fd == -1)) {
- nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
-
- goto remove_fail;
- }
-
- nxt_debug(task, "memfd_create(%s): %FD", name, fd);
-
-#elif (NXT_HAVE_SHM_OPEN_ANON)
-
- fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
-
- nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
-
- if (nxt_slow_path(fd == -1)) {
- nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
-
- goto remove_fail;
- }
-
-#elif (NXT_HAVE_SHM_OPEN)
-
- /* Just in case. */
- shm_unlink((char *) name);
-
- fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
-
- nxt_debug(task, "shm_open(%s): %FD", name, fd);
-
+ fd = nxt_shm_open(task, PORT_MMAP_SIZE);
if (nxt_slow_path(fd == -1)) {
- nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
-
- goto remove_fail;
- }
-
- if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
- nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
- nxt_errno);
- }
-
-#else
-
-#error No working shared memory implementation.
-
-#endif
-
- if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
- nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
-
goto remove_fail;
}
@@ -423,6 +367,83 @@ remove_fail:
}
+nxt_int_t
+nxt_shm_open(nxt_task_t *task, size_t size)
+{
+ nxt_fd_t fd;
+
+#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
+
+ u_char *p, name[64];
+
+ p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
+ nxt_pid, nxt_random(&task->thread->random));
+ *p = '\0';
+
+#endif
+
+#if (NXT_HAVE_MEMFD_CREATE)
+
+ fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+
+ if (nxt_slow_path(fd == -1)) {
+ nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
+
+ return -1;
+ }
+
+ nxt_debug(task, "memfd_create(%s): %FD", name, fd);
+
+#elif (NXT_HAVE_SHM_OPEN_ANON)
+
+ fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
+
+ if (nxt_slow_path(fd == -1)) {
+ nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
+
+ return -1;
+ }
+
+ nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
+
+#elif (NXT_HAVE_SHM_OPEN)
+
+ /* Just in case. */
+ shm_unlink((char *) name);
+
+ fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
+
+ if (nxt_slow_path(fd == -1)) {
+ nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
+
+ return -1;
+ }
+
+ nxt_debug(task, "shm_open(%s): %FD", name, fd);
+
+ if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
+ nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
+ nxt_errno);
+ }
+
+#else
+
+#error No working shared memory implementation.
+
+#endif
+
+ if (nxt_slow_path(ftruncate(fd, size) == -1)) {
+ nxt_alert(task, "ftruncate() failed %E", nxt_errno);
+
+ nxt_fd_close(fd);
+
+ return -1;
+ }
+
+ return fd;
+}
+
+
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
nxt_int_t n, nxt_bool_t tracking)
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
index 748549b1..2cd4bd76 100644
--- a/src/nxt_port_memory.h
+++ b/src/nxt_port_memory.h
@@ -71,5 +71,6 @@ typedef enum nxt_port_method_e nxt_port_method_t;
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b);
+nxt_int_t nxt_shm_open(nxt_task_t *task, size_t size);
#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */
diff --git a/src/nxt_router.c b/src/nxt_router.c
index bf82501c..d4d037e1 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -906,8 +906,9 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
+ void *p;
+ size_t size;
nxt_int_t ret;
- nxt_buf_t *b;
nxt_router_temp_conf_t *tmcf;
tmcf = nxt_router_temp_conf(task);
@@ -915,9 +916,33 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
- nxt_buf_used_size(msg->buf),
- (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
+ if (nxt_slow_path(msg->fd == -1)) {
+ nxt_alert(task, "conf_data_handler: invalid file shm fd");
+ return;
+ }
+
+ if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
+ nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
+ (int) nxt_buf_mem_used_size(&msg->buf->mem));
+
+ nxt_fd_close(msg->fd);
+ msg->fd = -1;
+
+ return;
+ }
+
+ nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
+
+ p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd, 0);
+
+ nxt_fd_close(msg->fd);
+ msg->fd = -1;
+
+ if (nxt_slow_path(p == MAP_FAILED)) {
+ return;
+ }
+
+ nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
tmcf->router_conf->router = nxt_router;
tmcf->stream = msg->port_msg.stream;
@@ -928,20 +953,12 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
if (nxt_slow_path(tmcf->port == NULL)) {
nxt_alert(task, "reply port not found");
- return;
+ goto fail;
}
nxt_port_use(task, tmcf->port, 1);
- b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool,
- msg->buf, msg->size);
- if (nxt_slow_path(b == NULL)) {
- nxt_router_conf_error(task, tmcf);
-
- return;
- }
-
- ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
+ ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
if (nxt_fast_path(ret == NXT_OK)) {
nxt_router_conf_apply(task, tmcf, NULL);
@@ -949,6 +966,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
} else {
nxt_router_conf_error(task, tmcf);
}
+
+fail:
+
+ nxt_mem_munmap(p, size);
}