summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_process.c
diff options
context:
space:
mode:
authorTiago Natel de Moura <t.nateldemoura@f5.com>2020-03-09 16:28:25 +0000
committerTiago Natel de Moura <t.nateldemoura@f5.com>2020-03-09 16:28:25 +0000
commite9e5ddd5a5d9ce99768833137eac2551a710becf (patch)
tree940e591004b16216d5122cd12b367f3ebf15e0d3 /src/nxt_process.c
parentaacf11152c314efb1895b6d44ba72dc9f1801c7d (diff)
downloadunit-e9e5ddd5a5d9ce99768833137eac2551a710becf.tar.gz
unit-e9e5ddd5a5d9ce99768833137eac2551a710becf.tar.bz2
Refactor of process management.
The process abstraction has changed to: setup(task, process) start(task, process_data) prefork(task, process, mp) The prefork() occurs in the main process right before fork. The file src/nxt_main_process.c is completely free of process specific logic. The creation of a process now supports a PROCESS_CREATED state. The The setup() function of each process can set its state to either created or ready. If created, a MSG_PROCESS_CREATED is sent to main process, where external setup can be done (required for rootfs under container). The core processes (discovery, controller and router) doesn't need external setup, then they all proceeds to their start() function straight away. In the case of applications, the load of the module happens at the process setup() time and The module's init() function has changed to be the start() of the process. The module API has changed to: setup(task, process, conf) start(task, data) As a direct benefit of the PROCESS_CREATED message, the clone(2) of processes using pid namespaces now doesn't need to create a pipe to make the child block until parent setup uid/gid mappings nor it needs to receive the child pid.
Diffstat (limited to '')
-rw-r--r--src/nxt_process.c534
1 files changed, 374 insertions, 160 deletions
diff --git a/src/nxt_process.c b/src/nxt_process.c
index f5959edf..e84549b3 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -13,9 +13,18 @@
#include <signal.h>
-static void nxt_process_start(nxt_task_t *task, nxt_process_t *process);
-static nxt_int_t nxt_process_worker_setup(nxt_task_t *task,
- nxt_process_t *process, int parentfd);
+static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process);
+static nxt_int_t nxt_process_child_fixup(nxt_task_t *task,
+ nxt_process_t *process);
+static nxt_int_t nxt_process_send_created(nxt_task_t *task,
+ nxt_process_t *process);
+static nxt_int_t nxt_process_send_ready(nxt_task_t *task,
+ nxt_process_t *process);
+static void nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data);
+static void nxt_process_created_error(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+
/* A cached process pid. */
nxt_pid_t nxt_pid;
@@ -47,62 +56,58 @@ nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
static nxt_int_t
-nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd)
+nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process)
{
- pid_t rpid, pid;
- ssize_t n;
- nxt_int_t parent_status;
nxt_process_t *p;
nxt_runtime_t *rt;
nxt_process_init_t *init;
nxt_process_type_t ptype;
- pid = getpid();
- rpid = 0;
- rt = task->thread->runtime;
- init = process->init;
+ init = nxt_process_init(process);
- /* Setup the worker process. */
+ nxt_pid = nxt_getpid();
- n = read(parentfd, &rpid, sizeof(rpid));
- if (nxt_slow_path(n == -1 || n != sizeof(rpid))) {
- nxt_alert(task, "failed to read real pid");
- return NXT_ERROR;
- }
-
- if (nxt_slow_path(rpid == 0)) {
- nxt_alert(task, "failed to get real pid from parent");
- return NXT_ERROR;
- }
-
- nxt_pid = rpid;
+ process->pid = nxt_pid;
/* Clean inherited cached thread tid. */
task->thread->tid = 0;
- process->pid = nxt_pid;
+#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWPID)
+ if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID)) {
+ ssize_t pidsz;
+ char procpid[10];
- if (nxt_pid != pid) {
- nxt_debug(task, "app \"%s\" real pid %d", init->name, nxt_pid);
- nxt_debug(task, "app \"%s\" isolated pid: %d", init->name, pid);
- }
+ nxt_debug(task, "%s isolated pid is %d", process->name, nxt_pid);
- n = read(parentfd, &parent_status, sizeof(parent_status));
- if (nxt_slow_path(n == -1 || n != sizeof(parent_status))) {
- nxt_alert(task, "failed to read parent status");
- return NXT_ERROR;
- }
+ pidsz = readlink("/proc/self", procpid, sizeof(procpid));
+
+ if (nxt_slow_path(pidsz < 0 || pidsz >= (ssize_t) sizeof(procpid))) {
+ nxt_alert(task, "failed to read real pid from /proc/self");
+ return NXT_ERROR;
+ }
+
+ procpid[pidsz] = '\0';
+
+ nxt_pid = (nxt_pid_t) strtol(procpid, NULL, 10);
+
+ nxt_assert(nxt_pid > 0 && nxt_errno != ERANGE);
- if (nxt_slow_path(parent_status != NXT_OK)) {
- return parent_status;
+ process->pid = nxt_pid;
+ task->thread->tid = nxt_pid;
+
+ nxt_debug(task, "%s real pid is %d", process->name, nxt_pid);
}
+#endif
+
ptype = init->type;
nxt_port_reset_next_id();
nxt_event_engine_thread_adopt(task->thread->engine);
+ rt = task->thread->runtime;
+
/* Remove not ready processes. */
nxt_runtime_process_each(rt, p) {
@@ -114,7 +119,7 @@ nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd)
continue;
}
- if (!p->ready) {
+ if (p->state != NXT_PROCESS_STATE_READY) {
nxt_debug(task, "remove not ready process %PI", p->pid);
nxt_process_close_ports(task, p);
@@ -127,12 +132,6 @@ nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd)
} nxt_runtime_process_loop;
- nxt_runtime_process_add(task, process);
-
- nxt_process_start(task, process);
-
- process->ready = 1;
-
return NXT_OK;
}
@@ -140,48 +139,36 @@ nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd)
nxt_pid_t
nxt_process_create(nxt_task_t *task, nxt_process_t *process)
{
- int pipefd[2];
nxt_int_t ret;
nxt_pid_t pid;
- nxt_process_init_t *init;
-
- if (nxt_slow_path(pipe(pipefd) == -1)) {
- nxt_alert(task, "failed to create process pipe for passing rpid");
- return -1;
- }
-
- init = process->init;
#if (NXT_HAVE_CLONE)
- pid = nxt_clone(SIGCHLD | init->isolation.clone.flags);
+ pid = nxt_clone(SIGCHLD | process->isolation.clone.flags);
if (nxt_slow_path(pid < 0)) {
- nxt_alert(task, "clone() failed while creating \"%s\" %E",
- init->name, nxt_errno);
- goto cleanup;
+ nxt_alert(task, "clone() failed for %s %E", process->name, nxt_errno);
+ return pid;
}
#else
pid = fork();
if (nxt_slow_path(pid < 0)) {
- nxt_alert(task, "fork() failed while creating \"%s\" %E",
- init->name, nxt_errno);
- goto cleanup;
+ nxt_alert(task, "fork() failed for %s %E", process->name, nxt_errno);
+ return pid;
}
#endif
if (pid == 0) {
/* Child. */
- if (nxt_slow_path(close(pipefd[1]) == -1)) {
- nxt_alert(task, "failed to close writer pipe fd");
- }
-
- ret = nxt_process_worker_setup(task, process, pipefd[0]);
+ ret = nxt_process_child_fixup(task, process);
if (nxt_slow_path(ret != NXT_OK)) {
- exit(1);
+ nxt_process_quit(task, 1);
+ return -1;
}
- if (nxt_slow_path(close(pipefd[0]) == -1)) {
- nxt_alert(task, "failed to close writer pipe fd");
+ nxt_runtime_process_add(task, process);
+
+ if (nxt_slow_path(nxt_process_setup(task, process) != NXT_OK)) {
+ nxt_process_quit(task, 1);
}
/*
@@ -193,78 +180,24 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
/* Parent. */
- /*
- * At this point, the child process is blocked reading the
- * pipe fd to get its real pid (rpid).
- *
- * If anything goes wrong now, we need to terminate the child
- * process by sending a NXT_ERROR in the pipe.
- */
-
#if (NXT_HAVE_CLONE)
- nxt_debug(task, "clone(\"%s\"): %PI", init->name, pid);
+ nxt_debug(task, "clone(%s): %PI", process->name, pid);
#else
- nxt_debug(task, "fork(\"%s\"): %PI", init->name, pid);
+ nxt_debug(task, "fork(%s): %PI", process->name, pid);
#endif
- if (nxt_slow_path(write(pipefd[1], &pid, sizeof(pid)) == -1)) {
- nxt_alert(task, "failed to write real pid");
- goto fail;
- }
-
-#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
- if (NXT_CLONE_USER(init->isolation.clone.flags)) {
- ret = nxt_clone_credential_map(task, pid, init->user_cred,
- &init->isolation.clone);
- if (nxt_slow_path(ret != NXT_OK)) {
- goto fail;
- }
- }
-#endif
-
- ret = NXT_OK;
-
- if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
- nxt_alert(task, "failed to write status");
- goto fail;
- }
-
process->pid = pid;
nxt_runtime_process_add(task, process);
- goto cleanup;
-
-fail:
-
- ret = NXT_ERROR;
-
- if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
- nxt_alert(task, "failed to write status");
- }
-
- waitpid(pid, NULL, 0);
-
- pid = -1;
-
-cleanup:
-
- if (nxt_slow_path(close(pipefd[0]) != 0)) {
- nxt_alert(task, "failed to close pipe: %E", nxt_errno);
- }
-
- if (nxt_slow_path(close(pipefd[1]) != 0)) {
- nxt_alert(task, "failed to close pipe: %E", nxt_errno);
- }
-
return pid;
}
-static void
-nxt_process_start(nxt_task_t *task, nxt_process_t *process)
+static nxt_int_t
+nxt_process_setup(nxt_task_t *task, nxt_process_t *process)
{
- nxt_int_t ret, cap_setid;
+ nxt_int_t ret;
nxt_port_t *port, *main_port;
nxt_thread_t *thread;
nxt_runtime_t *rt;
@@ -272,37 +205,17 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process)
nxt_event_engine_t *engine;
const nxt_event_interface_t *interface;
- init = process->init;
+ init = nxt_process_init(process);
- nxt_log(task, NXT_LOG_INFO, "%s started", init->name);
+ nxt_debug(task, "%s setup", process->name);
- nxt_process_title(task, "unit: %s", init->name);
+ nxt_process_title(task, "unit: %s", process->name);
thread = task->thread;
rt = thread->runtime;
nxt_random_init(&thread->random);
- cap_setid = rt->capabilities.setid;
-
-#if (NXT_HAVE_CLONE_NEWUSER)
- if (!cap_setid && NXT_CLONE_USER(init->isolation.clone.flags)) {
- cap_setid = 1;
- }
-#endif
-
- if (cap_setid) {
- ret = nxt_credential_setgids(task, init->user_cred);
- if (nxt_slow_path(ret != NXT_OK)) {
- goto fail;
- }
-
- ret = nxt_credential_setuid(task, init->user_cred);
- if (nxt_slow_path(ret != NXT_OK)) {
- goto fail;
- }
- }
-
rt->type = init->type;
engine = thread->engine;
@@ -312,17 +225,17 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process)
interface = nxt_service_get(rt->services, "engine", rt->engine);
if (nxt_slow_path(interface == NULL)) {
- goto fail;
+ return NXT_ERROR;
}
if (nxt_event_engine_change(engine, interface, rt->batch) != NXT_OK) {
- goto fail;
+ return NXT_ERROR;
}
ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads,
60000 * 1000000LL);
if (nxt_slow_path(ret != NXT_OK)) {
- goto fail;
+ return NXT_ERROR;
}
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
@@ -334,28 +247,282 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process)
nxt_port_write_close(port);
- ret = init->start(task, init->data);
+ nxt_port_enable(task, port, init->port_handlers);
+
+ ret = init->setup(task, process);
if (nxt_slow_path(ret != NXT_OK)) {
- goto fail;
+ return NXT_ERROR;
}
- nxt_port_enable(task, port, init->port_handlers);
+ switch (process->state) {
- ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY,
- -1, init->stream, 0, NULL);
+ case NXT_PROCESS_STATE_CREATED:
+ ret = nxt_process_send_created(task, process);
+ break;
+
+ case NXT_PROCESS_STATE_READY:
+ ret = nxt_process_send_ready(task, process);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ break;
+ }
+
+ ret = init->start(task, &process->data);
+ break;
+
+ default:
+ nxt_assert(0);
+ }
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_log(task, NXT_LOG_ERR, "failed to send READY message to main");
+ nxt_alert(task, "%s failed to start", process->name);
+ }
+
+ return ret;
+}
+
+static nxt_int_t
+nxt_process_send_created(nxt_task_t *task, nxt_process_t *process)
+{
+ uint32_t stream;
+ nxt_int_t ret;
+ nxt_port_t *my_port, *main_port;
+ nxt_runtime_t *rt;
+
+ nxt_assert(process->state == NXT_PROCESS_STATE_CREATED);
+
+ rt = task->thread->runtime;
+
+ my_port = nxt_process_port_first(process);
+ main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+
+ nxt_assert(my_port != NULL && main_port != NULL);
+
+ stream = nxt_port_rpc_register_handler(task, my_port,
+ nxt_process_created_ok,
+ nxt_process_created_error,
+ main_port->pid, process);
+
+ if (nxt_slow_path(stream == 0)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_CREATED,
+ -1, stream, my_port->id, NULL);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_alert(task, "%s failed to send CREATED message", process->name);
+ nxt_port_rpc_cancel(task, my_port, stream);
+ return NXT_ERROR;
+ }
+
+ nxt_debug(task, "%s created", process->name);
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
+{
+ nxt_int_t ret;
+ nxt_process_t *process;
+ nxt_process_init_t *init;
+
+ process = data;
+ init = nxt_process_init(process);
+
+ ret = nxt_process_apply_creds(task, process);
+ if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
- return;
+ nxt_log(task, NXT_LOG_INFO, "%s started", process->name);
+
+ ret = init->start(task, &process->data);
fail:
- exit(1);
+ nxt_process_quit(task, ret == NXT_OK ? 0 : 1);
+}
+
+
+static void
+nxt_process_created_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
+{
+ nxt_process_t *process;
+ nxt_process_init_t *init;
+
+ process = data;
+ init = nxt_process_init(process);
+
+ nxt_alert(task, "%s failed to start", init->name);
+
+ nxt_process_quit(task, 1);
+}
+
+
+nxt_int_t
+nxt_process_core_setup(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_int_t ret;
+
+ ret = nxt_process_apply_creds(task, process);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
+ process->state = NXT_PROCESS_STATE_READY;
+
+ return NXT_OK;
+}
+
+
+#if (NXT_HAVE_CLONE_NEWUSER)
+
+nxt_int_t
+nxt_process_vldt_isolation_creds(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_int_t ret;
+ nxt_clone_t *clone;
+ nxt_credential_t *creds;
+
+ clone = &process->isolation.clone;
+ creds = process->user_cred;
+
+ if (clone->uidmap.size == 0 && clone->gidmap.size == 0) {
+ return NXT_OK;
+ }
+
+ if (!nxt_is_clone_flag_set(clone->flags, NEWUSER)) {
+ if (nxt_slow_path(clone->uidmap.size > 0)) {
+ nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but "
+ "\"isolation.namespaces.credential\" is false or unset");
+
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(clone->gidmap.size > 0)) {
+ nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but "
+ "\"isolation.namespaces.credential\" is false or unset");
+
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+ }
+
+ ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, creds);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
+ return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, creds);
+}
+
+#endif
+
+
+nxt_int_t
+nxt_process_creds_set(nxt_task_t *task, nxt_process_t *process, nxt_str_t *user,
+ nxt_str_t *group)
+{
+ char *str;
+
+ process->user_cred = nxt_mp_zalloc(process->mem_pool,
+ sizeof(nxt_credential_t));
+
+ if (nxt_slow_path(process->user_cred == NULL)) {
+ return NXT_ERROR;
+ }
+
+ str = nxt_mp_zalloc(process->mem_pool, user->length + 1);
+ if (nxt_slow_path(str == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_memcpy(str, user->start, user->length);
+ str[user->length] = '\0';
+
+ process->user_cred->user = str;
+
+ if (group->start != NULL) {
+ str = nxt_mp_zalloc(process->mem_pool, group->length + 1);
+ if (nxt_slow_path(str == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_memcpy(str, group->start, group->length);
+ str[group->length] = '\0';
+
+ } else {
+ str = NULL;
+ }
+
+ return nxt_credential_get(task, process->mem_pool, process->user_cred, str);
+}
+
+
+nxt_int_t
+nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_int_t ret, cap_setid;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
+ cap_setid = rt->capabilities.setid;
+
+#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
+ if (!cap_setid
+ && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
+ cap_setid = 1;
+ }
+#endif
+
+ if (cap_setid) {
+ ret = nxt_credential_setgids(task, process->user_cred);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_credential_setuid(task, process->user_cred);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_process_send_ready(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_int_t ret;
+ nxt_port_t *main_port;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
+ main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+
+ nxt_assert(main_port != NULL);
+
+ ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY,
+ -1, process->stream, 0, NULL);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_alert(task, "%s failed to send READY message", process->name);
+ return NXT_ERROR;
+ }
+
+ nxt_debug(task, "%s sent ready", process->name);
+
+ return NXT_OK;
}
@@ -625,3 +792,50 @@ nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port)
return res;
}
+
+
+void
+nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status)
+{
+ nxt_uint_t n;
+ nxt_queue_t *listen;
+ nxt_runtime_t *rt;
+ nxt_queue_link_t *link, *next;
+ nxt_listen_event_t *lev;
+ nxt_listen_socket_t *ls;
+
+ rt = task->thread->runtime;
+
+ nxt_debug(task, "close listen connections");
+
+ listen = &task->thread->engine->listen_connections;
+
+ for (link = nxt_queue_first(listen);
+ link != nxt_queue_tail(listen);
+ link = next)
+ {
+ next = nxt_queue_next(link);
+ lev = nxt_queue_link_data(link, nxt_listen_event_t, link);
+ nxt_queue_remove(link);
+
+ nxt_fd_event_close(task->thread->engine, &lev->socket);
+ }
+
+ if (rt->listen_sockets != NULL) {
+
+ ls = rt->listen_sockets->elts;
+ n = rt->listen_sockets->nelts;
+
+ while (n != 0) {
+ nxt_socket_close(task, ls->socket);
+ ls->socket = -1;
+
+ ls++;
+ n--;
+ }
+
+ rt->listen_sockets->nelts = 0;
+ }
+
+ nxt_runtime_quit(task, exit_status);
+}