summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-07-25 11:06:32 +0300
committerMax Romanov <max.romanov@nginx.com>2020-07-25 11:06:32 +0300
commitc617480eefc0822d52f9153906bb526ad483b9a3 (patch)
tree8764ebe8defc21eabd2930e8d41293c3804c7b84
parent10f90f0d483d1a46a58d7fd42fb406cd46a9c1a6 (diff)
downloadunit-c617480eefc0822d52f9153906bb526ad483b9a3.tar.gz
unit-c617480eefc0822d52f9153906bb526ad483b9a3.tar.bz2
Using plain shared memory for configuration pass.
There is no restrictions on configration size and using segmented shared memory only doubles memory usage because to parse configration on router side, it needs to be 'plain' e. g. located in single continous memory buffer.
-rw-r--r--src/nxt_controller.c50
-rw-r--r--src/nxt_port_memory.c135
-rw-r--r--src/nxt_port_memory.h1
-rw-r--r--src/nxt_router.c49
4 files changed, 152 insertions, 83 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index a61c127d..8c9d4c53 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -54,7 +54,7 @@ static nxt_int_t nxt_controller_conf_default(void);
static void nxt_controller_conf_init_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_flush_requests(nxt_task_t *task);
-static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
+static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp,
nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
@@ -344,7 +344,7 @@ nxt_controller_send_current_conf(nxt_task_t *task)
conf = nxt_controller_conf.root;
if (conf != NULL) {
- rc = nxt_controller_conf_send(task, conf,
+ rc = nxt_controller_conf_send(task, nxt_controller_conf.pool, conf,
nxt_controller_conf_init_handler, NULL);
if (nxt_fast_path(rc == NXT_OK)) {
@@ -497,11 +497,14 @@ nxt_controller_flush_requests(nxt_task_t *task)
static nxt_int_t
-nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
+nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf,
nxt_port_rpc_handler_t handler, void *data)
{
+ void *mem;
+ u_char *end;
size_t size;
uint32_t stream;
+ nxt_fd_t fd;
nxt_int_t rc;
nxt_buf_t *b;
nxt_port_t *router_port, *controller_port;
@@ -518,30 +521,53 @@ nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
size = nxt_conf_json_length(conf, NULL);
- b = nxt_port_mmap_get_buf(task, router_port, size);
+ b = nxt_buf_mem_alloc(mp, sizeof(size_t), 0);
if (nxt_slow_path(b == NULL)) {
return NXT_ERROR;
}
- b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
+ fd = nxt_shm_open(task, size);
+ if (nxt_slow_path(fd == -1)) {
+ return NXT_ERROR;
+ }
+
+ mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ goto fail;
+ }
+
+ end = nxt_conf_json_print(mem, conf, NULL);
+
+ nxt_mem_munmap(mem, size);
+
+ size = end - (u_char *) mem;
+
+ b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t));
stream = nxt_port_rpc_register_handler(task, controller_port,
handler, handler,
router_port->pid, data);
-
if (nxt_slow_path(stream == 0)) {
- return NXT_ERROR;
+ goto fail;
}
- rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
- stream, controller_port->id, b);
+ rc = nxt_port_socket_write(task, router_port,
+ NXT_PORT_MSG_DATA_LAST | NXT_PORT_MSG_CLOSE_FD,
+ fd, stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_port_rpc_cancel(task, controller_port, stream);
- return NXT_ERROR;
+
+ goto fail;
}
return NXT_OK;
+
+fail:
+
+ nxt_fd_close(fd);
+
+ return NXT_ERROR;
}
@@ -1201,7 +1227,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
goto alloc_fail;
}
- rc = nxt_controller_conf_send(task, value,
+ rc = nxt_controller_conf_send(task, mp, value,
nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) {
@@ -1282,7 +1308,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
goto alloc_fail;
}
- rc = nxt_controller_conf_send(task, value,
+ rc = nxt_controller_conf_send(task, mp, value,
nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) {
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index f4d2125c..fd472cc6 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -286,7 +286,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
{
void *mem;
- u_char *p, name[64];
nxt_fd_t fd;
nxt_int_t i;
nxt_free_map_t *free_map;
@@ -310,63 +309,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
return NULL;
}
- p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
- nxt_pid, nxt_random(&task->thread->random));
- *p = '\0';
-
-#if (NXT_HAVE_MEMFD_CREATE)
-
- fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
-
- if (nxt_slow_path(fd == -1)) {
- nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
-
- goto remove_fail;
- }
-
- nxt_debug(task, "memfd_create(%s): %FD", name, fd);
-
-#elif (NXT_HAVE_SHM_OPEN_ANON)
-
- fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
-
- nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
-
- if (nxt_slow_path(fd == -1)) {
- nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
-
- goto remove_fail;
- }
-
-#elif (NXT_HAVE_SHM_OPEN)
-
- /* Just in case. */
- shm_unlink((char *) name);
-
- fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
-
- nxt_debug(task, "shm_open(%s): %FD", name, fd);
-
+ fd = nxt_shm_open(task, PORT_MMAP_SIZE);
if (nxt_slow_path(fd == -1)) {
- nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
-
- goto remove_fail;
- }
-
- if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
- nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
- nxt_errno);
- }
-
-#else
-
-#error No working shared memory implementation.
-
-#endif
-
- if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
- nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
-
goto remove_fail;
}
@@ -423,6 +367,83 @@ remove_fail:
}
+nxt_int_t
+nxt_shm_open(nxt_task_t *task, size_t size)
+{
+ nxt_fd_t fd;
+
+#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
+
+ u_char *p, name[64];
+
+ p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
+ nxt_pid, nxt_random(&task->thread->random));
+ *p = '\0';
+
+#endif
+
+#if (NXT_HAVE_MEMFD_CREATE)
+
+ fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+
+ if (nxt_slow_path(fd == -1)) {
+ nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
+
+ return -1;
+ }
+
+ nxt_debug(task, "memfd_create(%s): %FD", name, fd);
+
+#elif (NXT_HAVE_SHM_OPEN_ANON)
+
+ fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
+
+ if (nxt_slow_path(fd == -1)) {
+ nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
+
+ return -1;
+ }
+
+ nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
+
+#elif (NXT_HAVE_SHM_OPEN)
+
+ /* Just in case. */
+ shm_unlink((char *) name);
+
+ fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
+
+ if (nxt_slow_path(fd == -1)) {
+ nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
+
+ return -1;
+ }
+
+ nxt_debug(task, "shm_open(%s): %FD", name, fd);
+
+ if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
+ nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
+ nxt_errno);
+ }
+
+#else
+
+#error No working shared memory implementation.
+
+#endif
+
+ if (nxt_slow_path(ftruncate(fd, size) == -1)) {
+ nxt_alert(task, "ftruncate() failed %E", nxt_errno);
+
+ nxt_fd_close(fd);
+
+ return -1;
+ }
+
+ return fd;
+}
+
+
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
nxt_int_t n, nxt_bool_t tracking)
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
index 748549b1..2cd4bd76 100644
--- a/src/nxt_port_memory.h
+++ b/src/nxt_port_memory.h
@@ -71,5 +71,6 @@ typedef enum nxt_port_method_e nxt_port_method_t;
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b);
+nxt_int_t nxt_shm_open(nxt_task_t *task, size_t size);
#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */
diff --git a/src/nxt_router.c b/src/nxt_router.c
index bf82501c..d4d037e1 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -906,8 +906,9 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
+ void *p;
+ size_t size;
nxt_int_t ret;
- nxt_buf_t *b;
nxt_router_temp_conf_t *tmcf;
tmcf = nxt_router_temp_conf(task);
@@ -915,9 +916,33 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
- nxt_buf_used_size(msg->buf),
- (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
+ if (nxt_slow_path(msg->fd == -1)) {
+ nxt_alert(task, "conf_data_handler: invalid file shm fd");
+ return;
+ }
+
+ if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
+ nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
+ (int) nxt_buf_mem_used_size(&msg->buf->mem));
+
+ nxt_fd_close(msg->fd);
+ msg->fd = -1;
+
+ return;
+ }
+
+ nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
+
+ p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd, 0);
+
+ nxt_fd_close(msg->fd);
+ msg->fd = -1;
+
+ if (nxt_slow_path(p == MAP_FAILED)) {
+ return;
+ }
+
+ nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
tmcf->router_conf->router = nxt_router;
tmcf->stream = msg->port_msg.stream;
@@ -928,20 +953,12 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
if (nxt_slow_path(tmcf->port == NULL)) {
nxt_alert(task, "reply port not found");
- return;
+ goto fail;
}
nxt_port_use(task, tmcf->port, 1);
- b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool,
- msg->buf, msg->size);
- if (nxt_slow_path(b == NULL)) {
- nxt_router_conf_error(task, tmcf);
-
- return;
- }
-
- ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
+ ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
if (nxt_fast_path(ret == NXT_OK)) {
nxt_router_conf_apply(task, tmcf, NULL);
@@ -949,6 +966,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
} else {
nxt_router_conf_error(task, tmcf);
}
+
+fail:
+
+ nxt_mem_munmap(p, size);
}