summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nxt_application.c585
-rw-r--r--src/nxt_application.h1
-rw-r--r--src/nxt_clone.h3
-rw-r--r--src/nxt_conf_validation.c44
-rw-r--r--src/nxt_conn_write.c33
-rw-r--r--src/nxt_fs.c76
-rw-r--r--src/nxt_fs.h35
-rw-r--r--src/nxt_h1proto.c36
-rw-r--r--src/nxt_http_chunk_parse.c2
-rw-r--r--src/nxt_http_proxy.c30
-rw-r--r--src/nxt_http_route.c10
-rw-r--r--src/nxt_http_static.c5
-rw-r--r--src/nxt_http_variables.c20
-rw-r--r--src/nxt_isolation.c1012
-rw-r--r--src/nxt_isolation.h18
-rw-r--r--src/nxt_lvlhsh.c14
-rw-r--r--src/nxt_main_process.c14
-rw-r--r--src/nxt_malloc.c16
-rw-r--r--src/nxt_php_sapi.c188
-rw-r--r--src/nxt_process.c382
-rw-r--r--src/nxt_process.h35
-rw-r--r--src/nxt_router.c20
-rw-r--r--src/nxt_router.h1
-rw-r--r--src/nxt_runtime.c6
-rw-r--r--src/nxt_string.h10
-rw-r--r--src/nxt_unit.c243
-rw-r--r--src/nxt_unit.h6
-rw-r--r--src/nxt_upstream.c4
-rw-r--r--src/python/nxt_python.c340
-rw-r--r--src/python/nxt_python.h60
-rw-r--r--src/python/nxt_python_asgi.c1227
-rw-r--r--src/python/nxt_python_asgi.h60
-rw-r--r--src/python/nxt_python_asgi_http.c591
-rw-r--r--src/python/nxt_python_asgi_lifespan.c505
-rw-r--r--src/python/nxt_python_asgi_str.c141
-rw-r--r--src/python/nxt_python_asgi_str.h69
-rw-r--r--src/python/nxt_python_asgi_websocket.c1084
-rw-r--r--src/python/nxt_python_wsgi.c (renamed from src/nxt_python_wsgi.c)442
38 files changed, 5837 insertions, 1531 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 57e4615e..6935346c 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -14,6 +14,7 @@
#include <nxt_application.h>
#include <nxt_unit.h>
#include <nxt_port_memory_int.h>
+#include <nxt_isolation.h>
#include <glob.h>
@@ -41,45 +42,10 @@ static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data);
static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
const char *name);
-static nxt_int_t nxt_app_main_prefork(nxt_task_t *task, nxt_process_t *process,
- nxt_mp_t *mp);
static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src);
-#if (NXT_HAVE_ISOLATION_ROOTFS)
-static nxt_int_t nxt_app_set_isolation_mounts(nxt_task_t *task,
- nxt_process_t *process, nxt_str_t *app_type);
-static nxt_int_t nxt_app_set_lang_mounts(nxt_task_t *task,
- nxt_process_t *process, nxt_array_t *syspaths);
-static nxt_int_t nxt_app_set_isolation_rootfs(nxt_task_t *task,
- nxt_conf_value_t *isolation, nxt_process_t *process);
-static nxt_int_t nxt_app_prepare_rootfs(nxt_task_t *task,
- nxt_process_t *process);
-#endif
-
-static nxt_int_t nxt_app_set_isolation(nxt_task_t *task,
- nxt_conf_value_t *isolation, nxt_process_t *process);
-
-#if (NXT_HAVE_CLONE)
-static nxt_int_t nxt_app_set_isolation_namespaces(nxt_task_t *task,
- nxt_conf_value_t *isolation, nxt_process_t *process);
-static nxt_int_t nxt_app_clone_flags(nxt_task_t *task,
- nxt_conf_value_t *namespaces, nxt_clone_t *clone);
-#endif
-
-#if (NXT_HAVE_CLONE_NEWUSER)
-static nxt_int_t nxt_app_set_isolation_creds(nxt_task_t *task,
- nxt_conf_value_t *isolation, nxt_process_t *process);
-static nxt_int_t nxt_app_isolation_credential_map(nxt_task_t *task,
- nxt_mp_t *mem_pool, nxt_conf_value_t *map_array,
- nxt_clone_credential_map_t *map);
-#endif
-
-#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
-static nxt_int_t nxt_app_set_isolation_new_privs(nxt_task_t *task,
- nxt_conf_value_t *isolation, nxt_process_t *process);
-#endif
nxt_str_t nxt_server = nxt_string(NXT_SERVER);
@@ -126,7 +92,7 @@ const nxt_process_init_t nxt_discovery_process = {
const nxt_process_init_t nxt_app_process = {
.type = NXT_PROCESS_APP,
.setup = nxt_app_setup,
- .prefork = nxt_app_main_prefork,
+ .prefork = nxt_isolation_main_prefork,
.restart = 0,
.start = NULL, /* set to module->start */
.port_handlers = &nxt_app_process_port_handlers,
@@ -474,81 +440,6 @@ nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
static nxt_int_t
-nxt_app_main_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
-{
- nxt_int_t cap_setid;
- nxt_int_t ret;
- nxt_runtime_t *rt;
- nxt_common_app_conf_t *app_conf;
-
- rt = task->thread->runtime;
- app_conf = process->data.app;
- cap_setid = rt->capabilities.setid;
-
- if (app_conf->isolation != NULL) {
- ret = nxt_app_set_isolation(task, app_conf->isolation, process);
- if (nxt_slow_path(ret != NXT_OK)) {
- return ret;
- }
- }
-
-#if (NXT_HAVE_CLONE_NEWUSER)
- if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
- cap_setid = 1;
- }
-#endif
-
-#if (NXT_HAVE_ISOLATION_ROOTFS)
- if (process->isolation.rootfs != NULL) {
- ret = nxt_app_set_isolation_mounts(task, process, &app_conf->type);
- if (nxt_slow_path(ret != NXT_OK)) {
- return ret;
- }
- }
-#endif
-
- if (cap_setid) {
- ret = nxt_process_creds_set(task, process, &app_conf->user,
- &app_conf->group);
-
- if (nxt_slow_path(ret != NXT_OK)) {
- return ret;
- }
-
- } else {
- if (!nxt_str_eq(&app_conf->user, (u_char *) rt->user_cred.user,
- nxt_strlen(rt->user_cred.user)))
- {
- nxt_alert(task, "cannot set user \"%V\" for app \"%V\": "
- "missing capabilities", &app_conf->user, &app_conf->name);
-
- return NXT_ERROR;
- }
-
- if (app_conf->group.length > 0
- && !nxt_str_eq(&app_conf->group, (u_char *) rt->group,
- nxt_strlen(rt->group)))
- {
- nxt_alert(task, "cannot set group \"%V\" for app \"%V\": "
- "missing capabilities", &app_conf->group,
- &app_conf->name);
-
- return NXT_ERROR;
- }
- }
-
-#if (NXT_HAVE_CLONE_NEWUSER)
- ret = nxt_process_vldt_isolation_creds(task, process);
- if (nxt_slow_path(ret != NXT_OK)) {
- return ret;
- }
-#endif
-
- return NXT_OK;
-}
-
-
-static nxt_int_t
nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
{
nxt_int_t ret;
@@ -594,13 +485,13 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
#if (NXT_HAVE_ISOLATION_ROOTFS)
if (process->isolation.rootfs != NULL) {
if (process->isolation.mounts != NULL) {
- ret = nxt_app_prepare_rootfs(task, process);
+ ret = nxt_isolation_prepare_rootfs(task, process);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
}
- ret = nxt_process_change_root(task, process);
+ ret = nxt_isolation_change_root(task, process);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
@@ -686,474 +577,6 @@ nxt_app_set_environment(nxt_conf_value_t *environment)
}
-static nxt_int_t
-nxt_app_set_isolation(nxt_task_t *task, nxt_conf_value_t *isolation,
- nxt_process_t *process)
-{
-#if (NXT_HAVE_CLONE)
- if (nxt_slow_path(nxt_app_set_isolation_namespaces(task, isolation, process)
- != NXT_OK))
- {
- return NXT_ERROR;
- }
-#endif
-
-#if (NXT_HAVE_CLONE_NEWUSER)
- if (nxt_slow_path(nxt_app_set_isolation_creds(task, isolation, process)
- != NXT_OK))
- {
- return NXT_ERROR;
- }
-#endif
-
-#if (NXT_HAVE_ISOLATION_ROOTFS)
- if (nxt_slow_path(nxt_app_set_isolation_rootfs(task, isolation, process)
- != NXT_OK))
- {
- return NXT_ERROR;
- }
-#endif
-
-#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
- if (nxt_slow_path(nxt_app_set_isolation_new_privs(task, isolation, process)
- != NXT_OK))
- {
- return NXT_ERROR;
- }
-#endif
-
- return NXT_OK;
-}
-
-
-#if (NXT_HAVE_CLONE)
-
-static nxt_int_t
-nxt_app_set_isolation_namespaces(nxt_task_t *task, nxt_conf_value_t *isolation,
- nxt_process_t *process)
-{
- nxt_int_t ret;
- nxt_conf_value_t *obj;
-
- static nxt_str_t nsname = nxt_string("namespaces");
-
- obj = nxt_conf_get_object_member(isolation, &nsname, NULL);
- if (obj != NULL) {
- ret = nxt_app_clone_flags(task, obj, &process->isolation.clone);
- if (nxt_slow_path(ret != NXT_OK)) {
- return NXT_ERROR;
- }
- }
-
- return NXT_OK;
-}
-
-#endif
-
-
-#if (NXT_HAVE_CLONE_NEWUSER)
-
-static nxt_int_t
-nxt_app_set_isolation_creds(nxt_task_t *task, nxt_conf_value_t *isolation,
- nxt_process_t *process)
-{
- nxt_int_t ret;
- nxt_clone_t *clone;
- nxt_conf_value_t *array;
-
- static nxt_str_t uidname = nxt_string("uidmap");
- static nxt_str_t gidname = nxt_string("gidmap");
-
- clone = &process->isolation.clone;
-
- array = nxt_conf_get_object_member(isolation, &uidname, NULL);
- if (array != NULL) {
- ret = nxt_app_isolation_credential_map(task, process->mem_pool, array,
- &clone->uidmap);
-
- if (nxt_slow_path(ret != NXT_OK)) {
- return NXT_ERROR;
- }
- }
-
- array = nxt_conf_get_object_member(isolation, &gidname, NULL);
- if (array != NULL) {
- ret = nxt_app_isolation_credential_map(task, process->mem_pool, array,
- &clone->gidmap);
-
- if (nxt_slow_path(ret != NXT_OK)) {
- return NXT_ERROR;
- }
- }
-
- return NXT_OK;
-}
-
-
-static nxt_int_t
-nxt_app_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mp,
- nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map)
-{
- nxt_int_t ret;
- nxt_uint_t i;
- nxt_conf_value_t *obj;
-
- static nxt_conf_map_t nxt_clone_map_entry_conf[] = {
- {
- nxt_string("container"),
- NXT_CONF_MAP_INT,
- offsetof(nxt_clone_map_entry_t, container),
- },
-
- {
- nxt_string("host"),
- NXT_CONF_MAP_INT,
- offsetof(nxt_clone_map_entry_t, host),
- },
-
- {
- nxt_string("size"),
- NXT_CONF_MAP_INT,
- offsetof(nxt_clone_map_entry_t, size),
- },
- };
-
- map->size = nxt_conf_array_elements_count(map_array);
-
- if (map->size == 0) {
- return NXT_OK;
- }
-
- map->map = nxt_mp_alloc(mp, map->size * sizeof(nxt_clone_map_entry_t));
- if (nxt_slow_path(map->map == NULL)) {
- return NXT_ERROR;
- }
-
- for (i = 0; i < map->size; i++) {
- obj = nxt_conf_get_array_element(map_array, i);
-
- ret = nxt_conf_map_object(mp, obj, nxt_clone_map_entry_conf,
- nxt_nitems(nxt_clone_map_entry_conf),
- map->map + i);
- if (nxt_slow_path(ret != NXT_OK)) {
- nxt_alert(task, "clone map entry map error");
- return NXT_ERROR;
- }
- }
-
- return NXT_OK;
-}
-
-#endif
-
-#if (NXT_HAVE_CLONE)
-
-static nxt_int_t
-nxt_app_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces,
- nxt_clone_t *clone)
-{
- uint32_t index;
- nxt_str_t name;
- nxt_int_t flag;
- nxt_conf_value_t *value;
-
- index = 0;
-
- for ( ;; ) {
- value = nxt_conf_next_object_member(namespaces, &name, &index);
-
- if (value == NULL) {
- break;
- }
-
- flag = 0;
-
-#if (NXT_HAVE_CLONE_NEWUSER)
- if (nxt_str_eq(&name, "credential", 10)) {
- flag = CLONE_NEWUSER;
- }
-#endif
-
-#if (NXT_HAVE_CLONE_NEWPID)
- if (nxt_str_eq(&name, "pid", 3)) {
- flag = CLONE_NEWPID;
- }
-#endif
-
-#if (NXT_HAVE_CLONE_NEWNET)
- if (nxt_str_eq(&name, "network", 7)) {
- flag = CLONE_NEWNET;
- }
-#endif
-
-#if (NXT_HAVE_CLONE_NEWUTS)
- if (nxt_str_eq(&name, "uname", 5)) {
- flag = CLONE_NEWUTS;
- }
-#endif
-
-#if (NXT_HAVE_CLONE_NEWNS)
- if (nxt_str_eq(&name, "mount", 5)) {
- flag = CLONE_NEWNS;
- }
-#endif
-
-#if (NXT_HAVE_CLONE_NEWCGROUP)
- if (nxt_str_eq(&name, "cgroup", 6)) {
- flag = CLONE_NEWCGROUP;
- }
-#endif
-
- if (!flag) {
- nxt_alert(task, "unknown namespace flag: \"%V\"", &name);
- return NXT_ERROR;
- }
-
- if (nxt_conf_get_boolean(value)) {
- clone->flags |= flag;
- }
- }
-
- return NXT_OK;
-}
-
-#endif
-
-
-#if (NXT_HAVE_ISOLATION_ROOTFS)
-
-static nxt_int_t
-nxt_app_set_isolation_rootfs(nxt_task_t *task, nxt_conf_value_t *isolation,
- nxt_process_t *process)
-{
- nxt_str_t str;
- nxt_conf_value_t *obj;
-
- static nxt_str_t rootfs_name = nxt_string("rootfs");
-
- obj = nxt_conf_get_object_member(isolation, &rootfs_name, NULL);
- if (obj != NULL) {
- nxt_conf_get_string(obj, &str);
-
- if (nxt_slow_path(str.length <= 1 || str.start[0] != '/')) {
- nxt_log(task, NXT_LOG_ERR, "rootfs requires an absolute path other "
- "than \"/\" but given \"%V\"", &str);
-
- return NXT_ERROR;
- }
-
- if (str.start[str.length - 1] == '/') {
- str.length--;
- }
-
- process->isolation.rootfs = nxt_mp_alloc(process->mem_pool,
- str.length + 1);
-
- if (nxt_slow_path(process->isolation.rootfs == NULL)) {
- return NXT_ERROR;
- }
-
- nxt_memcpy(process->isolation.rootfs, str.start, str.length);
-
- process->isolation.rootfs[str.length] = '\0';
- }
-
- return NXT_OK;
-}
-
-
-static nxt_int_t
-nxt_app_set_isolation_mounts(nxt_task_t *task, nxt_process_t *process,
- nxt_str_t *app_type)
-{
- nxt_int_t ret, cap_chroot;
- nxt_runtime_t *rt;
- nxt_app_lang_module_t *lang;
-
- rt = task->thread->runtime;
- cap_chroot = rt->capabilities.chroot;
- lang = nxt_app_lang_module(rt, app_type);
-
- nxt_assert(lang != NULL);
-
-#if (NXT_HAVE_CLONE_NEWUSER)
- if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
- cap_chroot = 1;
- }
-#endif
-
- if (!cap_chroot) {
- nxt_log(task, NXT_LOG_ERR, "The \"rootfs\" field requires privileges");
- return NXT_ERROR;
- }
-
- if (lang->mounts != NULL && lang->mounts->nelts > 0) {
- ret = nxt_app_set_lang_mounts(task, process, lang->mounts);
- if (nxt_slow_path(ret != NXT_OK)) {
- return NXT_ERROR;
- }
- }
-
- return NXT_OK;
-}
-
-
-static nxt_int_t
-nxt_app_set_lang_mounts(nxt_task_t *task, nxt_process_t *process,
- nxt_array_t *lang_mounts)
-{
- u_char *p;
- size_t i, n, rootfs_len, len;
- nxt_mp_t *mp;
- nxt_array_t *mounts;
- const u_char *rootfs;
- nxt_fs_mount_t *mnt, *lang_mnt;
-
- rootfs = process->isolation.rootfs;
- rootfs_len = nxt_strlen(rootfs);
- mp = process->mem_pool;
-
- /* copy to init mem pool */
- mounts = nxt_array_copy(mp, NULL, lang_mounts);
- if (mounts == NULL) {
- return NXT_ERROR;
- }
-
- n = mounts->nelts;
- mnt = mounts->elts;
- lang_mnt = lang_mounts->elts;
-
- for (i = 0; i < n; i++) {
- len = nxt_strlen(lang_mnt[i].dst);
-
- mnt[i].dst = nxt_mp_alloc(mp, rootfs_len + len + 1);
- if (mnt[i].dst == NULL) {
- return NXT_ERROR;
- }
-
- p = nxt_cpymem(mnt[i].dst, rootfs, rootfs_len);
- p = nxt_cpymem(p, lang_mnt[i].dst, len);
- *p = '\0';
- }
-
- process->isolation.mounts = mounts;
-
- return NXT_OK;
-}
-
-
-static nxt_int_t
-nxt_app_prepare_rootfs(nxt_task_t *task, nxt_process_t *process)
-{
- size_t i, n;
- nxt_int_t ret, hasproc;
- struct stat st;
- nxt_array_t *mounts;
- const u_char *dst;
- nxt_fs_mount_t *mnt;
-
- hasproc = 0;
-
-#if (NXT_HAVE_CLONE_NEWPID) && (NXT_HAVE_CLONE_NEWNS)
- nxt_fs_mount_t mount;
-
- if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID)
- && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWNS))
- {
- /*
- * This mount point will automatically be gone when the namespace is
- * destroyed.
- */
-
- mount.fstype = (u_char *) "proc";
- mount.src = (u_char *) "proc";
- mount.dst = (u_char *) "/proc";
- mount.data = (u_char *) "";
- mount.flags = 0;
-
- ret = nxt_fs_mkdir_all(mount.dst, S_IRWXU | S_IRWXG | S_IRWXO);
- if (nxt_fast_path(ret == NXT_OK)) {
- ret = nxt_fs_mount(task, &mount);
- if (nxt_fast_path(ret == NXT_OK)) {
- hasproc = 1;
- }
-
- } else {
- nxt_log(task, NXT_LOG_WARN, "mkdir(%s) %E", mount.dst, nxt_errno);
- }
- }
-#endif
-
- mounts = process->isolation.mounts;
-
- n = mounts->nelts;
- mnt = mounts->elts;
-
- for (i = 0; i < n; i++) {
- dst = mnt[i].dst;
-
- if (nxt_slow_path(nxt_memcmp(mnt[i].fstype, "bind", 4) == 0
- && stat((const char *) mnt[i].src, &st) != 0))
- {
- nxt_log(task, NXT_LOG_WARN, "host path not found: %s", mnt[i].src);
- continue;
- }
-
- if (hasproc && nxt_memcmp(mnt[i].fstype, "proc", 4) == 0
- && nxt_memcmp(mnt[i].dst, "/proc", 5) == 0)
- {
- continue;
- }
-
- ret = nxt_fs_mkdir_all(dst, S_IRWXU | S_IRWXG | S_IRWXO);
- if (nxt_slow_path(ret != NXT_OK)) {
- nxt_alert(task, "mkdir(%s) %E", dst, nxt_errno);
- goto undo;
- }
-
- ret = nxt_fs_mount(task, &mnt[i]);
- if (nxt_slow_path(ret != NXT_OK)) {
- goto undo;
- }
- }
-
- return NXT_OK;
-
-undo:
-
- n = i + 1;
-
- for (i = 0; i < n; i++) {
- nxt_fs_unmount(mnt[i].dst);
- }
-
- return NXT_ERROR;
-}
-
-#endif
-
-
-#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
-
-static nxt_int_t
-nxt_app_set_isolation_new_privs(nxt_task_t *task, nxt_conf_value_t *isolation,
- nxt_process_t *process)
-{
- nxt_conf_value_t *obj;
-
- static nxt_str_t new_privs_name = nxt_string("new_privs");
-
- obj = nxt_conf_get_object_member(isolation, &new_privs_name, NULL);
- if (obj != NULL) {
- process->isolation.new_privs = nxt_conf_get_boolean(obj);
- }
-
- return NXT_OK;
-}
-
-#endif
-
-
static u_char *
nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src)
{
diff --git a/src/nxt_application.h b/src/nxt_application.h
index 3144dc3f..cb49a033 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -50,6 +50,7 @@ typedef struct {
char *home;
nxt_str_t path;
nxt_str_t module;
+ char *callable;
} nxt_python_app_conf_t;
diff --git a/src/nxt_clone.h b/src/nxt_clone.h
index e89fd82d..c2066ce6 100644
--- a/src/nxt_clone.h
+++ b/src/nxt_clone.h
@@ -42,9 +42,6 @@ pid_t nxt_clone(nxt_int_t flags);
#if (NXT_HAVE_CLONE_NEWUSER)
-#define NXT_CLONE_MNT(flags) \
- ((flags & CLONE_NEWNS) == CLONE_NEWNS)
-
NXT_EXPORT nxt_int_t nxt_clone_credential_map(nxt_task_t *task, pid_t pid,
nxt_credential_t *creds, nxt_clone_t *clone);
NXT_EXPORT nxt_int_t nxt_clone_vldt_credential_uidmap(nxt_task_t *task,
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index b5530b85..4364057b 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -496,12 +496,6 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = {
NULL,
NULL },
- { nxt_string("reschedule_timeout"),
- NXT_CONF_VLDT_INTEGER,
- 0,
- NULL,
- NULL },
-
{ nxt_string("requests"),
NXT_CONF_VLDT_INTEGER,
0,
@@ -622,6 +616,21 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_procmap_members[] = {
#endif
+#if (NXT_HAVE_ISOLATION_ROOTFS)
+
+static nxt_conf_vldt_object_t nxt_conf_vldt_app_automount_members[] = {
+ { nxt_string("language_deps"),
+ NXT_CONF_VLDT_BOOLEAN,
+ 0,
+ NULL,
+ NULL },
+
+ NXT_CONF_VLDT_END
+};
+
+#endif
+
+
static nxt_conf_vldt_object_t nxt_conf_vldt_app_isolation_members[] = {
{ nxt_string("namespaces"),
NXT_CONF_VLDT_OBJECT,
@@ -653,6 +662,12 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_isolation_members[] = {
NULL,
NULL },
+ { nxt_string("automount"),
+ NXT_CONF_VLDT_OBJECT,
+ 0,
+ &nxt_conf_vldt_object,
+ (void *) &nxt_conf_vldt_app_automount_members },
+
#endif
#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
@@ -758,6 +773,12 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = {
NULL,
NULL },
+ { nxt_string("callable"),
+ NXT_CONF_VLDT_STRING,
+ 0,
+ NULL,
+ NULL },
+
NXT_CONF_VLDT_NEXT(&nxt_conf_vldt_common_members)
};
@@ -1188,10 +1209,17 @@ static nxt_int_t
nxt_conf_vldt_listener(nxt_conf_validation_t *vldt, nxt_str_t *name,
nxt_conf_value_t *value)
{
- nxt_int_t ret;
+ nxt_int_t ret;
+ nxt_sockaddr_t *sa;
- ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT);
+ sa = nxt_sockaddr_parse(vldt->pool, name);
+ if (nxt_slow_path(sa == NULL)) {
+ return nxt_conf_vldt_error(vldt,
+ "The listener address \"%V\" is invalid.",
+ name);
+ }
+ ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT);
if (ret != NXT_OK) {
return ret;
}
diff --git a/src/nxt_conn_write.c b/src/nxt_conn_write.c
index d7a6a8da..bcf9e8fa 100644
--- a/src/nxt_conn_write.c
+++ b/src/nxt_conn_write.c
@@ -246,24 +246,47 @@ nxt_sendfile(int fd, int s, off_t pos, size_t size)
{
ssize_t res;
-#ifdef NXT_HAVE_MACOSX_SENDFILE
+#if (NXT_HAVE_MACOSX_SENDFILE)
+
off_t sent = size;
int rc = sendfile(fd, s, pos, &sent, NULL, 0);
res = (rc == 0 || sent > 0) ? sent : -1;
-#endif
-#ifdef NXT_HAVE_FREEBSD_SENDFILE
+#elif (NXT_HAVE_FREEBSD_SENDFILE)
+
off_t sent = 0;
int rc = sendfile(fd, s, pos, size, NULL, &sent, 0);
res = (rc == 0 || sent > 0) ? sent : -1;
-#endif
-#ifdef NXT_HAVE_LINUX_SENDFILE
+#elif (NXT_HAVE_LINUX_SENDFILE)
+
res = sendfile(s, fd, &pos, size);
+
+#else
+
+ int err;
+ void *map;
+ off_t page_off;
+
+ page_off = pos % nxt_pagesize;
+
+ map = nxt_mem_mmap(NULL, size + page_off, PROT_READ, MAP_SHARED, fd,
+ pos - page_off);
+ if (nxt_slow_path(map == MAP_FAILED)) {
+ return -1;
+ }
+
+ res = write(s, nxt_pointer_to(map, page_off), size);
+
+ /* Backup and restore errno to catch socket errors in the upper level. */
+ err = errno;
+ nxt_mem_munmap(map, size + page_off);
+ errno = err;
+
#endif
return res;
diff --git a/src/nxt_fs.c b/src/nxt_fs.c
index fe271802..0228c25a 100644
--- a/src/nxt_fs.c
+++ b/src/nxt_fs.c
@@ -40,30 +40,31 @@ nxt_fs_mount(nxt_task_t *task, nxt_fs_mount_t *mnt)
nxt_int_t
nxt_fs_mount(nxt_task_t *task, nxt_fs_mount_t *mnt)
{
+ u_char *data, *p, *end;
+ size_t iovlen;
+ nxt_int_t ret;
const char *fstype;
- uint8_t is_bind, is_proc;
- struct iovec iov[8];
+ struct iovec iov[128];
char errmsg[256];
- is_bind = nxt_strncmp(mnt->fstype, "bind", 4) == 0;
- is_proc = nxt_strncmp(mnt->fstype, "proc", 4) == 0;
+ if (nxt_strncmp(mnt->fstype, "bind", 4) == 0) {
+ fstype = "nullfs";
- if (nxt_slow_path(!is_bind && !is_proc)) {
- nxt_alert(task, "mount type \"%s\" not implemented.", mnt->fstype);
- return NXT_ERROR;
- }
+ } else if (nxt_strncmp(mnt->fstype, "proc", 4) == 0) {
+ fstype = "procfs";
- if (is_bind) {
- fstype = "nullfs";
+ } else if (nxt_strncmp(mnt->fstype, "tmpfs", 5) == 0) {
+ fstype = "tmpfs";
} else {
- fstype = "procfs";
+ nxt_alert(task, "mount type \"%s\" not implemented.", mnt->fstype);
+ return NXT_ERROR;
}
iov[0].iov_base = (void *) "fstype";
iov[0].iov_len = 7;
iov[1].iov_base = (void *) fstype;
- iov[1].iov_len = strlen(fstype) + 1;
+ iov[1].iov_len = nxt_strlen(fstype) + 1;
iov[2].iov_base = (void *) "fspath";
iov[2].iov_len = 7;
iov[3].iov_base = (void *) mnt->dst;
@@ -77,12 +78,55 @@ nxt_fs_mount(nxt_task_t *task, nxt_fs_mount_t *mnt)
iov[7].iov_base = (void *) errmsg;
iov[7].iov_len = sizeof(errmsg);
- if (nxt_slow_path(nmount(iov, 8, 0) < 0)) {
- nxt_alert(task, "nmount(%p, 8, 0) %s", errmsg);
- return NXT_ERROR;
+ iovlen = 8;
+
+ data = NULL;
+
+ if (mnt->data != NULL) {
+ data = (u_char *) nxt_strdup(mnt->data);
+ if (nxt_slow_path(data == NULL)) {
+ return NXT_ERROR;
+ }
+
+ end = data - 1;
+
+ do {
+ p = end + 1;
+ end = nxt_strchr(p, '=');
+ if (end == NULL) {
+ break;
+ }
+
+ *end = '\0';
+
+ iov[iovlen++].iov_base = (void *) p;
+ iov[iovlen++].iov_len = (end - p) + 1;
+
+ p = end + 1;
+
+ end = nxt_strchr(p, ',');
+ if (end != NULL) {
+ *end = '\0';
+ }
+
+ iov[iovlen++].iov_base = (void *) p;
+ iov[iovlen++].iov_len = nxt_strlen(p) + 1;
+
+ } while (end != NULL && nxt_nitems(iov) > (iovlen + 2));
}
- return NXT_OK;
+ ret = NXT_OK;
+
+ if (nxt_slow_path(nmount(iov, iovlen, 0) < 0)) {
+ nxt_alert(task, "nmount(%p, %d, 0) %s", iov, iovlen, errmsg);
+ ret = NXT_ERROR;
+ }
+
+ if (data != NULL) {
+ free(data);
+ }
+
+ return ret;
}
#endif
diff --git a/src/nxt_fs.h b/src/nxt_fs.h
index 85c78b27..bbd7ab9f 100644
--- a/src/nxt_fs.h
+++ b/src/nxt_fs.h
@@ -18,13 +18,38 @@
#define NXT_MS_REC 0
#endif
+#ifdef MS_NOSUID
+#define NXT_MS_NOSUID MS_NOSUID
+#else
+#define NXT_MS_NOSUID 0
+#endif
+
+#ifdef MS_NOEXEC
+#define NXT_MS_NOEXEC MS_NOEXEC
+#else
+#define NXT_MS_NOEXEC 0
+#endif
+
+#ifdef MS_RELATIME
+#define NXT_MS_RELATIME MS_RELATIME
+#else
+#define NXT_MS_RELATIME 0
+#endif
+
+#ifdef MS_NODEV
+#define NXT_MS_NODEV MS_NODEV
+#else
+#define NXT_MS_NODEV 0
+#endif
+
typedef struct {
- u_char *src;
- u_char *dst;
- u_char *fstype;
- nxt_int_t flags;
- u_char *data;
+ u_char *src;
+ u_char *dst;
+ u_char *fstype;
+ nxt_int_t flags;
+ u_char *data;
+ nxt_uint_t builtin; /* 1-bit */
} nxt_fs_mount_t;
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index b34be019..dc23d7c4 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -503,7 +503,10 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
joint->count++;
r->conf = joint;
- c->local = joint->socket_conf->sockaddr;
+
+ if (c->local == NULL) {
+ c->local = joint->socket_conf->sockaddr;
+ }
nxt_h1p_conn_request_header_parse(task, c, h1p);
return;
@@ -734,9 +737,16 @@ nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
r = ctx;
field->hopbyhop = 1;
- if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) {
+ if (field->value_length == 5
+ && nxt_memcasecmp(field->value, "close", 5) == 0)
+ {
r->proto.h1->keepalive = 0;
+ } else if (field->value_length == 10
+ && nxt_memcasecmp(field->value, "keep-alive", 10) == 0)
+ {
+ r->proto.h1->keepalive = 1;
+
} else if (field->value_length == 7
&& nxt_memcasecmp(field->value, "upgrade", 7) == 0)
{
@@ -1749,7 +1759,15 @@ nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data)
joint = c->listen->socket.data;
- return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
+ if (nxt_fast_path(joint != NULL)) {
+ return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
+ }
+
+ /*
+ * Listening socket had been closed while
+ * connection was in keep-alive state.
+ */
+ return 1;
}
@@ -1829,6 +1847,8 @@ nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "h1p idle close");
+ nxt_queue_remove(&c->link);
+
nxt_h1p_idle_response(task, c);
}
@@ -1863,10 +1883,9 @@ nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data)
static void
nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c)
{
- u_char *p;
- size_t size;
- nxt_buf_t *out, *last;
- nxt_h1proto_t *h1p;
+ u_char *p;
+ size_t size;
+ nxt_buf_t *out, *last;
size = nxt_length(NXT_H1P_IDLE_TIMEOUT)
+ nxt_http_date_cache.size
@@ -1896,9 +1915,6 @@ nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c)
last->completion_handler = nxt_h1p_idle_response_sent;
last->parent = c;
- h1p = c->socket.data;
- h1p->conn_write_tail = &last->next;
-
c->write = out;
c->write_state = &nxt_h1p_timeout_response_state;
diff --git a/src/nxt_http_chunk_parse.c b/src/nxt_http_chunk_parse.c
index 2164524b..be3a2023 100644
--- a/src/nxt_http_chunk_parse.c
+++ b/src/nxt_http_chunk_parse.c
@@ -74,7 +74,7 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
goto next;
}
- /* ret == NXT_HTTP_CHUNK_END_ON_BORDER */
+ /* ret == NXT_HTTP_CHUNK_END */
}
ch = *hcp->pos++;
diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c
index 34d0f36e..338d9fce 100644
--- a/src/nxt_http_proxy.c
+++ b/src/nxt_http_proxy.c
@@ -27,7 +27,6 @@ static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
-static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data);
static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
void *data);
static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data);
@@ -275,39 +274,16 @@ nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
}
-static void
-nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
-{
- nxt_buf_t *out;
- nxt_http_peer_t *peer;
- nxt_http_request_t *r;
-
- r = obj;
- peer = data;
- out = peer->body;
-
- if (out != NULL) {
- peer->body = NULL;
- nxt_http_request_send(task, r, out);
-
- }
-
- if (!peer->closed) {
- nxt_http_proto[peer->protocol].peer_read(task, peer);
- }
-}
-
-
static const nxt_http_request_state_t nxt_http_proxy_read_state
nxt_aligned(64) =
{
- .ready_handler = nxt_http_proxy_read,
+ .ready_handler = nxt_http_proxy_send_body,
.error_handler = nxt_http_proxy_error,
};
static void
-nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
+nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *out;
nxt_http_peer_t *peer;
@@ -316,9 +292,9 @@ nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
r = obj;
peer = data;
out = peer->body;
- peer->body = NULL;
if (out != NULL) {
+ peer->body = NULL;
nxt_http_request_send(task, r, out);
}
diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c
index 0b2103cd..ae91076a 100644
--- a/src/nxt_http_route.c
+++ b/src/nxt_http_route.c
@@ -1085,10 +1085,6 @@ nxt_http_route_pattern_create(nxt_task_t *task, nxt_mp_t *mp,
pattern->negative = 1;
pattern->any = 0;
-
- if (test.length == 0) {
- return NXT_OK;
- }
}
if (test.length == 0) {
@@ -1594,6 +1590,7 @@ nxt_http_action_t *
nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_str_t *name)
{
+ nxt_int_t ret;
nxt_http_action_t *action;
action = nxt_mp_alloc(tmcf->router_conf->mem_pool,
@@ -1605,7 +1602,10 @@ nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
action->name = *name;
action->handler = NULL;
- nxt_http_action_resolve(task, tmcf, action);
+ ret = nxt_http_action_resolve(task, tmcf, action);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NULL;
+ }
return action;
}
diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c
index ee18be1b..5687ef2c 100644
--- a/src/nxt_http_static.c
+++ b/src/nxt_http_static.c
@@ -470,14 +470,17 @@ nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash)
{ nxt_string("text/css"), ".css" },
{ nxt_string("image/svg+xml"), ".svg" },
- { nxt_string("image/svg+xml"), ".svg" },
{ nxt_string("image/webp"), ".webp" },
{ nxt_string("image/png"), ".png" },
+ { nxt_string("image/apng"), ".apng" },
{ nxt_string("image/jpeg"), ".jpeg" },
{ nxt_string("image/jpeg"), ".jpg" },
{ nxt_string("image/gif"), ".gif" },
{ nxt_string("image/x-icon"), ".ico" },
+ { nxt_string("image/avif"), ".avif" },
+ { nxt_string("image/avif-sequence"), ".avifs" },
+
{ nxt_string("font/woff"), ".woff" },
{ nxt_string("font/woff2"), ".woff2" },
{ nxt_string("font/otf"), ".otf" },
diff --git a/src/nxt_http_variables.c b/src/nxt_http_variables.c
index 222d717c..1c0b561d 100644
--- a/src/nxt_http_variables.c
+++ b/src/nxt_http_variables.c
@@ -11,6 +11,8 @@ static nxt_int_t nxt_http_var_method(nxt_task_t *task, nxt_var_query_t *query,
nxt_str_t *str, void *ctx);
static nxt_int_t nxt_http_var_uri(nxt_task_t *task, nxt_var_query_t *query,
nxt_str_t *str, void *ctx);
+static nxt_int_t nxt_http_var_host(nxt_task_t *task, nxt_var_query_t *query,
+ nxt_str_t *str, void *ctx);
static nxt_var_decl_t nxt_http_vars[] = {
@@ -21,6 +23,10 @@ static nxt_var_decl_t nxt_http_vars[] = {
{ nxt_string("uri"),
&nxt_http_var_uri,
0 },
+
+ { nxt_string("host"),
+ &nxt_http_var_host,
+ 0 },
};
@@ -57,3 +63,17 @@ nxt_http_var_uri(nxt_task_t *task, nxt_var_query_t *query, nxt_str_t *str,
return NXT_OK;
}
+
+
+static nxt_int_t
+nxt_http_var_host(nxt_task_t *task, nxt_var_query_t *query, nxt_str_t *str,
+ void *ctx)
+{
+ nxt_http_request_t *r;
+
+ r = ctx;
+
+ *str = r->host;
+
+ return NXT_OK;
+}
diff --git a/src/nxt_isolation.c b/src/nxt_isolation.c
new file mode 100644
index 00000000..ac7a37e8
--- /dev/null
+++ b/src/nxt_isolation.c
@@ -0,0 +1,1012 @@
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_application.h>
+#include <nxt_process.h>
+#include <nxt_isolation.h>
+
+#if (NXT_HAVE_PIVOT_ROOT)
+#include <mntent.h>
+#endif
+
+
+static nxt_int_t nxt_isolation_set(nxt_task_t *task,
+ nxt_conf_value_t *isolation, nxt_process_t *process);
+
+#if (NXT_HAVE_CLONE)
+static nxt_int_t nxt_isolation_set_namespaces(nxt_task_t *task,
+ nxt_conf_value_t *isolation, nxt_process_t *process);
+static nxt_int_t nxt_isolation_clone_flags(nxt_task_t *task,
+ nxt_conf_value_t *namespaces, nxt_clone_t *clone);
+#endif
+
+#if (NXT_HAVE_CLONE_NEWUSER)
+static nxt_int_t nxt_isolation_set_creds(nxt_task_t *task,
+ nxt_conf_value_t *isolation, nxt_process_t *process);
+static nxt_int_t nxt_isolation_credential_map(nxt_task_t *task,
+ nxt_mp_t *mem_pool, nxt_conf_value_t *map_array,
+ nxt_clone_credential_map_t *map);
+static nxt_int_t nxt_isolation_vldt_creds(nxt_task_t *task,
+ nxt_process_t *process);
+#endif
+
+#if (NXT_HAVE_ISOLATION_ROOTFS)
+static nxt_int_t nxt_isolation_set_rootfs(nxt_task_t *task,
+ nxt_conf_value_t *isolation, nxt_process_t *process);
+static nxt_int_t nxt_isolation_set_automount(nxt_task_t *task,
+ nxt_conf_value_t *isolation, nxt_process_t *process);
+static nxt_int_t nxt_isolation_set_mounts(nxt_task_t *task,
+ nxt_process_t *process, nxt_str_t *app_type);
+static nxt_int_t nxt_isolation_set_lang_mounts(nxt_task_t *task,
+ nxt_process_t *process, nxt_array_t *syspaths);
+static void nxt_isolation_unmount_all(nxt_task_t *task, nxt_process_t *process);
+
+#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS)
+static nxt_int_t nxt_isolation_pivot_root(nxt_task_t *task, const char *rootfs);
+static nxt_int_t nxt_isolation_make_private_mount(nxt_task_t *task,
+ const char *rootfs);
+nxt_inline int nxt_pivot_root(const char *new_root, const char *old_root);
+#endif
+
+static nxt_int_t nxt_isolation_chroot(nxt_task_t *task, const char *path);
+#endif
+
+#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
+static nxt_int_t nxt_isolation_set_new_privs(nxt_task_t *task,
+ nxt_conf_value_t *isolation, nxt_process_t *process);
+#endif
+
+
+nxt_int_t
+nxt_isolation_main_prefork(nxt_task_t *task, nxt_process_t *process,
+ nxt_mp_t *mp)
+{
+ nxt_int_t cap_setid;
+ nxt_int_t ret;
+ nxt_runtime_t *rt;
+ nxt_common_app_conf_t *app_conf;
+
+ rt = task->thread->runtime;
+ app_conf = process->data.app;
+ cap_setid = rt->capabilities.setid;
+
+ if (app_conf->isolation != NULL) {
+ ret = nxt_isolation_set(task, app_conf->isolation, process);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+ }
+
+#if (NXT_HAVE_CLONE_NEWUSER)
+ if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
+ cap_setid = 1;
+ }
+#endif
+
+#if (NXT_HAVE_ISOLATION_ROOTFS)
+ if (process->isolation.rootfs != NULL) {
+ ret = nxt_isolation_set_mounts(task, process, &app_conf->type);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+ }
+#endif
+
+ if (cap_setid) {
+ ret = nxt_process_creds_set(task, process, &app_conf->user,
+ &app_conf->group);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ } else {
+ if (!nxt_str_eq(&app_conf->user, (u_char *) rt->user_cred.user,
+ nxt_strlen(rt->user_cred.user)))
+ {
+ nxt_alert(task, "cannot set user \"%V\" for app \"%V\": "
+ "missing capabilities", &app_conf->user, &app_conf->name);
+
+ return NXT_ERROR;
+ }
+
+ if (app_conf->group.length > 0
+ && !nxt_str_eq(&app_conf->group, (u_char *) rt->group,
+ nxt_strlen(rt->group)))
+ {
+ nxt_alert(task, "cannot set group \"%V\" for app \"%V\": "
+ "missing capabilities", &app_conf->group,
+ &app_conf->name);
+
+ return NXT_ERROR;
+ }
+ }
+
+#if (NXT_HAVE_CLONE_NEWUSER)
+ ret = nxt_isolation_vldt_creds(task, process);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+#endif
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_isolation_set(nxt_task_t *task, nxt_conf_value_t *isolation,
+ nxt_process_t *process)
+{
+#if (NXT_HAVE_CLONE)
+ if (nxt_slow_path(nxt_isolation_set_namespaces(task, isolation, process)
+ != NXT_OK))
+ {
+ return NXT_ERROR;
+ }
+#endif
+
+#if (NXT_HAVE_CLONE_NEWUSER)
+ if (nxt_slow_path(nxt_isolation_set_creds(task, isolation, process)
+ != NXT_OK))
+ {
+ return NXT_ERROR;
+ }
+#endif
+
+#if (NXT_HAVE_ISOLATION_ROOTFS)
+ if (nxt_slow_path(nxt_isolation_set_rootfs(task, isolation, process)
+ != NXT_OK))
+ {
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(nxt_isolation_set_automount(task, isolation, process)
+ != NXT_OK))
+ {
+ return NXT_ERROR;
+ }
+#endif
+
+#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
+ if (nxt_slow_path(nxt_isolation_set_new_privs(task, isolation, process)
+ != NXT_OK))
+ {
+ return NXT_ERROR;
+ }
+#endif
+
+ return NXT_OK;
+}
+
+
+#if (NXT_HAVE_CLONE)
+
+static nxt_int_t
+nxt_isolation_set_namespaces(nxt_task_t *task, nxt_conf_value_t *isolation,
+ nxt_process_t *process)
+{
+ nxt_int_t ret;
+ nxt_conf_value_t *obj;
+
+ static nxt_str_t nsname = nxt_string("namespaces");
+
+ obj = nxt_conf_get_object_member(isolation, &nsname, NULL);
+ if (obj != NULL) {
+ ret = nxt_isolation_clone_flags(task, obj, &process->isolation.clone);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+ }
+
+ return NXT_OK;
+}
+
+#endif
+
+
+#if (NXT_HAVE_CLONE_NEWUSER)
+
+static nxt_int_t
+nxt_isolation_set_creds(nxt_task_t *task, nxt_conf_value_t *isolation,
+ nxt_process_t *process)
+{
+ nxt_int_t ret;
+ nxt_clone_t *clone;
+ nxt_conf_value_t *array;
+
+ static nxt_str_t uidname = nxt_string("uidmap");
+ static nxt_str_t gidname = nxt_string("gidmap");
+
+ clone = &process->isolation.clone;
+
+ array = nxt_conf_get_object_member(isolation, &uidname, NULL);
+ if (array != NULL) {
+ ret = nxt_isolation_credential_map(task, process->mem_pool, array,
+ &clone->uidmap);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+ }
+
+ array = nxt_conf_get_object_member(isolation, &gidname, NULL);
+ if (array != NULL) {
+ ret = nxt_isolation_credential_map(task, process->mem_pool, array,
+ &clone->gidmap);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mp,
+ nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map)
+{
+ nxt_int_t ret;
+ nxt_uint_t i;
+ nxt_conf_value_t *obj;
+
+ static nxt_conf_map_t nxt_clone_map_entry_conf[] = {
+ {
+ nxt_string("container"),
+ NXT_CONF_MAP_INT,
+ offsetof(nxt_clone_map_entry_t, container),
+ },
+
+ {
+ nxt_string("host"),
+ NXT_CONF_MAP_INT,
+ offsetof(nxt_clone_map_entry_t, host),
+ },
+
+ {
+ nxt_string("size"),
+ NXT_CONF_MAP_INT,
+ offsetof(nxt_clone_map_entry_t, size),
+ },
+ };
+
+ map->size = nxt_conf_array_elements_count(map_array);
+
+ if (map->size == 0) {
+ return NXT_OK;
+ }
+
+ map->map = nxt_mp_alloc(mp, map->size * sizeof(nxt_clone_map_entry_t));
+ if (nxt_slow_path(map->map == NULL)) {
+ return NXT_ERROR;
+ }
+
+ for (i = 0; i < map->size; i++) {
+ obj = nxt_conf_get_array_element(map_array, i);
+
+ ret = nxt_conf_map_object(mp, obj, nxt_clone_map_entry_conf,
+ nxt_nitems(nxt_clone_map_entry_conf),
+ map->map + i);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_alert(task, "clone map entry map error");
+ return NXT_ERROR;
+ }
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_isolation_vldt_creds(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_int_t ret;
+ nxt_clone_t *clone;
+ nxt_credential_t *creds;
+
+ clone = &process->isolation.clone;
+ creds = process->user_cred;
+
+ if (clone->uidmap.size == 0 && clone->gidmap.size == 0) {
+ return NXT_OK;
+ }
+
+ if (!nxt_is_clone_flag_set(clone->flags, NEWUSER)) {
+ if (nxt_slow_path(clone->uidmap.size > 0)) {
+ nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but "
+ "\"isolation.namespaces.credential\" is false or unset");
+
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(clone->gidmap.size > 0)) {
+ nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but "
+ "\"isolation.namespaces.credential\" is false or unset");
+
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+ }
+
+ ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, creds);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
+ return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, creds);
+}
+
+#endif
+
+
+#if (NXT_HAVE_CLONE)
+
+static nxt_int_t
+nxt_isolation_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces,
+ nxt_clone_t *clone)
+{
+ uint32_t index;
+ nxt_str_t name;
+ nxt_int_t flag;
+ nxt_conf_value_t *value;
+
+ index = 0;
+
+ for ( ;; ) {
+ value = nxt_conf_next_object_member(namespaces, &name, &index);
+
+ if (value == NULL) {
+ break;
+ }
+
+ flag = 0;
+
+#if (NXT_HAVE_CLONE_NEWUSER)
+ if (nxt_str_eq(&name, "credential", 10)) {
+ flag = CLONE_NEWUSER;
+ }
+#endif
+
+#if (NXT_HAVE_CLONE_NEWPID)
+ if (nxt_str_eq(&name, "pid", 3)) {
+ flag = CLONE_NEWPID;
+ }
+#endif
+
+#if (NXT_HAVE_CLONE_NEWNET)
+ if (nxt_str_eq(&name, "network", 7)) {
+ flag = CLONE_NEWNET;
+ }
+#endif
+
+#if (NXT_HAVE_CLONE_NEWUTS)
+ if (nxt_str_eq(&name, "uname", 5)) {
+ flag = CLONE_NEWUTS;
+ }
+#endif
+
+#if (NXT_HAVE_CLONE_NEWNS)
+ if (nxt_str_eq(&name, "mount", 5)) {
+ flag = CLONE_NEWNS;
+ }
+#endif
+
+#if (NXT_HAVE_CLONE_NEWCGROUP)
+ if (nxt_str_eq(&name, "cgroup", 6)) {
+ flag = CLONE_NEWCGROUP;
+ }
+#endif
+
+ if (!flag) {
+ nxt_alert(task, "unknown namespace flag: \"%V\"", &name);
+ return NXT_ERROR;
+ }
+
+ if (nxt_conf_get_boolean(value)) {
+ clone->flags |= flag;
+ }
+ }
+
+ return NXT_OK;
+}
+
+#endif
+
+
+#if (NXT_HAVE_ISOLATION_ROOTFS)
+
+static nxt_int_t
+nxt_isolation_set_rootfs(nxt_task_t *task, nxt_conf_value_t *isolation,
+ nxt_process_t *process)
+{
+ nxt_str_t str;
+ nxt_conf_value_t *obj;
+
+ static nxt_str_t rootfs_name = nxt_string("rootfs");
+
+ obj = nxt_conf_get_object_member(isolation, &rootfs_name, NULL);
+ if (obj != NULL) {
+ nxt_conf_get_string(obj, &str);
+
+ if (nxt_slow_path(str.length <= 1 || str.start[0] != '/')) {
+ nxt_log(task, NXT_LOG_ERR, "rootfs requires an absolute path other "
+ "than \"/\" but given \"%V\"", &str);
+
+ return NXT_ERROR;
+ }
+
+ if (str.start[str.length - 1] == '/') {
+ str.length--;
+ }
+
+ process->isolation.rootfs = nxt_mp_alloc(process->mem_pool,
+ str.length + 1);
+
+ if (nxt_slow_path(process->isolation.rootfs == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_memcpy(process->isolation.rootfs, str.start, str.length);
+
+ process->isolation.rootfs[str.length] = '\0';
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_isolation_set_automount(nxt_task_t *task, nxt_conf_value_t *isolation,
+ nxt_process_t *process)
+{
+ nxt_conf_value_t *conf, *value;
+ nxt_process_automount_t *automount;
+
+ static nxt_str_t automount_name = nxt_string("automount");
+ static nxt_str_t langdeps_name = nxt_string("language_deps");
+
+ automount = &process->isolation.automount;
+
+ automount->language_deps = 1;
+
+ conf = nxt_conf_get_object_member(isolation, &automount_name, NULL);
+ if (conf != NULL) {
+ value = nxt_conf_get_object_member(conf, &langdeps_name, NULL);
+ if (value != NULL) {
+ automount->language_deps = nxt_conf_get_boolean(value);
+ }
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_isolation_set_mounts(nxt_task_t *task, nxt_process_t *process,
+ nxt_str_t *app_type)
+{
+ nxt_int_t ret, cap_chroot;
+ nxt_runtime_t *rt;
+ nxt_app_lang_module_t *lang;
+
+ rt = task->thread->runtime;
+ cap_chroot = rt->capabilities.chroot;
+ lang = nxt_app_lang_module(rt, app_type);
+
+ nxt_assert(lang != NULL);
+
+#if (NXT_HAVE_CLONE_NEWUSER)
+ if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
+ cap_chroot = 1;
+ }
+#endif
+
+ if (!cap_chroot) {
+ nxt_log(task, NXT_LOG_ERR, "The \"rootfs\" field requires privileges");
+ return NXT_ERROR;
+ }
+
+ ret = nxt_isolation_set_lang_mounts(task, process, lang->mounts);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
+ process->isolation.cleanup = nxt_isolation_unmount_all;
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_isolation_set_lang_mounts(nxt_task_t *task, nxt_process_t *process,
+ nxt_array_t *lang_mounts)
+{
+ u_char *p;
+ size_t i, n, rootfs_len, len;
+ nxt_mp_t *mp;
+ nxt_array_t *mounts;
+ const u_char *rootfs;
+ nxt_fs_mount_t *mnt, *lang_mnt;
+
+ mp = process->mem_pool;
+
+ /* copy to init mem pool */
+ mounts = nxt_array_copy(mp, NULL, lang_mounts);
+ if (mounts == NULL) {
+ return NXT_ERROR;
+ }
+
+ n = mounts->nelts;
+ mnt = mounts->elts;
+ lang_mnt = lang_mounts->elts;
+
+ rootfs = process->isolation.rootfs;
+ rootfs_len = nxt_strlen(rootfs);
+
+ for (i = 0; i < n; i++) {
+ len = nxt_strlen(lang_mnt[i].dst);
+
+ mnt[i].dst = nxt_mp_alloc(mp, rootfs_len + len + 1);
+ if (nxt_slow_path(mnt[i].dst == NULL)) {
+ return NXT_ERROR;
+ }
+
+ p = nxt_cpymem(mnt[i].dst, rootfs, rootfs_len);
+ p = nxt_cpymem(p, lang_mnt[i].dst, len);
+ *p = '\0';
+ }
+
+ mnt = nxt_array_add(mounts);
+ if (nxt_slow_path(mnt == NULL)) {
+ return NXT_ERROR;
+ }
+
+ mnt->src = (u_char *) "tmpfs";
+ mnt->fstype = (u_char *) "tmpfs";
+ mnt->flags = NXT_MS_NOSUID | NXT_MS_NODEV | NXT_MS_NOEXEC | NXT_MS_RELATIME;
+ mnt->data = (u_char *) "size=1m,mode=777";
+ mnt->builtin = 1;
+
+ mnt->dst = nxt_mp_nget(mp, rootfs_len + nxt_length("/tmp") + 1);
+ if (nxt_slow_path(mnt->dst == NULL)) {
+ return NXT_ERROR;
+ }
+
+ p = nxt_cpymem(mnt->dst, rootfs, rootfs_len);
+ p = nxt_cpymem(p, "/tmp", 4);
+ *p = '\0';
+
+#if (NXT_HAVE_CLONE_NEWPID) && (NXT_HAVE_CLONE_NEWNS)
+
+ if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID)
+ && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWNS))
+ {
+ mnt = nxt_array_add(mounts);
+ if (nxt_slow_path(mnt == NULL)) {
+ return NXT_ERROR;
+ }
+
+ mnt->fstype = (u_char *) "proc";
+ mnt->src = (u_char *) "proc";
+
+ mnt->dst = nxt_mp_nget(mp, rootfs_len + nxt_length("/proc") + 1);
+ if (nxt_slow_path(mnt->dst == NULL)) {
+ return NXT_ERROR;
+ }
+
+ p = nxt_cpymem(mnt->dst, rootfs, rootfs_len);
+ p = nxt_cpymem(p, "/proc", 5);
+ *p = '\0';
+
+ mnt->data = (u_char *) "";
+ mnt->flags = 0;
+ }
+#endif
+
+ process->isolation.mounts = mounts;
+
+ return NXT_OK;
+}
+
+
+void
+nxt_isolation_unmount_all(nxt_task_t *task, nxt_process_t *process)
+{
+ size_t i, n;
+ nxt_array_t *mounts;
+ nxt_fs_mount_t *mnt;
+ nxt_process_automount_t *automount;
+
+ nxt_debug(task, "unmount all (%s)", process->name);
+
+ automount = &process->isolation.automount;
+ mounts = process->isolation.mounts;
+ n = mounts->nelts;
+ mnt = mounts->elts;
+
+ for (i = 0; i < n; i++) {
+ if (mnt[i].builtin && !automount->language_deps) {
+ continue;
+ }
+
+ nxt_fs_unmount(mnt[i].dst);
+ }
+}
+
+
+nxt_int_t
+nxt_isolation_prepare_rootfs(nxt_task_t *task, nxt_process_t *process)
+{
+ size_t i, n;
+ nxt_int_t ret;
+ struct stat st;
+ nxt_array_t *mounts;
+ const u_char *dst;
+ nxt_fs_mount_t *mnt;
+ nxt_process_automount_t *automount;
+
+ automount = &process->isolation.automount;
+ mounts = process->isolation.mounts;
+
+ n = mounts->nelts;
+ mnt = mounts->elts;
+
+ for (i = 0; i < n; i++) {
+ dst = mnt[i].dst;
+
+ if (mnt[i].builtin && !automount->language_deps) {
+ continue;
+ }
+
+ if (nxt_slow_path(nxt_memcmp(mnt[i].fstype, "bind", 4) == 0
+ && stat((const char *) mnt[i].src, &st) != 0))
+ {
+ nxt_log(task, NXT_LOG_WARN, "host path not found: %s", mnt[i].src);
+ continue;
+ }
+
+ ret = nxt_fs_mkdir_all(dst, S_IRWXU | S_IRWXG | S_IRWXO);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_alert(task, "mkdir(%s) %E", dst, nxt_errno);
+ goto undo;
+ }
+
+ ret = nxt_fs_mount(task, &mnt[i]);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto undo;
+ }
+ }
+
+ return NXT_OK;
+
+undo:
+
+ n = i + 1;
+
+ for (i = 0; i < n; i++) {
+ nxt_fs_unmount(mnt[i].dst);
+ }
+
+ return NXT_ERROR;
+}
+
+
+#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS)
+
+nxt_int_t
+nxt_isolation_change_root(nxt_task_t *task, nxt_process_t *process)
+{
+ char *rootfs;
+ nxt_int_t ret;
+
+ rootfs = (char *) process->isolation.rootfs;
+
+ nxt_debug(task, "change root: %s", rootfs);
+
+ if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWNS)) {
+ ret = nxt_isolation_pivot_root(task, rootfs);
+
+ } else {
+ ret = nxt_isolation_chroot(task, rootfs);
+ }
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ if (nxt_slow_path(chdir("/") < 0)) {
+ nxt_alert(task, "chdir(\"/\") %E", nxt_errno);
+ return NXT_ERROR;
+ }
+ }
+
+ return ret;
+}
+
+
+/*
+ * pivot_root(2) can only be safely used with containers, otherwise it can
+ * umount(2) the global root filesystem and screw up the machine.
+ */
+
+static nxt_int_t
+nxt_isolation_pivot_root(nxt_task_t *task, const char *path)
+{
+ /*
+ * This implementation makes use of a kernel trick that works for ages
+ * and now documented in Linux kernel 5.
+ * https://lore.kernel.org/linux-man/87r24piwhm.fsf@x220.int.ebiederm.org/T/
+ */
+
+ if (nxt_slow_path(mount("", "/", "", MS_SLAVE|MS_REC, "") != 0)) {
+ nxt_alert(task, "mount(\"/\", MS_SLAVE|MS_REC) failed: %E", nxt_errno);
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(nxt_isolation_make_private_mount(task, path) != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(mount(path, path, "bind", MS_BIND|MS_REC, "") != 0)) {
+ nxt_alert(task, "error bind mounting rootfs %E", nxt_errno);
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(chdir(path) != 0)) {
+ nxt_alert(task, "failed to chdir(%s) %E", path, nxt_errno);
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(nxt_pivot_root(".", ".") != 0)) {
+ nxt_alert(task, "failed to pivot_root %E", nxt_errno);
+ return NXT_ERROR;
+ }
+
+ /*
+ * Demote the oldroot mount to avoid unmounts getting propagated to
+ * the host.
+ */
+ if (nxt_slow_path(mount("", ".", "", MS_SLAVE | MS_REC, NULL) != 0)) {
+ nxt_alert(task, "failed to bind mount rootfs %E", nxt_errno);
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(umount2(".", MNT_DETACH) != 0)) {
+ nxt_alert(task, "failed to umount old root directory %E", nxt_errno);
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_isolation_make_private_mount(nxt_task_t *task, const char *rootfs)
+{
+ char *parent_mnt;
+ FILE *procfile;
+ u_char **mounts;
+ size_t len;
+ uint8_t *shared;
+ nxt_int_t ret, index, nmounts;
+ struct mntent *ent;
+
+ static const char *mount_path = "/proc/self/mounts";
+
+ ret = NXT_ERROR;
+ ent = NULL;
+ shared = NULL;
+ procfile = NULL;
+ parent_mnt = NULL;
+
+ nmounts = 256;
+
+ mounts = nxt_malloc(nmounts * sizeof(uintptr_t));
+ if (nxt_slow_path(mounts == NULL)) {
+ goto fail;
+ }
+
+ shared = nxt_malloc(nmounts);
+ if (nxt_slow_path(shared == NULL)) {
+ goto fail;
+ }
+
+ procfile = setmntent(mount_path, "r");
+ if (nxt_slow_path(procfile == NULL)) {
+ nxt_alert(task, "failed to open %s %E", mount_path, nxt_errno);
+
+ goto fail;
+ }
+
+ index = 0;
+
+again:
+
+ for ( ; index < nmounts; index++) {
+ ent = getmntent(procfile);
+ if (ent == NULL) {
+ nmounts = index;
+ break;
+ }
+
+ mounts[index] = (u_char *) strdup(ent->mnt_dir);
+ shared[index] = hasmntopt(ent, "shared") != NULL;
+ }
+
+ if (ent != NULL) {
+ /* there are still entries to be read */
+
+ nmounts *= 2;
+ mounts = nxt_realloc(mounts, nmounts);
+ if (nxt_slow_path(mounts == NULL)) {
+ goto fail;
+ }
+
+ shared = nxt_realloc(shared, nmounts);
+ if (nxt_slow_path(shared == NULL)) {
+ goto fail;
+ }
+
+ goto again;
+ }
+
+ for (index = 0; index < nmounts; index++) {
+ if (nxt_strcmp(mounts[index], rootfs) == 0) {
+ parent_mnt = (char *) rootfs;
+ break;
+ }
+ }
+
+ if (parent_mnt == NULL) {
+ len = nxt_strlen(rootfs);
+
+ parent_mnt = nxt_malloc(len + 1);
+ if (parent_mnt == NULL) {
+ goto fail;
+ }
+
+ nxt_memcpy(parent_mnt, rootfs, len);
+ parent_mnt[len] = '\0';
+
+ if (parent_mnt[len - 1] == '/') {
+ parent_mnt[len - 1] = '\0';
+ len--;
+ }
+
+ for ( ;; ) {
+ for (index = 0; index < nmounts; index++) {
+ if (nxt_strcmp(mounts[index], parent_mnt) == 0) {
+ goto found;
+ }
+ }
+
+ if (len == 1 && parent_mnt[0] == '/') {
+ nxt_alert(task, "parent mount not found");
+ goto fail;
+ }
+
+ /* parent dir */
+ while (parent_mnt[len - 1] != '/' && len > 0) {
+ len--;
+ }
+
+ if (nxt_slow_path(len == 0)) {
+ nxt_alert(task, "parent mount not found");
+ goto fail;
+ }
+
+ if (len == 1) {
+ parent_mnt[len] = '\0'; /* / */
+ } else {
+ parent_mnt[len - 1] = '\0'; /* /<path> */
+ }
+ }
+ }
+
+found:
+
+ if (shared[index]) {
+ if (nxt_slow_path(mount("", parent_mnt, "", MS_PRIVATE, "") != 0)) {
+ nxt_alert(task, "mount(\"\", \"%s\", MS_PRIVATE) %E", parent_mnt,
+ nxt_errno);
+
+ goto fail;
+ }
+ }
+
+ ret = NXT_OK;
+
+fail:
+
+ if (procfile != NULL) {
+ endmntent(procfile);
+ }
+
+ if (mounts != NULL) {
+ for (index = 0; index < nmounts; index++) {
+ nxt_free(mounts[index]);
+ }
+
+ nxt_free(mounts);
+ }
+
+ if (shared != NULL) {
+ nxt_free(shared);
+ }
+
+ if (parent_mnt != NULL && parent_mnt != rootfs) {
+ nxt_free(parent_mnt);
+ }
+
+ return ret;
+}
+
+
+nxt_inline int
+nxt_pivot_root(const char *new_root, const char *old_root)
+{
+ return syscall(__NR_pivot_root, new_root, old_root);
+}
+
+
+#else /* !(NXT_HAVE_PIVOT_ROOT) || !(NXT_HAVE_CLONE_NEWNS) */
+
+
+nxt_int_t
+nxt_isolation_change_root(nxt_task_t *task, nxt_process_t *process)
+{
+ char *rootfs;
+
+ rootfs = (char *) process->isolation.rootfs;
+
+ nxt_debug(task, "change root: %s", rootfs);
+
+ if (nxt_fast_path(nxt_isolation_chroot(task, rootfs) == NXT_OK)) {
+ if (nxt_slow_path(chdir("/") < 0)) {
+ nxt_alert(task, "chdir(\"/\") %E", nxt_errno);
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+ }
+
+ return NXT_ERROR;
+}
+
+#endif
+
+
+static nxt_int_t
+nxt_isolation_chroot(nxt_task_t *task, const char *path)
+{
+ if (nxt_slow_path(chroot(path) < 0)) {
+ nxt_alert(task, "chroot(%s) %E", path, nxt_errno);
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+}
+
+#endif /* NXT_HAVE_ISOLATION_ROOTFS */
+
+
+#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
+
+static nxt_int_t
+nxt_isolation_set_new_privs(nxt_task_t *task, nxt_conf_value_t *isolation,
+ nxt_process_t *process)
+{
+ nxt_conf_value_t *obj;
+
+ static nxt_str_t new_privs_name = nxt_string("new_privs");
+
+ obj = nxt_conf_get_object_member(isolation, &new_privs_name, NULL);
+ if (obj != NULL) {
+ process->isolation.new_privs = nxt_conf_get_boolean(obj);
+ }
+
+ return NXT_OK;
+}
+
+#endif
diff --git a/src/nxt_isolation.h b/src/nxt_isolation.h
new file mode 100644
index 00000000..88a5f9e1
--- /dev/null
+++ b/src/nxt_isolation.h
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_ISOLATION_H_
+#define _NXT_ISOLATION_H_
+
+
+nxt_int_t nxt_isolation_main_prefork(nxt_task_t *task, nxt_process_t *process,
+ nxt_mp_t *mp);
+
+#if (NXT_HAVE_ISOLATION_ROOTFS)
+nxt_int_t nxt_isolation_prepare_rootfs(nxt_task_t *task,
+ nxt_process_t *process);
+nxt_int_t nxt_isolation_change_root(nxt_task_t *task, nxt_process_t *process);
+#endif
+
+#endif /* _NXT_ISOLATION_H_ */
diff --git a/src/nxt_lvlhsh.c b/src/nxt_lvlhsh.c
index ec433341..d10dbc58 100644
--- a/src/nxt_lvlhsh.c
+++ b/src/nxt_lvlhsh.c
@@ -1015,17 +1015,3 @@ nxt_lvlhsh_retrieve(nxt_lvlhsh_t *lh, const nxt_lvlhsh_proto_t *proto,
return NULL;
}
-
-
-void *
-nxt_lvlhsh_alloc(void *data, size_t size)
-{
- return nxt_memalign(size, size);
-}
-
-
-void
-nxt_lvlhsh_free(void *data, void *p)
-{
- nxt_free(p);
-}
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index 48eb2abb..d2edab1d 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -191,6 +191,12 @@ static nxt_conf_map_t nxt_python_app_conf[] = {
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, u.python.module),
},
+
+ {
+ nxt_string("callable"),
+ NXT_CONF_MAP_CSTRZ,
+ offsetof(nxt_common_app_conf_t, u.python.callable),
+ },
};
@@ -878,11 +884,9 @@ nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid)
return;
}
-#if (NXT_HAVE_ISOLATION_ROOTFS)
- if (process->isolation.rootfs != NULL && process->isolation.mounts) {
- (void) nxt_process_unmount_all(task, process);
+ if (process->isolation.cleanup != NULL) {
+ process->isolation.cleanup(task, process);
}
-#endif
name = process->name;
stream = process->stream;
@@ -1292,6 +1296,8 @@ nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
goto fail;
}
+ mnt->builtin = 1;
+
ret = nxt_conf_map_object(rt->mem_pool, value,
nxt_app_lang_mounts_map,
nxt_nitems(nxt_app_lang_mounts_map), mnt);
diff --git a/src/nxt_malloc.c b/src/nxt_malloc.c
index 910ef7cd..fed58e96 100644
--- a/src/nxt_malloc.c
+++ b/src/nxt_malloc.c
@@ -81,6 +81,22 @@ nxt_realloc(void *p, size_t size)
}
+/* nxt_lvlhsh_* functions moved here to avoid references from nxt_lvlhsh.c. */
+
+void *
+nxt_lvlhsh_alloc(void *data, size_t size)
+{
+ return nxt_memalign(size, size);
+}
+
+
+void
+nxt_lvlhsh_free(void *data, void *p)
+{
+ nxt_free(p);
+}
+
+
#if (NXT_DEBUG)
void
diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c
index bc8341f4..234ceef8 100644
--- a/src/nxt_php_sapi.c
+++ b/src/nxt_php_sapi.c
@@ -77,13 +77,20 @@ typedef void (*zif_handler)(INTERNAL_FUNCTION_PARAMETERS);
#endif
+static nxt_int_t nxt_php_setup(nxt_task_t *task, nxt_process_t *process,
+ nxt_common_app_conf_t *conf);
static nxt_int_t nxt_php_start(nxt_task_t *task, nxt_process_data_t *data);
static nxt_int_t nxt_php_set_target(nxt_task_t *task, nxt_php_target_t *target,
nxt_conf_value_t *conf);
+static nxt_int_t nxt_php_set_ini_path(nxt_task_t *task, nxt_str_t *path,
+ char *workdir);
static void nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options,
int type);
static nxt_int_t nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value,
int type);
+#ifdef NXT_PHP8
+static void nxt_php_disable_functions(nxt_str_t *str);
+#endif
static void nxt_php_disable(nxt_task_t *task, const char *type,
nxt_str_t *value, char **ptr, nxt_php_disable_t disable);
@@ -252,7 +259,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = {
PHP_VERSION,
NULL,
0,
- NULL,
+ nxt_php_setup,
nxt_php_start,
};
@@ -267,55 +274,20 @@ static void ***tsrm_ls;
static nxt_int_t
-nxt_php_start(nxt_task_t *task, nxt_process_data_t *data)
+nxt_php_setup(nxt_task_t *task, nxt_process_t *process,
+ nxt_common_app_conf_t *conf)
{
- u_char *p;
- uint32_t next;
- nxt_str_t ini_path, name;
- nxt_int_t ret;
- nxt_uint_t n;
- nxt_unit_ctx_t *unit_ctx;
- nxt_unit_init_t php_init;
- nxt_conf_value_t *value;
- nxt_php_app_conf_t *c;
- nxt_common_app_conf_t *conf;
+ nxt_str_t ini_path;
+ nxt_int_t ret;
+ nxt_conf_value_t *value;
+ nxt_php_app_conf_t *c;
static nxt_str_t file_str = nxt_string("file");
static nxt_str_t user_str = nxt_string("user");
static nxt_str_t admin_str = nxt_string("admin");
- conf = data->app;
c = &conf->u.php;
- n = (c->targets != NULL) ? nxt_conf_object_members_count(c->targets) : 1;
-
- nxt_php_targets = nxt_zalloc(sizeof(nxt_php_target_t) * n);
- if (nxt_slow_path(nxt_php_targets == NULL)) {
- return NXT_ERROR;
- }
-
- if (c->targets != NULL) {
- next = 0;
-
- for (n = 0; /* void */; n++) {
- value = nxt_conf_next_object_member(c->targets, &name, &next);
- if (value == NULL) {
- break;
- }
-
- ret = nxt_php_set_target(task, &nxt_php_targets[n], value);
- if (nxt_slow_path(ret != NXT_OK)) {
- return NXT_ERROR;
- }
- }
-
- } else {
- ret = nxt_php_set_target(task, &nxt_php_targets[0], conf->self);
- if (nxt_slow_path(ret != NXT_OK)) {
- return NXT_ERROR;
- }
- }
-
#ifdef ZTS
#if PHP_VERSION_ID >= 70400
@@ -345,15 +317,12 @@ nxt_php_start(nxt_task_t *task, nxt_process_data_t *data)
if (value != NULL) {
nxt_conf_get_string(value, &ini_path);
- p = nxt_malloc(ini_path.length + 1);
- if (nxt_slow_path(p == NULL)) {
+ ret = nxt_php_set_ini_path(task, &ini_path,
+ conf->working_directory);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
-
- nxt_php_sapi_module.php_ini_path_override = (char *) p;
-
- p = nxt_cpymem(p, ini_path.start, ini_path.length);
- *p = '\0';
}
}
@@ -370,6 +339,55 @@ nxt_php_start(nxt_task_t *task, nxt_process_data_t *data)
nxt_php_set_options(task, value, ZEND_INI_USER);
}
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_php_start(nxt_task_t *task, nxt_process_data_t *data)
+{
+ uint32_t next;
+ nxt_int_t ret;
+ nxt_str_t name;
+ nxt_uint_t n;
+ nxt_unit_ctx_t *unit_ctx;
+ nxt_unit_init_t php_init;
+ nxt_conf_value_t *value;
+ nxt_php_app_conf_t *c;
+ nxt_common_app_conf_t *conf;
+
+ conf = data->app;
+ c = &conf->u.php;
+
+ n = (c->targets != NULL) ? nxt_conf_object_members_count(c->targets) : 1;
+
+ nxt_php_targets = nxt_zalloc(sizeof(nxt_php_target_t) * n);
+ if (nxt_slow_path(nxt_php_targets == NULL)) {
+ return NXT_ERROR;
+ }
+
+ if (c->targets != NULL) {
+ next = 0;
+
+ for (n = 0; /* void */; n++) {
+ value = nxt_conf_next_object_member(c->targets, &name, &next);
+ if (value == NULL) {
+ break;
+ }
+
+ ret = nxt_php_set_target(task, &nxt_php_targets[n], value);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+ }
+
+ } else {
+ ret = nxt_php_set_target(task, &nxt_php_targets[0], conf->self);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+ }
+
ret = nxt_unit_default_init(task, &php_init);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_alert(task, "nxt_unit_default_init() failed");
@@ -386,9 +404,8 @@ nxt_php_start(nxt_task_t *task, nxt_process_data_t *data)
nxt_php_unit_ctx = unit_ctx;
- nxt_unit_run(unit_ctx);
-
- nxt_unit_done(unit_ctx);
+ nxt_unit_run(nxt_php_unit_ctx);
+ nxt_unit_done(nxt_php_unit_ctx);
exit(0);
@@ -510,6 +527,46 @@ nxt_php_set_target(nxt_task_t *task, nxt_php_target_t *target,
}
+static nxt_int_t
+nxt_php_set_ini_path(nxt_task_t *task, nxt_str_t *ini_path, char *workdir)
+{
+ size_t wdlen;
+ u_char *p, *start;
+
+ if (ini_path->start[0] == '/' || workdir == NULL) {
+ p = nxt_malloc(ini_path->length + 1);
+ if (nxt_slow_path(p == NULL)) {
+ return NXT_ERROR;
+ }
+
+ start = p;
+
+ } else {
+ wdlen = nxt_strlen(workdir);
+
+ p = nxt_malloc(wdlen + ini_path->length + 2);
+ if (nxt_slow_path(p == NULL)) {
+ return NXT_ERROR;
+ }
+
+ start = p;
+
+ p = nxt_cpymem(p, workdir, wdlen);
+
+ if (workdir[wdlen - 1] != '/') {
+ *p++ = '/';
+ }
+ }
+
+ p = nxt_cpymem(p, ini_path->start, ini_path->length);
+ *p = '\0';
+
+ nxt_php_sapi_module.php_ini_path_override = (char *) start;
+
+ return NXT_OK;
+}
+
+
static void
nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, int type)
{
@@ -535,9 +592,13 @@ nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, int type)
}
if (nxt_str_eq(&name, "disable_functions", 17)) {
+#ifdef NXT_PHP8
+ nxt_php_disable_functions(&value);
+#else
nxt_php_disable(task, "function", &value,
&PG(disable_functions),
zend_disable_function);
+#endif
continue;
}
@@ -626,6 +687,29 @@ nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, int type)
#endif
+#ifdef NXT_PHP8
+
+static void
+nxt_php_disable_functions(nxt_str_t *str)
+{
+ char *p;
+
+ p = nxt_malloc(str->length + 1);
+ if (nxt_slow_path(p == NULL)) {
+ return;
+ }
+
+ nxt_memcpy(p, str->start, str->length);
+ p[str->length] = '\0';
+
+ zend_disable_functions(p);
+
+ nxt_free(p);
+}
+
+#endif
+
+
static void
nxt_php_disable(nxt_task_t *task, const char *type, nxt_str_t *value,
char **ptr, nxt_php_disable_t disable)
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 9bfae395..9be7974f 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -17,10 +17,6 @@
#include <sys/prctl.h>
#endif
-#if (NXT_HAVE_PIVOT_ROOT)
-#include <mntent.h>
-#endif
-
static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_process_child_fixup(nxt_task_t *task,
nxt_process_t *process);
@@ -33,16 +29,6 @@ static void nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg,
static void nxt_process_created_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
-#if (NXT_HAVE_ISOLATION_ROOTFS)
-static nxt_int_t nxt_process_chroot(nxt_task_t *task, const char *path);
-
-#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS)
-static nxt_int_t nxt_process_pivot_root(nxt_task_t *task, const char *rootfs);
-static nxt_int_t nxt_process_private_mount(nxt_task_t *task,
- const char *rootfs);
-static int nxt_pivot_root(const char *new_root, const char *old_root);
-#endif
-#endif
/* A cached process pid. */
nxt_pid_t nxt_pid;
@@ -398,51 +384,6 @@ nxt_process_core_setup(nxt_task_t *task, nxt_process_t *process)
}
-#if (NXT_HAVE_CLONE_NEWUSER)
-
-nxt_int_t
-nxt_process_vldt_isolation_creds(nxt_task_t *task, nxt_process_t *process)
-{
- nxt_int_t ret;
- nxt_clone_t *clone;
- nxt_credential_t *creds;
-
- clone = &process->isolation.clone;
- creds = process->user_cred;
-
- if (clone->uidmap.size == 0 && clone->gidmap.size == 0) {
- return NXT_OK;
- }
-
- if (!nxt_is_clone_flag_set(clone->flags, NEWUSER)) {
- if (nxt_slow_path(clone->uidmap.size > 0)) {
- nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but "
- "\"isolation.namespaces.credential\" is false or unset");
-
- return NXT_ERROR;
- }
-
- if (nxt_slow_path(clone->gidmap.size > 0)) {
- nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but "
- "\"isolation.namespaces.credential\" is false or unset");
-
- return NXT_ERROR;
- }
-
- return NXT_OK;
- }
-
- ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, creds);
- if (nxt_slow_path(ret != NXT_OK)) {
- return NXT_ERROR;
- }
-
- return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, creds);
-}
-
-#endif
-
-
nxt_int_t
nxt_process_creds_set(nxt_task_t *task, nxt_process_t *process, nxt_str_t *user,
nxt_str_t *group)
@@ -525,329 +466,6 @@ nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process)
}
-#if (NXT_HAVE_ISOLATION_ROOTFS)
-
-
-#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS)
-
-
-nxt_int_t
-nxt_process_change_root(nxt_task_t *task, nxt_process_t *process)
-{
- char *rootfs;
- nxt_int_t ret;
-
- rootfs = (char *) process->isolation.rootfs;
-
- nxt_debug(task, "change root: %s", rootfs);
-
- if (NXT_CLONE_MNT(process->isolation.clone.flags)) {
- ret = nxt_process_pivot_root(task, rootfs);
- } else {
- ret = nxt_process_chroot(task, rootfs);
- }
-
- if (nxt_fast_path(ret == NXT_OK)) {
- if (nxt_slow_path(chdir("/") < 0)) {
- nxt_alert(task, "chdir(\"/\") %E", nxt_errno);
- return NXT_ERROR;
- }
- }
-
- return ret;
-}
-
-
-#else
-
-
-nxt_int_t
-nxt_process_change_root(nxt_task_t *task, nxt_process_t *process)
-{
- char *rootfs;
-
- rootfs = (char *) process->isolation.rootfs;
-
- nxt_debug(task, "change root: %s", rootfs);
-
- if (nxt_fast_path(nxt_process_chroot(task, rootfs) == NXT_OK)) {
- if (nxt_slow_path(chdir("/") < 0)) {
- nxt_alert(task, "chdir(\"/\") %E", nxt_errno);
- return NXT_ERROR;
- }
-
- return NXT_OK;
- }
-
- return NXT_ERROR;
-}
-
-
-#endif
-
-
-static nxt_int_t
-nxt_process_chroot(nxt_task_t *task, const char *path)
-{
- if (nxt_slow_path(chroot(path) < 0)) {
- nxt_alert(task, "chroot(%s) %E", path, nxt_errno);
- return NXT_ERROR;
- }
-
- return NXT_OK;
-}
-
-
-void
-nxt_process_unmount_all(nxt_task_t *task, nxt_process_t *process)
-{
- size_t i, n;
- nxt_array_t *mounts;
- nxt_fs_mount_t *mnt;
-
- nxt_debug(task, "unmount all (%s)", process->name);
-
- mounts = process->isolation.mounts;
- n = mounts->nelts;
- mnt = mounts->elts;
-
- for (i = 0; i < n; i++) {
- nxt_fs_unmount(mnt[i].dst);
- }
-}
-
-
-#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS)
-
-/*
- * pivot_root(2) can only be safely used with containers, otherwise it can
- * umount(2) the global root filesystem and screw up the machine.
- */
-
-static nxt_int_t
-nxt_process_pivot_root(nxt_task_t *task, const char *path)
-{
- /*
- * This implementation makes use of a kernel trick that works for ages
- * and now documented in Linux kernel 5.
- * https://lore.kernel.org/linux-man/87r24piwhm.fsf@x220.int.ebiederm.org/T/
- */
-
- if (nxt_slow_path(mount("", "/", "", MS_SLAVE|MS_REC, "") != 0)) {
- nxt_alert(task, "failed to make / a slave mount %E", nxt_errno);
- return NXT_ERROR;
- }
-
- if (nxt_slow_path(nxt_process_private_mount(task, path) != NXT_OK)) {
- return NXT_ERROR;
- }
-
- if (nxt_slow_path(mount(path, path, "bind", MS_BIND|MS_REC, "") != 0)) {
- nxt_alert(task, "error bind mounting rootfs %E", nxt_errno);
- return NXT_ERROR;
- }
-
- if (nxt_slow_path(chdir(path) != 0)) {
- nxt_alert(task, "failed to chdir(%s) %E", path, nxt_errno);
- return NXT_ERROR;
- }
-
- if (nxt_slow_path(nxt_pivot_root(".", ".") != 0)) {
- nxt_alert(task, "failed to pivot_root %E", nxt_errno);
- return NXT_ERROR;
- }
-
- /*
- * Make oldroot a slave mount to avoid unmounts getting propagated to the
- * host.
- */
- if (nxt_slow_path(mount("", ".", "", MS_SLAVE | MS_REC, NULL) != 0)) {
- nxt_alert(task, "failed to bind mount rootfs %E", nxt_errno);
- return NXT_ERROR;
- }
-
- if (nxt_slow_path(umount2(".", MNT_DETACH) != 0)) {
- nxt_alert(task, "failed to umount old root directory %E", nxt_errno);
- return NXT_ERROR;
- }
-
- return NXT_OK;
-}
-
-
-static nxt_int_t
-nxt_process_private_mount(nxt_task_t *task, const char *rootfs)
-{
- char *parent_mnt;
- FILE *procfile;
- u_char **mounts;
- size_t len;
- uint8_t *shared;
- nxt_int_t ret, index, nmounts;
- struct mntent *ent;
-
- static const char *mount_path = "/proc/self/mounts";
-
- ret = NXT_ERROR;
- ent = NULL;
- shared = NULL;
- procfile = NULL;
- parent_mnt = NULL;
-
- nmounts = 256;
-
- mounts = nxt_malloc(nmounts * sizeof(uintptr_t));
- if (nxt_slow_path(mounts == NULL)) {
- goto fail;
- }
-
- shared = nxt_malloc(nmounts);
- if (nxt_slow_path(shared == NULL)) {
- goto fail;
- }
-
- procfile = setmntent(mount_path, "r");
- if (nxt_slow_path(procfile == NULL)) {
- nxt_alert(task, "failed to open %s %E", mount_path, nxt_errno);
-
- goto fail;
- }
-
- index = 0;
-
-again:
-
- for ( ; index < nmounts; index++) {
- ent = getmntent(procfile);
- if (ent == NULL) {
- nmounts = index;
- break;
- }
-
- mounts[index] = (u_char *) strdup(ent->mnt_dir);
- shared[index] = hasmntopt(ent, "shared") != NULL;
- }
-
- if (ent != NULL) {
- /* there are still entries to be read */
-
- nmounts *= 2;
- mounts = nxt_realloc(mounts, nmounts);
- if (nxt_slow_path(mounts == NULL)) {
- goto fail;
- }
-
- shared = nxt_realloc(shared, nmounts);
- if (nxt_slow_path(shared == NULL)) {
- goto fail;
- }
-
- goto again;
- }
-
- for (index = 0; index < nmounts; index++) {
- if (nxt_strcmp(mounts[index], rootfs) == 0) {
- parent_mnt = (char *) rootfs;
- break;
- }
- }
-
- if (parent_mnt == NULL) {
- len = nxt_strlen(rootfs);
-
- parent_mnt = nxt_malloc(len + 1);
- if (parent_mnt == NULL) {
- goto fail;
- }
-
- nxt_memcpy(parent_mnt, rootfs, len);
- parent_mnt[len] = '\0';
-
- if (parent_mnt[len - 1] == '/') {
- parent_mnt[len - 1] = '\0';
- len--;
- }
-
- for ( ;; ) {
- for (index = 0; index < nmounts; index++) {
- if (nxt_strcmp(mounts[index], parent_mnt) == 0) {
- goto found;
- }
- }
-
- if (len == 1 && parent_mnt[0] == '/') {
- nxt_alert(task, "parent mount not found");
- goto fail;
- }
-
- /* parent dir */
- while (parent_mnt[len - 1] != '/' && len > 0) {
- len--;
- }
-
- if (nxt_slow_path(len == 0)) {
- nxt_alert(task, "parent mount not found");
- goto fail;
- }
-
- if (len == 1) {
- parent_mnt[len] = '\0'; /* / */
- } else {
- parent_mnt[len - 1] = '\0'; /* /<path> */
- }
- }
- }
-
-found:
-
- if (shared[index]) {
- if (nxt_slow_path(mount("", parent_mnt, "", MS_PRIVATE, "") != 0)) {
- nxt_alert(task, "mount(\"\", \"%s\", MS_PRIVATE) %E", parent_mnt,
- nxt_errno);
-
- goto fail;
- }
- }
-
- ret = NXT_OK;
-
-fail:
-
- if (procfile != NULL) {
- endmntent(procfile);
- }
-
- if (mounts != NULL) {
- for (index = 0; index < nmounts; index++) {
- nxt_free(mounts[index]);
- }
-
- nxt_free(mounts);
- }
-
- if (shared != NULL) {
- nxt_free(shared);
- }
-
- if (parent_mnt != NULL && parent_mnt != rootfs) {
- nxt_free(parent_mnt);
- }
-
- return ret;
-}
-
-
-static int
-nxt_pivot_root(const char *new_root, const char *old_root)
-{
- return syscall(__NR_pivot_root, new_root, old_root);
-}
-
-#endif
-
-#endif
-
-
static nxt_int_t
nxt_process_send_ready(nxt_task_t *task, nxt_process_t *process)
{
diff --git a/src/nxt_process.h b/src/nxt_process.h
index ecd813e2..d9b4dff1 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -60,6 +60,9 @@ typedef enum {
typedef struct nxt_port_mmap_s nxt_port_mmap_t;
+typedef struct nxt_process_s nxt_process_t;
+typedef void (*nxt_isolation_cleanup_t)(nxt_task_t *task,
+ nxt_process_t *process);
typedef struct {
@@ -69,21 +72,30 @@ typedef struct {
nxt_port_mmap_t *elts;
} nxt_port_mmaps_t;
+
typedef struct {
- u_char *rootfs;
- nxt_array_t *mounts; /* of nxt_mount_t */
+ uint8_t language_deps; /* 1-byte */
+} nxt_process_automount_t;
+
+
+typedef struct {
+ u_char *rootfs;
+ nxt_process_automount_t automount;
+ nxt_array_t *mounts; /* of nxt_mount_t */
+
+ nxt_isolation_cleanup_t cleanup;
#if (NXT_HAVE_CLONE)
- nxt_clone_t clone;
+ nxt_clone_t clone;
#endif
#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
- uint8_t new_privs; /* 1 bit */
+ uint8_t new_privs; /* 1 bit */
#endif
} nxt_process_isolation_t;
-typedef struct {
+struct nxt_process_s {
nxt_pid_t pid;
const char *name;
nxt_queue_t ports; /* of nxt_port_t */
@@ -103,7 +115,7 @@ typedef struct {
nxt_process_data_t data;
nxt_process_isolation_t isolation;
-} nxt_process_t;
+};
typedef nxt_int_t (*nxt_process_prefork_t)(nxt_task_t *task,
@@ -178,17 +190,6 @@ nxt_int_t nxt_process_creds_set(nxt_task_t *task, nxt_process_t *process,
nxt_str_t *user, nxt_str_t *group);
nxt_int_t nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process);
-#if (NXT_HAVE_CLONE_NEWUSER)
-nxt_int_t nxt_process_vldt_isolation_creds(nxt_task_t *task,
- nxt_process_t *process);
-#endif
-
-nxt_int_t nxt_process_change_root(nxt_task_t *task, nxt_process_t *process);
-
-#if (NXT_HAVE_ISOLATION_ROOTFS)
-void nxt_process_unmount_all(nxt_task_t *task, nxt_process_t *process);
-#endif
-
#if (NXT_HAVE_SETPROCTITLE)
#define nxt_process_title(task, fmt, ...) \
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 0e1de6fa..a3218047 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -24,7 +24,6 @@ typedef struct {
uint32_t max_processes;
uint32_t spare_processes;
nxt_msec_t timeout;
- nxt_msec_t res_timeout;
nxt_msec_t idle_timeout;
uint32_t requests;
nxt_conf_value_t *limits_value;
@@ -260,7 +259,7 @@ static const nxt_str_t empty_prefix = nxt_string("");
static const nxt_str_t *nxt_app_msg_prefix[] = {
&empty_prefix,
- &http_prefix,
+ &empty_prefix,
&http_prefix,
&http_prefix,
&http_prefix,
@@ -1158,12 +1157,6 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = {
},
{
- nxt_string("reschedule_timeout"),
- NXT_CONF_MAP_MSEC,
- offsetof(nxt_router_app_conf_t, res_timeout),
- },
-
- {
nxt_string("requests"),
NXT_CONF_MAP_INT32,
offsetof(nxt_router_app_conf_t, requests),
@@ -1423,7 +1416,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
apcf.max_processes = 1;
apcf.spare_processes = 0;
apcf.timeout = 0;
- apcf.res_timeout = 1000;
apcf.idle_timeout = 15000;
apcf.requests = 0;
apcf.limits_value = NULL;
@@ -1505,8 +1497,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application processes: %D", apcf.processes);
nxt_debug(task, "application request timeout: %M", apcf.timeout);
- nxt_debug(task, "application reschedule timeout: %M",
- apcf.res_timeout);
nxt_debug(task, "application requests: %D", apcf.requests);
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
@@ -1537,7 +1527,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->max_pending_processes = apcf.spare_processes
? apcf.spare_processes : 1;
app->timeout = apcf.timeout;
- app->res_timeout = apcf.res_timeout * 1000000;
app->idle_timeout = apcf.idle_timeout;
app->max_requests = apcf.requests;
@@ -1736,6 +1725,10 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->router_conf,
&lscf.application);
}
+
+ if (nxt_slow_path(skcf->action == NULL)) {
+ goto fail;
+ }
}
}
@@ -4948,6 +4941,9 @@ nxt_router_app_prepare_request(nxt_task_t *task,
nxt_debug(task, "queue is not empty");
}
+ buf->is_port_mmap_sent = 1;
+ buf->mem.pos = buf->mem.free;
+
} else {
nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
req_rpc_data->stream, &app->name);
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 81b3538c..512f1810 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -126,7 +126,6 @@ struct nxt_app_s {
uint32_t max_requests;
nxt_msec_t timeout;
- nxt_nsec_t res_timeout;
nxt_msec_t idle_timeout;
nxt_str_t *targets;
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 435276a0..44970b34 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -84,7 +84,11 @@ nxt_runtime_create(nxt_task_t *task)
lang->version = (u_char *) "";
lang->file = NULL;
lang->module = &nxt_external_module;
- lang->mounts = NULL;
+
+ lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t));
+ if (nxt_slow_path(lang->mounts == NULL)) {
+ goto fail;
+ }
listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
if (nxt_slow_path(listen_sockets == NULL)) {
diff --git a/src/nxt_string.h b/src/nxt_string.h
index 3f9192e2..7e02f59a 100644
--- a/src/nxt_string.h
+++ b/src/nxt_string.h
@@ -31,6 +31,16 @@ nxt_strlen(s) \
#define \
+nxt_strdup(s) \
+ strdup((char *) s)
+
+
+#define \
+nxt_strchr(buf, delim) \
+ (u_char *) strchr((char *) buf, delim)
+
+
+#define \
nxt_memzero(buf, length) \
(void) memset(buf, 0, length)
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 6b7d631d..f75d61bc 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -74,7 +74,8 @@ static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
-static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
+static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
+ nxt_unit_websocket_frame_impl_t *ws);
static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
@@ -119,8 +120,7 @@ static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
-static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
- pid_t pid);
+static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
@@ -184,6 +184,9 @@ static nxt_unit_request_info_t *nxt_unit_request_hash_find(
nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
+static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
+static void nxt_unit_lvlhsh_free(void *data, void *p);
+static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
struct nxt_unit_mmap_buf_s {
@@ -531,7 +534,8 @@ nxt_unit_create(nxt_unit_init_t *init)
nxt_unit_impl_t *lib;
nxt_unit_callbacks_t *cb;
- lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size);
+ lib = nxt_unit_malloc(NULL,
+ sizeof(nxt_unit_impl_t) + init->request_data_size);
if (nxt_slow_path(lib == NULL)) {
nxt_unit_alert(NULL, "failed to allocate unit struct");
@@ -586,7 +590,7 @@ nxt_unit_create(nxt_unit_init_t *init)
fail:
- free(lib);
+ nxt_unit_free(NULL, lib);
return NULL;
}
@@ -710,7 +714,7 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib)
nxt_unit_mmaps_destroy(&lib->incoming);
nxt_unit_mmaps_destroy(&lib->outgoing);
- free(lib);
+ nxt_unit_free(NULL, lib);
}
}
@@ -1388,7 +1392,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
return NXT_UNIT_AGAIN;
}
- port_impl = malloc(sizeof(nxt_unit_port_impl_t));
+ port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
if (nxt_slow_path(port_impl == NULL)) {
nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
(int) sizeof(nxt_unit_port_impl_t));
@@ -1412,7 +1416,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
pthread_mutex_unlock(&lib->mutex);
- free(port);
+ nxt_unit_free(ctx, port);
return NXT_UNIT_ERROR;
}
@@ -1426,7 +1430,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
pthread_mutex_unlock(&lib->mutex);
- free(port);
+ nxt_unit_free(ctx, port);
return NXT_UNIT_ERROR;
}
@@ -1634,8 +1638,8 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
if (nxt_queue_is_empty(&ctx_impl->free_req)) {
pthread_mutex_unlock(&ctx_impl->mutex);
- req_impl = malloc(sizeof(nxt_unit_request_info_impl_t)
- + lib->request_data_size);
+ req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
+ + lib->request_data_size);
if (nxt_slow_path(req_impl == NULL)) {
return NULL;
}
@@ -1722,7 +1726,7 @@ nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
nxt_queue_remove(&req_impl->link);
if (req_impl != &ctx_impl->req) {
- free(req_impl);
+ nxt_unit_free(&ctx_impl->ctx, req_impl);
}
}
@@ -1741,7 +1745,7 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
pthread_mutex_unlock(&ctx_impl->mutex);
- ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
+ ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
if (nxt_slow_path(ws_impl == NULL)) {
return NULL;
}
@@ -1783,11 +1787,12 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
static void
-nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
+nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
+ nxt_unit_websocket_frame_impl_t *ws_impl)
{
nxt_queue_remove(&ws_impl->link);
- free(ws_impl);
+ nxt_unit_free(ctx, ws_impl);
}
@@ -1815,42 +1820,66 @@ nxt_unit_field_hash(const char *name, size_t name_length)
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
+ char *name;
uint32_t i, j;
nxt_unit_field_t *fields, f;
nxt_unit_request_t *r;
+ static nxt_str_t content_length = nxt_string("content-length");
+ static nxt_str_t content_type = nxt_string("content-type");
+ static nxt_str_t cookie = nxt_string("cookie");
+
nxt_unit_req_debug(req, "group_dup_fields");
r = req->request;
fields = r->fields;
for (i = 0; i < r->fields_count; i++) {
+ name = nxt_unit_sptr_get(&fields[i].name);
switch (fields[i].hash) {
case NXT_UNIT_HASH_CONTENT_LENGTH:
- r->content_length_field = i;
+ if (fields[i].name_length == content_length.length
+ && nxt_unit_memcasecmp(name, content_length.start,
+ content_length.length) == 0)
+ {
+ r->content_length_field = i;
+ }
+
break;
case NXT_UNIT_HASH_CONTENT_TYPE:
- r->content_type_field = i;
+ if (fields[i].name_length == content_type.length
+ && nxt_unit_memcasecmp(name, content_type.start,
+ content_type.length) == 0)
+ {
+ r->content_type_field = i;
+ }
+
break;
case NXT_UNIT_HASH_COOKIE:
- r->cookie_field = i;
+ if (fields[i].name_length == cookie.length
+ && nxt_unit_memcasecmp(name, cookie.start,
+ cookie.length) == 0)
+ {
+ r->cookie_field = i;
+ }
+
break;
- };
+ }
for (j = i + 1; j < r->fields_count; j++) {
- if (fields[i].hash != fields[j].hash) {
- continue;
- }
-
- if (j == i + 1) {
+ if (fields[i].hash != fields[j].hash
+ || fields[i].name_length != fields[j].name_length
+ || nxt_unit_memcasecmp(name,
+ nxt_unit_sptr_get(&fields[j].name),
+ fields[j].name_length) != 0)
+ {
continue;
}
f = fields[j];
- f.name.offset += (j - (i + 1)) * sizeof(f);
f.value.offset += (j - (i + 1)) * sizeof(f);
while (j > i + 1) {
@@ -1862,6 +1891,9 @@ nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
fields[j] = f;
+ /* Assign the same name pointer for further grouping simplicity. */
+ nxt_unit_sptr_set(&fields[j].name, name);
+
i++;
}
}
@@ -2297,7 +2329,7 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
if (ctx_impl->free_buf == NULL) {
pthread_mutex_unlock(&ctx_impl->mutex);
- mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
+ mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
if (nxt_slow_path(mmap_buf == NULL)) {
return NULL;
}
@@ -2615,7 +2647,7 @@ nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
}
if (mmap_buf->free_ptr != NULL) {
- free(mmap_buf->free_ptr);
+ nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
mmap_buf->free_ptr = NULL;
}
@@ -2657,7 +2689,7 @@ nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
return rbuf;
}
- rbuf = malloc(sizeof(nxt_unit_read_buf_t));
+ rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
if (nxt_fast_path(rbuf != NULL)) {
rbuf->ctx_impl = ctx_impl;
@@ -3016,7 +3048,7 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
return NULL;
}
- mmap_buf->free_ptr = malloc(size);
+ mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
nxt_unit_mmap_buf_release(mmap_buf);
@@ -3288,7 +3320,7 @@ int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
char *b;
- size_t size;
+ size_t size, hsize;
nxt_unit_websocket_frame_impl_t *ws_impl;
ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
@@ -3299,19 +3331,30 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
- b = malloc(size);
+ b = nxt_unit_malloc(ws->req->ctx, size);
if (nxt_slow_path(b == NULL)) {
return NXT_UNIT_ERROR;
}
memcpy(b, ws_impl->buf->buf.start, size);
+ hsize = nxt_websocket_frame_header_size(b);
+
ws_impl->buf->buf.start = b;
- ws_impl->buf->buf.free = b;
+ ws_impl->buf->buf.free = b + hsize;
ws_impl->buf->buf.end = b + size;
ws_impl->buf->free_ptr = b;
+ ws_impl->ws.header = (nxt_websocket_header_t *) b;
+
+ if (ws_impl->ws.header->mask) {
+ ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
+
+ } else {
+ ws_impl->ws.mask = NULL;
+ }
+
return NXT_UNIT_OK;
}
@@ -3796,7 +3839,8 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
mmap_buf->plain_ptr = local_buf;
} else {
- mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
+ mmap_buf->free_ptr = nxt_unit_malloc(ctx,
+ size + sizeof(nxt_port_msg_t));
if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
return NXT_UNIT_ERROR;
}
@@ -3966,7 +4010,7 @@ nxt_unit_process_release(nxt_unit_process_t *process)
if (c == 1) {
nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
- free(process);
+ nxt_unit_free(NULL, process);
}
}
@@ -3983,7 +4027,7 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
munmap(mm->hdr, PORT_MMAP_SIZE);
}
- free(mmaps->elts);
+ nxt_unit_free(NULL, mmaps->elts);
}
pthread_mutex_destroy(&mmaps->mutex);
@@ -4255,8 +4299,8 @@ nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_unit_lvlhsh_pid_test,
- nxt_lvlhsh_alloc,
- nxt_lvlhsh_free,
+ nxt_unit_lvlhsh_alloc,
+ nxt_unit_lvlhsh_free,
};
@@ -4271,11 +4315,14 @@ nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
static nxt_unit_process_t *
-nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
+nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
{
+ nxt_unit_impl_t *lib;
nxt_unit_process_t *process;
nxt_lvlhsh_query_t lhq;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
nxt_unit_process_lhq_pid(&lhq, &pid);
if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
@@ -4285,9 +4332,9 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
return process;
}
- process = malloc(sizeof(nxt_unit_process_t));
+ process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
if (nxt_slow_path(process == NULL)) {
- nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid);
+ nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
return NULL;
}
@@ -4308,9 +4355,9 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid)
break;
default:
- nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
+ nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
- free(process);
+ nxt_unit_free(ctx, process);
process = NULL;
break;
}
@@ -4881,7 +4928,8 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
- new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
+ new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
+ + lib->request_data_size);
if (nxt_slow_path(new_ctx == NULL)) {
nxt_unit_alert(ctx, "failed to allocate context");
@@ -4890,7 +4938,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
rc = nxt_unit_ctx_init(lib, new_ctx, data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- free(new_ctx);
+ nxt_unit_free(ctx, new_ctx);
return NULL;
}
@@ -4969,7 +5017,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
while (ctx_impl->free_buf != NULL) {
mmap_buf = ctx_impl->free_buf;
nxt_unit_mmap_buf_unlink(mmap_buf);
- free(mmap_buf);
+ nxt_unit_free(&ctx_impl->ctx, mmap_buf);
}
nxt_queue_each(req_impl, &ctx_impl->free_req,
@@ -4982,7 +5030,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
nxt_queue_each(ws_impl, &ctx_impl->free_ws,
nxt_unit_websocket_frame_impl_t, link)
{
- nxt_unit_websocket_frame_free(ws_impl);
+ nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
} nxt_queue_loop;
@@ -4996,7 +5044,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
}
if (ctx_impl != &lib->main_ctx) {
- free(ctx_impl);
+ nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
}
nxt_unit_lib_release(lib);
@@ -5048,7 +5096,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx)
pthread_mutex_lock(&lib->mutex);
- process = nxt_unit_process_get(lib, lib->pid);
+ process = nxt_unit_process_get(ctx, lib->pid);
if (nxt_slow_path(process == NULL)) {
pthread_mutex_unlock(&lib->mutex);
@@ -5181,7 +5229,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
: sizeof(nxt_port_queue_t));
}
- free(port_impl);
+ nxt_unit_free(NULL, port_impl);
}
}
@@ -5288,7 +5336,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
port->id.pid, port->id.id,
port->in_fd, port->out_fd, queue);
- process = nxt_unit_process_get(lib, port->id.pid);
+ process = nxt_unit_process_get(ctx, port->id.pid);
if (nxt_slow_path(process == NULL)) {
goto unlock;
}
@@ -5297,7 +5345,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
process->next_port_id = port->id.id + 1;
}
- new_port = malloc(sizeof(nxt_unit_port_impl_t));
+ new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
if (nxt_slow_path(new_port == NULL)) {
nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
port->id.pid, port->id.id);
@@ -5312,7 +5360,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
port->id.pid, port->id.id);
- free(new_port);
+ nxt_unit_free(ctx, new_port);
new_port = NULL;
@@ -5716,10 +5764,6 @@ retry:
nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
(int) port->id.pid, (int) port->id.id, (int) rbuf->size);
- if (port_impl->from_socket) {
- nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET");
- }
-
goto retry;
}
@@ -5977,8 +6021,8 @@ nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_unit_port_hash_test,
- nxt_lvlhsh_alloc,
- nxt_lvlhsh_free,
+ nxt_unit_lvlhsh_alloc,
+ nxt_unit_lvlhsh_free,
};
@@ -6076,8 +6120,8 @@ nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_unit_request_hash_test,
- nxt_lvlhsh_alloc,
- nxt_lvlhsh_free,
+ nxt_unit_lvlhsh_alloc,
+ nxt_unit_lvlhsh_free,
};
@@ -6158,7 +6202,9 @@ nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
case NXT_OK:
req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
req);
- req_impl->in_hash = 0;
+ if (remove) {
+ req_impl->in_hash = 0;
+ }
return lhq.value;
@@ -6307,29 +6353,84 @@ nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
}
-/* The function required by nxt_lvlhsh_alloc() and nxt_lvlvhsh_free(). */
-
-void *
-nxt_memalign(size_t alignment, size_t size)
+static void *
+nxt_unit_lvlhsh_alloc(void *data, size_t size)
{
- void *p;
- nxt_err_t err;
+ int err;
+ void *p;
- err = posix_memalign(&p, alignment, size);
+ err = posix_memalign(&p, size, size);
if (nxt_fast_path(err == 0)) {
+ nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
+ (int) size, (int) size, p);
return p;
}
+ nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
+ (int) size, (int) size, strerror(err), err);
return NULL;
}
-#if (NXT_DEBUG)
+
+static void
+nxt_unit_lvlhsh_free(void *data, void *p)
+{
+ nxt_unit_free(NULL, p);
+}
+
+
+void *
+nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
+{
+ void *p;
+
+ p = malloc(size);
+
+ if (nxt_fast_path(p != NULL)) {
+ nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
+
+ } else {
+ nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
+ (int) size, strerror(errno), errno);
+ }
+
+ return p;
+}
+
void
-nxt_free(void *p)
+nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
{
+ nxt_unit_debug(ctx, "free(%p)", p);
+
free(p);
}
-#endif
+
+static int
+nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
+{
+ u_char c1, c2;
+ nxt_int_t n;
+ const u_char *s1, *s2;
+
+ s1 = p1;
+ s2 = p2;
+
+ while (length-- != 0) {
+ c1 = *s1++;
+ c2 = *s2++;
+
+ c1 = nxt_lowcase(c1);
+ c2 = nxt_lowcase(c2);
+
+ n = c1 - c2;
+
+ if (n != 0) {
+ return n;
+ }
+ }
+
+ return 0;
+}
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 67244cf4..e90f0781 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -187,7 +187,7 @@ struct nxt_unit_read_info_s {
/*
* Initialize Unit application library with necessary callbacks and
- * ready/reply port parameters, send 'READY' response to master.
+ * ready/reply port parameters, send 'READY' response to main.
*/
nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
@@ -322,6 +322,10 @@ int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws);
void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws);
+void *nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size);
+
+void nxt_unit_free(nxt_unit_ctx_t *ctx, void *p);
+
#if defined __has_attribute
#if __has_attribute(format)
diff --git a/src/nxt_upstream.c b/src/nxt_upstream.c
index c8ecbbe6..9f81b286 100644
--- a/src/nxt_upstream.c
+++ b/src/nxt_upstream.c
@@ -86,11 +86,11 @@ nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name,
action->u.upstream_number = i;
action->handler = nxt_upstream_handler;
- return NXT_DECLINED;
+ return NXT_OK;
}
}
- return NXT_OK;
+ return NXT_DECLINED;
}
diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c
new file mode 100644
index 00000000..01534a47
--- /dev/null
+++ b/src/python/nxt_python.c
@@ -0,0 +1,340 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <Python.h>
+
+#include <nxt_main.h>
+#include <nxt_router.h>
+#include <nxt_unit.h>
+
+#include <python/nxt_python.h>
+
+#include NXT_PYTHON_MOUNTS_H
+
+
+static nxt_int_t nxt_python_start(nxt_task_t *task,
+ nxt_process_data_t *data);
+static void nxt_python_atexit(void);
+
+static uint32_t compat[] = {
+ NXT_VERNUM, NXT_DEBUG,
+};
+
+
+NXT_EXPORT nxt_app_module_t nxt_app_module = {
+ sizeof(compat),
+ compat,
+ nxt_string("python"),
+ PY_VERSION,
+ nxt_python_mounts,
+ nxt_nitems(nxt_python_mounts),
+ NULL,
+ nxt_python_start,
+};
+
+static PyObject *nxt_py_stderr_flush;
+PyObject *nxt_py_application;
+
+#if PY_MAJOR_VERSION == 3
+static wchar_t *nxt_py_home;
+#else
+static char *nxt_py_home;
+#endif
+
+
+static nxt_int_t
+nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
+{
+ int rc, asgi;
+ char *nxt_py_module;
+ size_t len;
+ PyObject *obj, *pypath, *module;
+ const char *callable;
+ nxt_unit_ctx_t *unit_ctx;
+ nxt_unit_init_t python_init;
+ nxt_common_app_conf_t *app_conf;
+ nxt_python_app_conf_t *c;
+#if PY_MAJOR_VERSION == 3
+ char *path;
+ size_t size;
+ nxt_int_t pep405;
+
+ static const char pyvenv[] = "/pyvenv.cfg";
+ static const char bin_python[] = "/bin/python";
+#endif
+
+ app_conf = data->app;
+ c = &app_conf->u.python;
+
+ if (c->home != NULL) {
+ len = nxt_strlen(c->home);
+
+#if PY_MAJOR_VERSION == 3
+
+ path = nxt_malloc(len + sizeof(pyvenv));
+ if (nxt_slow_path(path == NULL)) {
+ nxt_alert(task, "Failed to allocate memory");
+ return NXT_ERROR;
+ }
+
+ nxt_memcpy(path, c->home, len);
+ nxt_memcpy(path + len, pyvenv, sizeof(pyvenv));
+
+ pep405 = (access(path, R_OK) == 0);
+
+ nxt_free(path);
+
+ if (pep405) {
+ size = (len + sizeof(bin_python)) * sizeof(wchar_t);
+
+ } else {
+ size = (len + 1) * sizeof(wchar_t);
+ }
+
+ nxt_py_home = nxt_malloc(size);
+ if (nxt_slow_path(nxt_py_home == NULL)) {
+ nxt_alert(task, "Failed to allocate memory");
+ return NXT_ERROR;
+ }
+
+ if (pep405) {
+ mbstowcs(nxt_py_home, c->home, len);
+ mbstowcs(nxt_py_home + len, bin_python, sizeof(bin_python));
+ Py_SetProgramName(nxt_py_home);
+
+ } else {
+ mbstowcs(nxt_py_home, c->home, len + 1);
+ Py_SetPythonHome(nxt_py_home);
+ }
+
+#else
+ nxt_py_home = nxt_malloc(len + 1);
+ if (nxt_slow_path(nxt_py_home == NULL)) {
+ nxt_alert(task, "Failed to allocate memory");
+ return NXT_ERROR;
+ }
+
+ nxt_memcpy(nxt_py_home, c->home, len + 1);
+ Py_SetPythonHome(nxt_py_home);
+#endif
+ }
+
+ Py_InitializeEx(0);
+
+ module = NULL;
+ obj = NULL;
+
+ obj = PySys_GetObject((char *) "stderr");
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_alert(task, "Python failed to get \"sys.stderr\" object");
+ goto fail;
+ }
+
+ nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush");
+ if (nxt_slow_path(nxt_py_stderr_flush == NULL)) {
+ nxt_alert(task, "Python failed to get \"flush\" attribute of "
+ "\"sys.stderr\" object");
+ goto fail;
+ }
+
+ /* obj is a Borrowed reference. */
+
+ if (c->path.length > 0) {
+ obj = PyString_FromStringAndSize((char *) c->path.start,
+ c->path.length);
+
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_alert(task, "Python failed to create string object \"%V\"",
+ &c->path);
+ goto fail;
+ }
+
+ pypath = PySys_GetObject((char *) "path");
+
+ if (nxt_slow_path(pypath == NULL)) {
+ nxt_alert(task, "Python failed to get \"sys.path\" list");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) {
+ nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"",
+ &c->path);
+ goto fail;
+ }
+
+ Py_DECREF(obj);
+ }
+
+ obj = Py_BuildValue("[s]", "unit");
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_alert(task, "Python failed to create the \"sys.argv\" list");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PySys_SetObject((char *) "argv", obj) != 0)) {
+ nxt_alert(task, "Python failed to set the \"sys.argv\" list");
+ goto fail;
+ }
+
+ Py_CLEAR(obj);
+
+ nxt_py_module = nxt_alloca(c->module.length + 1);
+ nxt_memcpy(nxt_py_module, c->module.start, c->module.length);
+ nxt_py_module[c->module.length] = '\0';
+
+ module = PyImport_ImportModule(nxt_py_module);
+ if (nxt_slow_path(module == NULL)) {
+ nxt_alert(task, "Python failed to import module \"%s\"", nxt_py_module);
+ nxt_python_print_exception();
+ goto fail;
+ }
+
+ callable = (c->callable != NULL) ? c->callable : "application";
+
+ obj = PyDict_GetItemString(PyModule_GetDict(module), callable);
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_alert(task, "Python failed to get \"%s\" "
+ "from module \"%s\"", callable, nxt_py_module);
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
+ nxt_alert(task, "\"%s\" in module \"%s\" "
+ "is not a callable object", callable, nxt_py_module);
+ goto fail;
+ }
+
+ nxt_py_application = obj;
+ obj = NULL;
+
+ Py_INCREF(nxt_py_application);
+
+ Py_CLEAR(module);
+
+ nxt_unit_default_init(task, &python_init);
+
+ python_init.shm_limit = data->app->shm_limit;
+
+ asgi = nxt_python_asgi_check(nxt_py_application);
+
+ if (asgi) {
+ rc = nxt_python_asgi_init(task, &python_init);
+
+ } else {
+ rc = nxt_python_wsgi_init(task, &python_init);
+ }
+
+ if (nxt_slow_path(rc == NXT_ERROR)) {
+ goto fail;
+ }
+
+ unit_ctx = nxt_unit_init(&python_init);
+ if (nxt_slow_path(unit_ctx == NULL)) {
+ goto fail;
+ }
+
+ if (asgi) {
+ rc = nxt_python_asgi_run(unit_ctx);
+
+ } else {
+ rc = nxt_python_wsgi_run(unit_ctx);
+ }
+
+ nxt_unit_done(unit_ctx);
+
+ nxt_python_atexit();
+
+ exit(rc);
+
+ return NXT_OK;
+
+fail:
+
+ Py_XDECREF(obj);
+ Py_XDECREF(module);
+
+ nxt_python_atexit();
+
+ return NXT_ERROR;
+}
+
+
+nxt_int_t
+nxt_python_init_strings(nxt_python_string_t *pstr)
+{
+ PyObject *obj;
+
+ while (pstr->string.start != NULL) {
+ obj = PyString_FromStringAndSize((char *) pstr->string.start,
+ pstr->string.length);
+ if (nxt_slow_path(obj == NULL)) {
+ return NXT_ERROR;
+ }
+
+ PyUnicode_InternInPlace(&obj);
+
+ *pstr->object_p = obj;
+
+ pstr++;
+ }
+
+ return NXT_OK;
+}
+
+
+void
+nxt_python_done_strings(nxt_python_string_t *pstr)
+{
+ PyObject *obj;
+
+ while (pstr->string.start != NULL) {
+ obj = *pstr->object_p;
+
+ Py_XDECREF(obj);
+ *pstr->object_p = NULL;
+
+ pstr++;
+ }
+}
+
+
+static void
+nxt_python_atexit(void)
+{
+ nxt_python_wsgi_done();
+ nxt_python_asgi_done();
+
+ Py_XDECREF(nxt_py_stderr_flush);
+ Py_XDECREF(nxt_py_application);
+
+ Py_Finalize();
+
+ if (nxt_py_home != NULL) {
+ nxt_free(nxt_py_home);
+ }
+}
+
+
+void
+nxt_python_print_exception(void)
+{
+ PyErr_Print();
+
+#if PY_MAJOR_VERSION == 3
+ /* The backtrace may be buffered in sys.stderr file object. */
+ {
+ PyObject *result;
+
+ result = PyObject_CallFunction(nxt_py_stderr_flush, NULL);
+ if (nxt_slow_path(result == NULL)) {
+ PyErr_Clear();
+ return;
+ }
+
+ Py_DECREF(result);
+ }
+#endif
+}
diff --git a/src/python/nxt_python.h b/src/python/nxt_python.h
new file mode 100644
index 00000000..3211026b
--- /dev/null
+++ b/src/python/nxt_python.h
@@ -0,0 +1,60 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PYTHON_H_INCLUDED_
+#define _NXT_PYTHON_H_INCLUDED_
+
+
+#include <Python.h>
+#include <nxt_main.h>
+#include <nxt_unit.h>
+
+
+#if PY_MAJOR_VERSION == 3
+#define NXT_PYTHON_BYTES_TYPE "bytestring"
+
+#define PyString_FromStringAndSize(str, size) \
+ PyUnicode_DecodeLatin1((str), (size), "strict")
+#define PyString_AS_STRING PyUnicode_DATA
+
+#else
+#define NXT_PYTHON_BYTES_TYPE "string"
+
+#define PyBytes_FromStringAndSize PyString_FromStringAndSize
+#define PyBytes_Check PyString_Check
+#define PyBytes_GET_SIZE PyString_GET_SIZE
+#define PyBytes_AS_STRING PyString_AS_STRING
+#define PyUnicode_InternInPlace PyString_InternInPlace
+#define PyUnicode_AsUTF8 PyString_AS_STRING
+#endif
+
+#if PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION >= 5
+#define NXT_HAVE_ASGI 1
+#endif
+
+extern PyObject *nxt_py_application;
+
+typedef struct {
+ nxt_str_t string;
+ PyObject **object_p;
+} nxt_python_string_t;
+
+
+nxt_int_t nxt_python_init_strings(nxt_python_string_t *pstr);
+void nxt_python_done_strings(nxt_python_string_t *pstr);
+
+void nxt_python_print_exception(void);
+
+nxt_int_t nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init);
+int nxt_python_wsgi_run(nxt_unit_ctx_t *ctx);
+void nxt_python_wsgi_done(void);
+
+int nxt_python_asgi_check(PyObject *obj);
+nxt_int_t nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init);
+nxt_int_t nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
+void nxt_python_asgi_done(void);
+
+
+#endif /* _NXT_PYTHON_H_INCLUDED_ */
diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c
new file mode 100644
index 00000000..72408ea1
--- /dev/null
+++ b/src/python/nxt_python_asgi.c
@@ -0,0 +1,1227 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <nxt_unit_response.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
+
+static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
+static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
+ uint16_t port);
+static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
+static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
+
+static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
+static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
+static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
+static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
+
+static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
+
+
+PyObject *nxt_py_loop_run_until_complete;
+PyObject *nxt_py_loop_create_future;
+PyObject *nxt_py_loop_create_task;
+
+nxt_queue_t nxt_py_asgi_drain_queue;
+
+static PyObject *nxt_py_loop_call_soon;
+static PyObject *nxt_py_quit_future;
+static PyObject *nxt_py_quit_future_set_result;
+static PyObject *nxt_py_loop_add_reader;
+static PyObject *nxt_py_loop_remove_reader;
+static PyObject *nxt_py_port_read;
+
+static PyMethodDef nxt_py_port_read_method =
+ {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
+
+#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A
+
+
+int
+nxt_python_asgi_check(PyObject *obj)
+{
+ int res;
+ PyObject *call;
+ PyCodeObject *code;
+
+ if (PyFunction_Check(obj)) {
+ code = (PyCodeObject *) PyFunction_GET_CODE(obj);
+
+ return (code->co_flags & CO_COROUTINE) != 0;
+ }
+
+ if (PyMethod_Check(obj)) {
+ obj = PyMethod_GET_FUNCTION(obj);
+
+ code = (PyCodeObject *) PyFunction_GET_CODE(obj);
+
+ return (code->co_flags & CO_COROUTINE) != 0;
+ }
+
+ call = PyObject_GetAttrString(obj, "__call__");
+
+ if (call == NULL) {
+ return 0;
+ }
+
+ if (PyFunction_Check(call)) {
+ code = (PyCodeObject *) PyFunction_GET_CODE(call);
+
+ res = (code->co_flags & CO_COROUTINE) != 0;
+
+ } else {
+ if (PyMethod_Check(call)) {
+ obj = PyMethod_GET_FUNCTION(call);
+
+ code = (PyCodeObject *) PyFunction_GET_CODE(obj);
+
+ res = (code->co_flags & CO_COROUTINE) != 0;
+
+ } else {
+ res = 0;
+ }
+ }
+
+ Py_DECREF(call);
+
+ return res;
+}
+
+
+nxt_int_t
+nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
+{
+ PyObject *asyncio, *loop, *get_event_loop;
+ nxt_int_t rc;
+
+ nxt_debug(task, "asgi_init");
+
+ if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) {
+ nxt_alert(task, "Python failed to init string objects");
+ return NXT_ERROR;
+ }
+
+ asyncio = PyImport_ImportModule("asyncio");
+ if (nxt_slow_path(asyncio == NULL)) {
+ nxt_alert(task, "Python failed to import module 'asyncio'");
+ nxt_python_print_exception();
+ return NXT_ERROR;
+ }
+
+ loop = NULL;
+ get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
+ "get_event_loop");
+ if (nxt_slow_path(get_event_loop == NULL)) {
+ nxt_alert(task,
+ "Python failed to get 'get_event_loop' from module 'asyncio'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) {
+ nxt_alert(task, "'asyncio.get_event_loop' is not a callable object");
+ goto fail;
+ }
+
+ loop = PyObject_CallObject(get_event_loop, NULL);
+ if (nxt_slow_path(loop == NULL)) {
+ nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'");
+ goto fail;
+ }
+
+ nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task");
+ if (nxt_slow_path(nxt_py_loop_create_task == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.create_task'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) {
+ nxt_alert(task, "'loop.create_task' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader");
+ if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.add_reader'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) {
+ nxt_alert(task, "'loop.add_reader' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader");
+ if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.remove_reader'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) {
+ nxt_alert(task, "'loop.remove_reader' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon");
+ if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.call_soon'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) {
+ nxt_alert(task, "'loop.call_soon' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop,
+ "run_until_complete");
+ if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.run_until_complete'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) {
+ nxt_alert(task, "'loop.run_until_complete' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future");
+ if (nxt_slow_path(nxt_py_loop_create_future == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.create_future'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) {
+ nxt_alert(task, "'loop.create_future' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(nxt_py_quit_future == NULL)) {
+ nxt_alert(task, "Python failed to create Future ");
+ nxt_python_print_exception();
+ goto fail;
+ }
+
+ nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future,
+ "set_result");
+ if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) {
+ nxt_alert(task, "Python failed to get 'future.set_result'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) {
+ nxt_alert(task, "'future.set_result' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
+ if (nxt_slow_path(nxt_py_port_read == NULL)) {
+ nxt_alert(task, "Python failed to initialize the 'port_read' function");
+ goto fail;
+ }
+
+ nxt_queue_init(&nxt_py_asgi_drain_queue);
+
+ if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) {
+ goto fail;
+ }
+
+ if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) {
+ goto fail;
+ }
+
+ rc = nxt_py_asgi_lifespan_startup(task);
+ if (nxt_slow_path(rc == NXT_ERROR)) {
+ goto fail;
+ }
+
+ init->callbacks.request_handler = nxt_py_asgi_request_handler;
+ init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
+ init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
+ init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
+ init->callbacks.quit = nxt_py_asgi_quit;
+ init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
+ init->callbacks.add_port = nxt_py_asgi_add_port;
+ init->callbacks.remove_port = nxt_py_asgi_remove_port;
+
+ Py_DECREF(loop);
+ Py_DECREF(asyncio);
+
+ return NXT_OK;
+
+fail:
+
+ Py_XDECREF(loop);
+ Py_DECREF(asyncio);
+
+ return NXT_ERROR;
+}
+
+
+nxt_int_t
+nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
+{
+ PyObject *res;
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
+ nxt_py_quit_future, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
+ nxt_python_print_exception();
+
+ return NXT_ERROR;
+ }
+
+ Py_DECREF(res);
+
+ nxt_py_asgi_lifespan_shutdown();
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
+{
+ PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
+
+ if (req->request->websocket_handshake) {
+ asgi = nxt_py_asgi_websocket_create(req);
+
+ } else {
+ asgi = nxt_py_asgi_http_create(req);
+ }
+
+ if (nxt_slow_path(asgi == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create asgi object");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return;
+ }
+
+ receive = PyObject_GetAttrString(asgi, "receive");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to get 'receive' method");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_asgi;
+ }
+
+ send = PyObject_GetAttrString(asgi, "send");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to get 'send' method");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_receive;
+ }
+
+ done = PyObject_GetAttrString(asgi, "_done");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to get '_done' method");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_send;
+ }
+
+ scope = nxt_py_asgi_create_http_scope(req);
+ if (nxt_slow_path(scope == NULL)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_done;
+ }
+
+ req->data = asgi;
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_application,
+ scope, receive, send, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(req, "Python failed to call the application");
+ nxt_python_print_exception();
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_scope;
+ }
+
+ if (nxt_slow_path(!PyCoro_CheckExact(res))) {
+ nxt_unit_req_error(req, "Application result type is not a coroutine");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ Py_DECREF(res);
+
+ goto release_scope;
+ }
+
+ task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL);
+ if (nxt_slow_path(task == NULL)) {
+ nxt_unit_req_error(req, "Python failed to call the create_task");
+ nxt_python_print_exception();
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ Py_DECREF(res);
+
+ goto release_scope;
+ }
+
+ Py_DECREF(res);
+
+ res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(req,
+ "Python failed to call 'task.add_done_callback'");
+ nxt_python_print_exception();
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_task;
+ }
+
+ Py_DECREF(res);
+release_task:
+ Py_DECREF(task);
+release_scope:
+ Py_DECREF(scope);
+release_done:
+ Py_DECREF(done);
+release_send:
+ Py_DECREF(send);
+release_receive:
+ Py_DECREF(receive);
+release_asgi:
+ Py_DECREF(asgi);
+}
+
+
+static PyObject *
+nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
+{
+ char *p, *target, *query;
+ uint32_t target_length, i;
+ PyObject *scope, *v, *type, *scheme;
+ PyObject *headers, *header;
+ nxt_unit_field_t *f;
+ nxt_unit_request_t *r;
+
+ static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol");
+
+#define SET_ITEM(dict, key, value) \
+ if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
+ == -1)) \
+ { \
+ nxt_unit_req_alert(req, "Python failed to set '" \
+ #dict "." #key "' item"); \
+ goto fail; \
+ }
+
+ v = NULL;
+ headers = NULL;
+
+ r = req->request;
+
+ if (r->websocket_handshake) {
+ type = nxt_py_websocket_str;
+ scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
+
+ } else {
+ type = nxt_py_http_str;
+ scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
+ }
+
+ scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
+ if (nxt_slow_path(scope == NULL)) {
+ return NULL;
+ }
+
+ p = nxt_unit_sptr_get(&r->version);
+ SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
+ : nxt_py_1_0_str)
+ SET_ITEM(scope, scheme, scheme)
+
+ v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
+ r->method_length);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'method' string");
+ goto fail;
+ }
+
+ SET_ITEM(scope, method, v)
+ Py_DECREF(v);
+
+ v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
+ "replace");
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'path' string");
+ goto fail;
+ }
+
+ SET_ITEM(scope, path, v)
+ Py_DECREF(v);
+
+ target = nxt_unit_sptr_get(&r->target);
+ query = nxt_unit_sptr_get(&r->query);
+
+ if (r->query.offset != 0) {
+ target_length = query - target - 1;
+
+ } else {
+ target_length = r->target_length;
+ }
+
+ v = PyBytes_FromStringAndSize(target, target_length);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
+ goto fail;
+ }
+
+ SET_ITEM(scope, raw_path, v)
+ Py_DECREF(v);
+
+ v = PyBytes_FromStringAndSize(query, r->query_length);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'query' string");
+ goto fail;
+ }
+
+ SET_ITEM(scope, query_string, v)
+ Py_DECREF(v);
+
+ v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'client' pair");
+ goto fail;
+ }
+
+ SET_ITEM(scope, client, v)
+ Py_DECREF(v);
+
+ v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'server' pair");
+ goto fail;
+ }
+
+ SET_ITEM(scope, server, v)
+ Py_DECREF(v);
+
+ v = NULL;
+
+ headers = PyTuple_New(r->fields_count);
+ if (nxt_slow_path(headers == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'headers' object");
+ goto fail;
+ }
+
+ for (i = 0; i < r->fields_count; i++) {
+ f = r->fields + i;
+
+ header = nxt_py_asgi_create_header(f);
+ if (nxt_slow_path(header == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'header' pair");
+ goto fail;
+ }
+
+ PyTuple_SET_ITEM(headers, i, header);
+
+ if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
+ && f->name_length == ws_protocol.length
+ && f->value_length > 0
+ && r->websocket_handshake)
+ {
+ v = nxt_py_asgi_create_subprotocols(f);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Failed to create subprotocols");
+ goto fail;
+ }
+
+ SET_ITEM(scope, subprotocols, v);
+ Py_DECREF(v);
+ }
+ }
+
+ SET_ITEM(scope, headers, headers)
+ Py_DECREF(headers);
+
+ return scope;
+
+fail:
+
+ Py_XDECREF(v);
+ Py_XDECREF(headers);
+ Py_DECREF(scope);
+
+ return NULL;
+
+#undef SET_ITEM
+}
+
+
+static PyObject *
+nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
+{
+ char *p, *s;
+ PyObject *pair, *v;
+
+ pair = PyTuple_New(2);
+ if (nxt_slow_path(pair == NULL)) {
+ return NULL;
+ }
+
+ p = nxt_unit_sptr_get(sptr);
+ s = memchr(p, ':', len);
+
+ v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
+ if (nxt_slow_path(v == NULL)) {
+ Py_DECREF(pair);
+
+ return NULL;
+ }
+
+ PyTuple_SET_ITEM(pair, 0, v);
+
+ if (s != NULL) {
+ p += len;
+ v = PyLong_FromString(s + 1, &p, 10);
+
+ } else {
+ v = PyLong_FromLong(port);
+ }
+
+ if (nxt_slow_path(v == NULL)) {
+ Py_DECREF(pair);
+
+ return NULL;
+ }
+
+ PyTuple_SET_ITEM(pair, 1, v);
+
+ return pair;
+}
+
+
+static PyObject *
+nxt_py_asgi_create_header(nxt_unit_field_t *f)
+{
+ char c, *name;
+ uint8_t pos;
+ PyObject *header, *v;
+
+ header = PyTuple_New(2);
+ if (nxt_slow_path(header == NULL)) {
+ return NULL;
+ }
+
+ name = nxt_unit_sptr_get(&f->name);
+
+ for (pos = 0; pos < f->name_length; pos++) {
+ c = name[pos];
+ if (c >= 'A' && c <= 'Z') {
+ name[pos] = (c | 0x20);
+ }
+ }
+
+ v = PyBytes_FromStringAndSize(name, f->name_length);
+ if (nxt_slow_path(v == NULL)) {
+ Py_DECREF(header);
+
+ return NULL;
+ }
+
+ PyTuple_SET_ITEM(header, 0, v);
+
+ v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
+ f->value_length);
+ if (nxt_slow_path(v == NULL)) {
+ Py_DECREF(header);
+
+ return NULL;
+ }
+
+ PyTuple_SET_ITEM(header, 1, v);
+
+ return header;
+}
+
+
+static PyObject *
+nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
+{
+ char *v;
+ uint32_t i, n, start;
+ PyObject *res, *proto;
+
+ v = nxt_unit_sptr_get(&f->value);
+ n = 1;
+
+ for (i = 0; i < f->value_length; i++) {
+ if (v[i] == ',') {
+ n++;
+ }
+ }
+
+ res = PyTuple_New(n);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ n = 0;
+ start = 0;
+
+ for (i = 0; i < f->value_length; ) {
+ if (v[i] != ',') {
+ i++;
+
+ continue;
+ }
+
+ if (i - start > 0) {
+ proto = PyString_FromStringAndSize(v + start, i - start);
+ if (nxt_slow_path(proto == NULL)) {
+ goto fail;
+ }
+
+ PyTuple_SET_ITEM(res, n, proto);
+
+ n++;
+ }
+
+ do {
+ i++;
+ } while (i < f->value_length && v[i] == ' ');
+
+ start = i;
+ }
+
+ if (i - start > 0) {
+ proto = PyString_FromStringAndSize(v + start, i - start);
+ if (nxt_slow_path(proto == NULL)) {
+ goto fail;
+ }
+
+ PyTuple_SET_ITEM(res, n, proto);
+ }
+
+ return res;
+
+fail:
+
+ Py_DECREF(res);
+
+ return NULL;
+}
+
+
+static int
+nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int nb;
+ PyObject *res;
+
+ if (port->in_fd == -1) {
+ return NXT_UNIT_OK;
+ }
+
+ nb = 1;
+
+ if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
+ nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
+ port->in_fd, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader,
+ PyLong_FromLong(port->in_fd),
+ nxt_py_port_read,
+ PyLong_FromVoidPtr(ctx),
+ PyLong_FromVoidPtr(port), NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to add_reader");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ Py_DECREF(res);
+
+ return NXT_UNIT_OK;
+}
+
+
+static void
+nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
+{
+ PyObject *res;
+
+ nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
+
+ if (port->in_fd == -1) {
+ return;
+ }
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader,
+ PyLong_FromLong(port->in_fd), NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to remove_reader");
+ }
+
+ Py_DECREF(res);
+}
+
+
+static void
+nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
+{
+ PyObject *res;
+
+ nxt_unit_debug(ctx, "asgi_quit %p", ctx);
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result,
+ PyLong_FromLong(0), NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to set_result");
+ }
+
+ Py_DECREF(res);
+}
+
+
+static void
+nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_queue_link_t *lnk;
+
+ while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) {
+ lnk = nxt_queue_first(&nxt_py_asgi_drain_queue);
+
+ rc = nxt_py_asgi_http_drain(lnk);
+ if (rc == NXT_UNIT_AGAIN) {
+ break;
+ }
+
+ nxt_queue_remove(lnk);
+ }
+}
+
+
+static PyObject *
+nxt_py_asgi_port_read(PyObject *self, PyObject *args)
+{
+ int rc;
+ PyObject *arg;
+ Py_ssize_t n;
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_port_t *port;
+
+ n = PyTuple_GET_SIZE(args);
+
+ if (n != 2) {
+ nxt_unit_alert(NULL,
+ "nxt_py_asgi_port_read: invalid number of arguments %d",
+ (int) n);
+
+ return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
+ }
+
+ arg = PyTuple_GET_ITEM(args, 0);
+ if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
+ return PyErr_Format(PyExc_TypeError,
+ "the first argument is not a long");
+ }
+
+ ctx = PyLong_AsVoidPtr(arg);
+
+ arg = PyTuple_GET_ITEM(args, 1);
+ if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
+ return PyErr_Format(PyExc_TypeError,
+ "the second argument is not a long");
+ }
+
+ port = PyLong_AsVoidPtr(arg);
+
+ nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
+
+ rc = nxt_unit_process_port_msg(ctx, port);
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "error processing port message");
+ }
+
+ Py_RETURN_NONE;
+}
+
+
+PyObject *
+nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
+ void *data)
+{
+ int i;
+ PyObject *iter, *header, *h_iter, *name, *val, *res;
+
+ iter = PyObject_GetIter(headers);
+ if (nxt_slow_path(iter == NULL)) {
+ return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
+ }
+
+ for (i = 0; /* void */; i++) {
+ header = PyIter_Next(iter);
+ if (header == NULL) {
+ break;
+ }
+
+ h_iter = PyObject_GetIter(header);
+ if (nxt_slow_path(h_iter == NULL)) {
+ Py_DECREF(header);
+ Py_DECREF(iter);
+
+ return PyErr_Format(PyExc_TypeError,
+ "'headers' item #%d is not an iterable", i);
+ }
+
+ name = PyIter_Next(h_iter);
+ if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
+ Py_XDECREF(name);
+ Py_DECREF(h_iter);
+ Py_DECREF(header);
+ Py_DECREF(iter);
+
+ return PyErr_Format(PyExc_TypeError,
+ "'headers' item #%d 'name' is not a byte string", i);
+ }
+
+ val = PyIter_Next(h_iter);
+ if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
+ Py_XDECREF(val);
+ Py_DECREF(h_iter);
+ Py_DECREF(header);
+ Py_DECREF(iter);
+
+ return PyErr_Format(PyExc_TypeError,
+ "'headers' item #%d 'value' is not a byte string", i);
+ }
+
+ res = cb(data, i, name, val);
+
+ Py_DECREF(name);
+ Py_DECREF(val);
+ Py_DECREF(h_iter);
+ Py_DECREF(header);
+
+ if (nxt_slow_path(res == NULL)) {
+ Py_DECREF(iter);
+
+ return NULL;
+ }
+
+ Py_DECREF(res);
+ }
+
+ Py_DECREF(iter);
+
+ Py_RETURN_NONE;
+}
+
+
+PyObject *
+nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
+{
+ nxt_py_asgi_calc_size_ctx_t *ctx;
+
+ ctx = data;
+
+ ctx->fields_count++;
+ ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
+
+ Py_RETURN_NONE;
+}
+
+
+PyObject *
+nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
+{
+ int rc;
+ char *name_str, *val_str;
+ uint32_t name_len, val_len;
+ nxt_off_t content_length;
+ nxt_unit_request_info_t *req;
+ nxt_py_asgi_add_field_ctx_t *ctx;
+
+ name_str = PyBytes_AS_STRING(name);
+ name_len = PyBytes_GET_SIZE(name);
+
+ val_str = PyBytes_AS_STRING(val);
+ val_len = PyBytes_GET_SIZE(val);
+
+ ctx = data;
+ req = ctx->req;
+
+ rc = nxt_unit_response_add_field(req, name_str, name_len,
+ val_str, val_len);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to add header #%d", i);
+ }
+
+ if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
+ content_length = nxt_off_t_parse((u_char *) val_str, val_len);
+ if (nxt_slow_path(content_length < 0)) {
+ nxt_unit_req_error(req, "failed to parse Content-Length "
+ "value %.*s", (int) val_len, val_str);
+
+ return PyErr_Format(PyExc_ValueError,
+ "Failed to parse Content-Length: '%.*s'",
+ (int) val_len, val_str);
+ }
+
+ ctx->content_length = content_length;
+ }
+
+ Py_RETURN_NONE;
+}
+
+
+PyObject *
+nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
+ PyObject *result)
+{
+ PyObject *set_result, *res;
+
+ if (nxt_slow_path(result == NULL)) {
+ Py_DECREF(future);
+
+ return NULL;
+ }
+
+ set_result = PyObject_GetAttrString(future, "set_result");
+ if (nxt_slow_path(set_result == NULL)) {
+ nxt_unit_req_alert(req, "failed to get 'set_result' for future");
+
+ Py_CLEAR(future);
+
+ goto cleanup;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
+ nxt_unit_req_alert(req, "'future.set_result' is not a callable");
+
+ Py_CLEAR(future);
+
+ goto cleanup;
+ }
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result,
+ result, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
+ nxt_python_print_exception();
+
+ Py_CLEAR(future);
+ }
+
+ Py_XDECREF(res);
+
+cleanup:
+
+ Py_DECREF(set_result);
+ Py_DECREF(result);
+
+ return future;
+}
+
+
+PyObject *
+nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
+{
+ PyObject *msg;
+
+ msg = PyDict_New();
+ if (nxt_slow_path(msg == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create message dict");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create message dict");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
+ nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
+
+ Py_DECREF(msg);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'msg.type' item");
+ }
+
+ return msg;
+}
+
+
+PyObject *
+nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
+ PyObject *spec_version)
+{
+ PyObject *scope, *asgi;
+
+ scope = PyDict_New();
+ if (nxt_slow_path(scope == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create 'scope' dict");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
+ nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
+
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'scope.type' item");
+ }
+
+ asgi = PyDict_New();
+ if (nxt_slow_path(asgi == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
+ nxt_python_print_exception();
+
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create 'asgi' dict");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
+ nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
+
+ Py_DECREF(asgi);
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'scope.asgi' item");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
+ nxt_py_3_0_str) == -1))
+ {
+ nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
+
+ Py_DECREF(asgi);
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'asgi.version' item");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
+ spec_version) == -1))
+ {
+ nxt_unit_req_alert(req,
+ "Python failed to set 'asgi.spec_version' item");
+
+ Py_DECREF(asgi);
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'asgi.spec_version' item");
+ }
+
+ Py_DECREF(asgi);
+
+ return scope;
+}
+
+
+void
+nxt_py_asgi_dealloc(PyObject *self)
+{
+ PyObject_Del(self);
+}
+
+
+PyObject *
+nxt_py_asgi_await(PyObject *self)
+{
+ Py_INCREF(self);
+ return self;
+}
+
+
+PyObject *
+nxt_py_asgi_iter(PyObject *self)
+{
+ Py_INCREF(self);
+ return self;
+}
+
+
+PyObject *
+nxt_py_asgi_next(PyObject *self)
+{
+ return NULL;
+}
+
+
+void
+nxt_python_asgi_done(void)
+{
+ nxt_py_asgi_str_done();
+
+ Py_XDECREF(nxt_py_quit_future);
+ Py_XDECREF(nxt_py_quit_future_set_result);
+ Py_XDECREF(nxt_py_loop_run_until_complete);
+ Py_XDECREF(nxt_py_loop_create_future);
+ Py_XDECREF(nxt_py_loop_create_task);
+ Py_XDECREF(nxt_py_loop_call_soon);
+ Py_XDECREF(nxt_py_loop_add_reader);
+ Py_XDECREF(nxt_py_loop_remove_reader);
+ Py_XDECREF(nxt_py_port_read);
+}
+
+#else /* !(NXT_HAVE_ASGI) */
+
+
+int
+nxt_python_asgi_check(PyObject *obj)
+{
+ return 0;
+}
+
+
+nxt_int_t
+nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
+{
+ nxt_alert(task, "ASGI not implemented");
+ return NXT_ERROR;
+}
+
+
+nxt_int_t
+nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_alert(ctx, "ASGI not implemented");
+ return NXT_ERROR;
+}
+
+
+void
+nxt_python_asgi_done(void)
+{
+}
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/python/nxt_python_asgi.h b/src/python/nxt_python_asgi.h
new file mode 100644
index 00000000..24337c37
--- /dev/null
+++ b/src/python/nxt_python_asgi.h
@@ -0,0 +1,60 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PYTHON_ASGI_H_INCLUDED_
+#define _NXT_PYTHON_ASGI_H_INCLUDED_
+
+
+typedef PyObject * (*nxt_py_asgi_enum_header_cb)(void *ctx, int i,
+ PyObject *name, PyObject *val);
+
+typedef struct {
+ uint32_t fields_count;
+ uint32_t fields_size;
+} nxt_py_asgi_calc_size_ctx_t;
+
+typedef struct {
+ nxt_unit_request_info_t *req;
+ uint64_t content_length;
+} nxt_py_asgi_add_field_ctx_t;
+
+PyObject *nxt_py_asgi_enum_headers(PyObject *headers,
+ nxt_py_asgi_enum_header_cb cb, void *data);
+
+PyObject *nxt_py_asgi_calc_size(void *data, int i, PyObject *n, PyObject *v);
+PyObject *nxt_py_asgi_add_field(void *data, int i, PyObject *n, PyObject *v);
+
+PyObject *nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
+ PyObject *future, PyObject *result);
+PyObject *nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type);
+PyObject *nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
+ PyObject *spec_version);
+
+void nxt_py_asgi_dealloc(PyObject *self);
+PyObject *nxt_py_asgi_await(PyObject *self);
+PyObject *nxt_py_asgi_iter(PyObject *self);
+PyObject *nxt_py_asgi_next(PyObject *self);
+
+nxt_int_t nxt_py_asgi_http_init(nxt_task_t *task);
+PyObject *nxt_py_asgi_http_create(nxt_unit_request_info_t *req);
+void nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req);
+int nxt_py_asgi_http_drain(nxt_queue_link_t *lnk);
+
+nxt_int_t nxt_py_asgi_websocket_init(nxt_task_t *task);
+PyObject *nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req);
+void nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *ws);
+void nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req);
+
+nxt_int_t nxt_py_asgi_lifespan_startup(nxt_task_t *task);
+nxt_int_t nxt_py_asgi_lifespan_shutdown(void);
+
+extern PyObject *nxt_py_loop_run_until_complete;
+extern PyObject *nxt_py_loop_create_future;
+extern PyObject *nxt_py_loop_create_task;
+
+extern nxt_queue_t nxt_py_asgi_drain_queue;
+
+
+#endif /* _NXT_PYTHON_ASGI_H_INCLUDED_ */
diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c
new file mode 100644
index 00000000..b07d61d6
--- /dev/null
+++ b/src/python/nxt_python_asgi_http.c
@@ -0,0 +1,591 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+typedef struct {
+ PyObject_HEAD
+ nxt_unit_request_info_t *req;
+ nxt_queue_link_t link;
+ PyObject *receive_future;
+ PyObject *send_future;
+ uint64_t content_length;
+ uint64_t bytes_sent;
+ int complete;
+ PyObject *send_body;
+ Py_ssize_t send_body_off;
+} nxt_py_asgi_http_t;
+
+
+static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
+static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
+static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
+static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
+
+
+static PyMethodDef nxt_py_asgi_http_methods[] = {
+ { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 },
+ { "send", nxt_py_asgi_http_send, METH_O, 0 },
+ { "_done", nxt_py_asgi_http_done, METH_O, 0 },
+ { NULL, NULL, 0, 0 }
+};
+
+static PyAsyncMethods nxt_py_asgi_async_methods = {
+ .am_await = nxt_py_asgi_await,
+};
+
+static PyTypeObject nxt_py_asgi_http_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+
+ .tp_name = "unit._asgi_http",
+ .tp_basicsize = sizeof(nxt_py_asgi_http_t),
+ .tp_dealloc = nxt_py_asgi_dealloc,
+ .tp_as_async = &nxt_py_asgi_async_methods,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit ASGI HTTP request object",
+ .tp_iter = nxt_py_asgi_iter,
+ .tp_iternext = nxt_py_asgi_next,
+ .tp_methods = nxt_py_asgi_http_methods,
+};
+
+static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
+
+
+nxt_int_t
+nxt_py_asgi_http_init(nxt_task_t *task)
+{
+ if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
+ nxt_alert(task, "Python failed to initialize the 'http' type object");
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+}
+
+
+PyObject *
+nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
+{
+ nxt_py_asgi_http_t *http;
+
+ http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
+
+ if (nxt_fast_path(http != NULL)) {
+ http->req = req;
+ http->receive_future = NULL;
+ http->send_future = NULL;
+ http->content_length = -1;
+ http->bytes_sent = 0;
+ http->complete = 0;
+ http->send_body = NULL;
+ http->send_body_off = 0;
+ }
+
+ return (PyObject *) http;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
+{
+ PyObject *msg, *future;
+ nxt_py_asgi_http_t *http;
+ nxt_unit_request_info_t *req;
+
+ http = (nxt_py_asgi_http_t *) self;
+ req = http->req;
+
+ nxt_unit_req_debug(req, "asgi_http_receive");
+
+ msg = nxt_py_asgi_http_read_msg(http);
+ if (nxt_slow_path(msg == NULL)) {
+ return NULL;
+ }
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ Py_DECREF(msg);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ if (msg != Py_None) {
+ return nxt_py_asgi_set_result_soon(req, future, msg);
+ }
+
+ http->receive_future = future;
+ Py_INCREF(http->receive_future);
+
+ Py_DECREF(msg);
+
+ return future;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
+{
+ char *body_buf;
+ ssize_t read_res;
+ PyObject *msg, *body;
+ Py_ssize_t size;
+ nxt_unit_request_info_t *req;
+
+ req = http->req;
+
+ size = req->content_length;
+
+ if (size > nxt_py_asgi_http_body_buf_size) {
+ size = nxt_py_asgi_http_body_buf_size;
+ }
+
+ if (size > 0) {
+ body = PyBytes_FromStringAndSize(NULL, size);
+ if (nxt_slow_path(body == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create body byte string");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Bytes object");
+ }
+
+ body_buf = PyBytes_AS_STRING(body);
+
+ read_res = nxt_unit_request_read(req, body_buf, size);
+
+ } else {
+ body = NULL;
+ read_res = 0;
+ }
+
+ if (read_res > 0 || read_res == size) {
+ msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
+ if (nxt_slow_path(msg == NULL)) {
+ Py_XDECREF(body);
+
+ return NULL;
+ }
+
+#define SET_ITEM(dict, key, value) \
+ if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
+ == -1)) \
+ { \
+ nxt_unit_req_alert(req, \
+ "Python failed to set '" #dict "." #key "' item"); \
+ PyErr_SetString(PyExc_RuntimeError, \
+ "Python failed to set '" #dict "." #key "' item"); \
+ goto fail; \
+ }
+
+ if (body != NULL) {
+ SET_ITEM(msg, body, body)
+ }
+
+ if (req->content_length > 0) {
+ SET_ITEM(msg, more_body, Py_True)
+ }
+
+#undef SET_ITEM
+
+ Py_XDECREF(body);
+
+ return msg;
+ }
+
+ Py_XDECREF(body);
+
+ Py_RETURN_NONE;
+
+fail:
+
+ Py_DECREF(msg);
+ Py_XDECREF(body);
+
+ return NULL;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
+{
+ PyObject *type;
+ const char *type_str;
+ Py_ssize_t type_len;
+ nxt_py_asgi_http_t *http;
+
+ static const nxt_str_t response_start = nxt_string("http.response.start");
+ static const nxt_str_t response_body = nxt_string("http.response.body");
+
+ http = (nxt_py_asgi_http_t *) self;
+
+ type = PyDict_GetItem(dict, nxt_py_type_str);
+ if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
+ nxt_unit_req_error(http->req, "asgi_http_send: "
+ "'type' is not a unicode string");
+ return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
+ }
+
+ type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
+
+ nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
+ (int) type_len, type_str);
+
+ if (type_len == (Py_ssize_t) response_start.length
+ && memcmp(type_str, response_start.start, type_len) == 0)
+ {
+ return nxt_py_asgi_http_response_start(http, dict);
+ }
+
+ if (type_len == (Py_ssize_t) response_body.length
+ && memcmp(type_str, response_body.start, type_len) == 0)
+ {
+ return nxt_py_asgi_http_response_body(http, dict);
+ }
+
+ nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'",
+ (int) type_len, type_str);
+
+ return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
+}
+
+
+static PyObject *
+nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
+{
+ int rc;
+ PyObject *status, *headers, *res;
+ nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
+ nxt_py_asgi_add_field_ctx_t add_field_ctx;
+
+ status = PyDict_GetItem(dict, nxt_py_status_str);
+ if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
+ nxt_unit_req_error(http->req, "asgi_http_response_start: "
+ "'status' is not an integer");
+ return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
+ }
+
+ calc_size_ctx.fields_size = 0;
+ calc_size_ctx.fields_count = 0;
+
+ headers = PyDict_GetItem(dict, nxt_py_headers_str);
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
+ &calc_size_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ Py_DECREF(res);
+ }
+
+ rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
+ calc_size_ctx.fields_count,
+ calc_size_ctx.fields_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to allocate response object");
+ }
+
+ add_field_ctx.req = http->req;
+ add_field_ctx.content_length = -1;
+
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
+ &add_field_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ Py_DECREF(res);
+ }
+
+ http->content_length = add_field_ctx.content_length;
+
+ Py_INCREF(http);
+ return (PyObject *) http;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
+{
+ int rc;
+ char *body_str;
+ ssize_t sent;
+ PyObject *body, *more_body, *future;
+ Py_ssize_t body_len, body_off;
+
+ body = PyDict_GetItem(dict, nxt_py_body_str);
+ if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
+ return PyErr_Format(PyExc_TypeError, "'body' is not a byte string");
+ }
+
+ more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
+ if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
+ return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
+ }
+
+ if (nxt_slow_path(http->complete)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "Unexpected ASGI message 'http.response.body' "
+ "sent, after response already completed");
+ }
+
+ if (nxt_slow_path(http->send_future != NULL)) {
+ return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
+ }
+
+ if (body != NULL) {
+ body_str = PyBytes_AS_STRING(body);
+ body_len = PyBytes_GET_SIZE(body);
+
+ nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
+ (int) body_len, (more_body == Py_True) );
+
+ if (nxt_slow_path(http->bytes_sent + body_len
+ > http->content_length))
+ {
+ return PyErr_Format(PyExc_RuntimeError,
+ "Response content longer than Content-Length");
+ }
+
+ body_off = 0;
+
+ while (body_len > 0) {
+ sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
+ if (nxt_slow_path(sent < 0)) {
+ return PyErr_Format(PyExc_RuntimeError, "failed to send body");
+ }
+
+ if (nxt_slow_path(sent == 0)) {
+ nxt_unit_req_debug(http->req, "asgi_http_response_body: "
+ "out of shared memory, %d",
+ (int) body_len);
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_req_alert(http->req,
+ "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ http->send_body = body;
+ Py_INCREF(http->send_body);
+ http->send_body_off = body_off;
+
+ nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link);
+
+ http->send_future = future;
+ Py_INCREF(http->send_future);
+
+ return future;
+ }
+
+ body_str += sent;
+ body_len -= sent;
+ body_off += sent;
+ http->bytes_sent += sent;
+ }
+
+ } else {
+ nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
+ (more_body == Py_True) );
+
+ if (!nxt_unit_response_is_sent(http->req)) {
+ rc = nxt_unit_response_send(http->req);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to send response");
+ }
+ }
+ }
+
+ if (more_body == NULL || more_body == Py_False) {
+ http->complete = 1;
+ }
+
+ Py_INCREF(http);
+ return (PyObject *) http;
+}
+
+
+void
+nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
+{
+ PyObject *msg, *future, *res;
+ nxt_py_asgi_http_t *http;
+
+ http = req->data;
+
+ nxt_unit_req_debug(req, "asgi_http_data_handler");
+
+ if (http->receive_future == NULL) {
+ return;
+ }
+
+ msg = nxt_py_asgi_http_read_msg(http);
+ if (nxt_slow_path(msg == NULL)) {
+ return;
+ }
+
+ if (msg == Py_None) {
+ Py_DECREF(msg);
+ return;
+ }
+
+ future = http->receive_future;
+ http->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(msg);
+}
+
+
+int
+nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
+{
+ char *body_str;
+ ssize_t sent;
+ PyObject *future, *exc, *res;
+ Py_ssize_t body_len;
+ nxt_py_asgi_http_t *http;
+
+ http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
+
+ body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
+ body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
+
+ nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
+
+ while (body_len > 0) {
+ sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
+ if (nxt_slow_path(sent < 0)) {
+ goto fail;
+ }
+
+ if (nxt_slow_path(sent == 0)) {
+ return NXT_UNIT_AGAIN;
+ }
+
+ body_str += sent;
+ body_len -= sent;
+
+ http->send_body_off += sent;
+ http->bytes_sent += sent;
+ }
+
+ Py_CLEAR(http->send_body);
+
+ future = http->send_future;
+ http->send_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(http->req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ return NXT_UNIT_OK;
+
+fail:
+
+ exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
+ nxt_py_failed_to_send_body_str,
+ NULL);
+ if (nxt_slow_path(exc == NULL)) {
+ nxt_unit_req_alert(http->req, "RuntimeError create failed");
+ nxt_python_print_exception();
+
+ exc = Py_None;
+ Py_INCREF(exc);
+ }
+
+ future = http->send_future;
+ http->send_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(http->req, "'set_exception' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+ Py_DECREF(exc);
+
+ return NXT_UNIT_ERROR;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_done(PyObject *self, PyObject *future)
+{
+ int rc;
+ PyObject *res;
+ nxt_py_asgi_http_t *http;
+
+ http = (nxt_py_asgi_http_t *) self;
+
+ nxt_unit_req_debug(http->req, "asgi_http_done");
+
+ /*
+ * Get Future.result() and it raises an exception, if coroutine exited
+ * with exception.
+ */
+ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(http->req,
+ "Python failed to call 'future.result()'");
+ nxt_python_print_exception();
+
+ rc = NXT_UNIT_ERROR;
+
+ } else {
+ Py_DECREF(res);
+
+ rc = NXT_UNIT_OK;
+ }
+
+ nxt_unit_request_done(http->req, rc);
+
+ Py_RETURN_NONE;
+}
+
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/python/nxt_python_asgi_lifespan.c b/src/python/nxt_python_asgi_lifespan.c
new file mode 100644
index 00000000..14d0ee97
--- /dev/null
+++ b/src/python/nxt_python_asgi_lifespan.c
@@ -0,0 +1,505 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+typedef struct {
+ PyObject_HEAD
+ int disabled;
+ int startup_received;
+ int startup_sent;
+ int shutdown_received;
+ int shutdown_sent;
+ int shutdown_called;
+ PyObject *startup_future;
+ PyObject *shutdown_future;
+ PyObject *receive_future;
+} nxt_py_asgi_lifespan_t;
+
+
+static PyObject *nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none);
+static PyObject *nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict);
+static PyObject *nxt_py_asgi_lifespan_send_startup(
+ nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict);
+static PyObject *nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan,
+ int v, int *sent, PyObject **future);
+static PyObject *nxt_py_asgi_lifespan_send_shutdown(
+ nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict);
+static PyObject *nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan);
+static PyObject *nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future);
+
+
+static nxt_py_asgi_lifespan_t *nxt_py_lifespan;
+
+static PyMethodDef nxt_py_asgi_lifespan_methods[] = {
+ { "receive", nxt_py_asgi_lifespan_receive, METH_NOARGS, 0 },
+ { "send", nxt_py_asgi_lifespan_send, METH_O, 0 },
+ { "_done", nxt_py_asgi_lifespan_done, METH_O, 0 },
+ { NULL, NULL, 0, 0 }
+};
+
+static PyAsyncMethods nxt_py_asgi_async_methods = {
+ .am_await = nxt_py_asgi_await,
+};
+
+static PyTypeObject nxt_py_asgi_lifespan_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+
+ .tp_name = "unit._asgi_lifespan",
+ .tp_basicsize = sizeof(nxt_py_asgi_lifespan_t),
+ .tp_dealloc = nxt_py_asgi_dealloc,
+ .tp_as_async = &nxt_py_asgi_async_methods,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit ASGI Lifespan object",
+ .tp_iter = nxt_py_asgi_iter,
+ .tp_iternext = nxt_py_asgi_next,
+ .tp_methods = nxt_py_asgi_lifespan_methods,
+};
+
+
+nxt_int_t
+nxt_py_asgi_lifespan_startup(nxt_task_t *task)
+{
+ PyObject *scope, *res, *py_task, *receive, *send, *done;
+ nxt_int_t rc;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_lifespan_type) != 0)) {
+ nxt_alert(task,
+ "Python failed to initialize the 'asgi_lifespan' type object");
+ return NXT_ERROR;
+ }
+
+ lifespan = PyObject_New(nxt_py_asgi_lifespan_t, &nxt_py_asgi_lifespan_type);
+ if (nxt_slow_path(lifespan == NULL)) {
+ nxt_alert(task, "Python failed to create lifespan object");
+ return NXT_ERROR;
+ }
+
+ rc = NXT_ERROR;
+
+ receive = PyObject_GetAttrString((PyObject *) lifespan, "receive");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_alert(task, "Python failed to get 'receive' method");
+ goto release_lifespan;
+ }
+
+ send = PyObject_GetAttrString((PyObject *) lifespan, "send");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_alert(task, "Python failed to get 'send' method");
+ goto release_receive;
+ }
+
+ done = PyObject_GetAttrString((PyObject *) lifespan, "_done");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_alert(task, "Python failed to get '_done' method");
+ goto release_send;
+ }
+
+ lifespan->startup_future = PyObject_CallObject(nxt_py_loop_create_future,
+ NULL);
+ if (nxt_slow_path(lifespan->startup_future == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ goto release_done;
+ }
+
+ lifespan->disabled = 0;
+ lifespan->startup_received = 0;
+ lifespan->startup_sent = 0;
+ lifespan->shutdown_received = 0;
+ lifespan->shutdown_sent = 0;
+ lifespan->shutdown_called = 0;
+ lifespan->shutdown_future = NULL;
+ lifespan->receive_future = NULL;
+
+ scope = nxt_py_asgi_new_scope(NULL, nxt_py_lifespan_str, nxt_py_2_0_str);
+ if (nxt_slow_path(scope == NULL)) {
+ goto release_future;
+ }
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_application,
+ scope, receive, send, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_log(task, NXT_LOG_ERR, "Python failed to call the application");
+ nxt_python_print_exception();
+ goto release_scope;
+ }
+
+ if (nxt_slow_path(!PyCoro_CheckExact(res))) {
+ nxt_log(task, NXT_LOG_ERR,
+ "Application result type is not a coroutine");
+ Py_DECREF(res);
+ goto release_scope;
+ }
+
+ py_task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL);
+ if (nxt_slow_path(py_task == NULL)) {
+ nxt_log(task, NXT_LOG_ERR, "Python failed to call the create_task");
+ nxt_python_print_exception();
+ Py_DECREF(res);
+ goto release_scope;
+ }
+
+ Py_DECREF(res);
+
+ res = PyObject_CallMethodObjArgs(py_task, nxt_py_add_done_callback_str,
+ done, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_log(task, NXT_LOG_ERR,
+ "Python failed to call 'task.add_done_callback'");
+ nxt_python_print_exception();
+ goto release_task;
+ }
+
+ Py_DECREF(res);
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
+ lifespan->startup_future, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_alert(task, "Python failed to call loop.run_until_complete");
+ nxt_python_print_exception();
+ goto release_task;
+ }
+
+ Py_DECREF(res);
+
+ if (lifespan->startup_sent == 1 || lifespan->disabled) {
+ nxt_py_lifespan = lifespan;
+ Py_INCREF(nxt_py_lifespan);
+
+ rc = NXT_OK;
+ }
+
+release_task:
+ Py_DECREF(py_task);
+release_scope:
+ Py_DECREF(scope);
+release_future:
+ Py_CLEAR(lifespan->startup_future);
+release_done:
+ Py_DECREF(done);
+release_send:
+ Py_DECREF(send);
+release_receive:
+ Py_DECREF(receive);
+release_lifespan:
+ Py_DECREF(lifespan);
+
+ return rc;
+}
+
+
+nxt_int_t
+nxt_py_asgi_lifespan_shutdown(void)
+{
+ PyObject *msg, *future, *res;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ if (nxt_slow_path(nxt_py_lifespan == NULL || nxt_py_lifespan->disabled)) {
+ return NXT_OK;
+ }
+
+ lifespan = nxt_py_lifespan;
+ lifespan->shutdown_called = 1;
+
+ if (lifespan->receive_future != NULL) {
+ future = lifespan->receive_future;
+ lifespan->receive_future = NULL;
+
+ msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str);
+
+ if (nxt_fast_path(msg != NULL)) {
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str,
+ msg, NULL);
+ Py_XDECREF(res);
+ Py_DECREF(msg);
+ }
+
+ Py_DECREF(future);
+ }
+
+ if (lifespan->shutdown_sent) {
+ return NXT_OK;
+ }
+
+ lifespan->shutdown_future = PyObject_CallObject(nxt_py_loop_create_future,
+ NULL);
+ if (nxt_slow_path(lifespan->shutdown_future == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Future object");
+ nxt_python_print_exception();
+ return NXT_ERROR;
+ }
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
+ lifespan->shutdown_future, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete");
+ nxt_python_print_exception();
+ return NXT_ERROR;
+ }
+
+ Py_DECREF(res);
+ Py_CLEAR(lifespan->shutdown_future);
+
+ return NXT_OK;
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none)
+{
+ PyObject *msg, *future;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ lifespan = (nxt_py_asgi_lifespan_t *) self;
+
+ nxt_unit_debug(NULL, "asgi_lifespan_receive");
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ if (!lifespan->startup_received) {
+ lifespan->startup_received = 1;
+
+ msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_startup_str);
+
+ return nxt_py_asgi_set_result_soon(NULL, future, msg);
+ }
+
+ if (lifespan->shutdown_called && !lifespan->shutdown_received) {
+ lifespan->shutdown_received = 1;
+
+ msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str);
+
+ return nxt_py_asgi_set_result_soon(NULL, future, msg);
+ }
+
+ Py_INCREF(future);
+ lifespan->receive_future = future;
+
+ return future;
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict)
+{
+ PyObject *type, *msg;
+ const char *type_str;
+ Py_ssize_t type_len;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ static const nxt_str_t startup_complete
+ = nxt_string("lifespan.startup.complete");
+ static const nxt_str_t startup_failed
+ = nxt_string("lifespan.startup.failed");
+ static const nxt_str_t shutdown_complete
+ = nxt_string("lifespan.shutdown.complete");
+ static const nxt_str_t shutdown_failed
+ = nxt_string("lifespan.shutdown.failed");
+
+ lifespan = (nxt_py_asgi_lifespan_t *) self;
+
+ type = PyDict_GetItem(dict, nxt_py_type_str);
+ if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
+ nxt_unit_error(NULL,
+ "asgi_lifespan_send: 'type' is not a unicode string");
+ return PyErr_Format(PyExc_TypeError,
+ "'type' is not a unicode string");
+ }
+
+ type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
+
+ nxt_unit_debug(NULL, "asgi_lifespan_send type is '%.*s'",
+ (int) type_len, type_str);
+
+ if (type_len == (Py_ssize_t) startup_complete.length
+ && memcmp(type_str, startup_complete.start, type_len) == 0)
+ {
+ return nxt_py_asgi_lifespan_send_startup(lifespan, 0, NULL);
+ }
+
+ if (type_len == (Py_ssize_t) startup_failed.length
+ && memcmp(type_str, startup_failed.start, type_len) == 0)
+ {
+ msg = PyDict_GetItem(dict, nxt_py_message_str);
+ return nxt_py_asgi_lifespan_send_startup(lifespan, 1, msg);
+ }
+
+ if (type_len == (Py_ssize_t) shutdown_complete.length
+ && memcmp(type_str, shutdown_complete.start, type_len) == 0)
+ {
+ return nxt_py_asgi_lifespan_send_shutdown(lifespan, 0, NULL);
+ }
+
+ if (type_len == (Py_ssize_t) shutdown_failed.length
+ && memcmp(type_str, shutdown_failed.start, type_len) == 0)
+ {
+ msg = PyDict_GetItem(dict, nxt_py_message_str);
+ return nxt_py_asgi_lifespan_send_shutdown(lifespan, 1, msg);
+ }
+
+ return nxt_py_asgi_lifespan_disable(lifespan);
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_send_startup(nxt_py_asgi_lifespan_t *lifespan, int v,
+ PyObject *message)
+{
+ const char *message_str;
+ Py_ssize_t message_len;
+
+ if (nxt_slow_path(v != 0)) {
+ nxt_unit_error(NULL, "Application startup failed");
+
+ if (nxt_fast_path(message != NULL && PyUnicode_Check(message))) {
+ message_str = PyUnicode_AsUTF8AndSize(message, &message_len);
+
+ nxt_unit_error(NULL, "%.*s", (int) message_len, message_str);
+ }
+ }
+
+ return nxt_py_asgi_lifespan_send_(lifespan, v,
+ &lifespan->startup_sent,
+ &lifespan->startup_future);
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, int v, int *sent,
+ PyObject **pfuture)
+{
+ PyObject *future, *res;
+
+ if (*sent) {
+ return nxt_py_asgi_lifespan_disable(lifespan);
+ }
+
+ *sent = 1 + v;
+
+ if (*pfuture != NULL) {
+ future = *pfuture;
+ *pfuture = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str,
+ Py_None, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Failed to call 'future.set_result'");
+ nxt_python_print_exception();
+
+ return nxt_py_asgi_lifespan_disable(lifespan);
+ }
+
+ Py_DECREF(res);
+ Py_DECREF(future);
+ }
+
+ Py_INCREF(lifespan);
+
+ return (PyObject *) lifespan;
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan)
+{
+ nxt_unit_warn(NULL, "Got invalid state transition on lifespan protocol");
+
+ lifespan->disabled = 1;
+
+ return PyErr_Format(PyExc_AssertionError,
+ "Got invalid state transition on lifespan protocol");
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_send_shutdown(nxt_py_asgi_lifespan_t *lifespan, int v,
+ PyObject *message)
+{
+ return nxt_py_asgi_lifespan_send_(lifespan, v,
+ &lifespan->shutdown_sent,
+ &lifespan->shutdown_future);
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future)
+{
+ PyObject *res;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ nxt_unit_debug(NULL, "asgi_lifespan_done");
+
+ lifespan = (nxt_py_asgi_lifespan_t *) self;
+
+ if (lifespan->startup_sent == 0) {
+ lifespan->disabled = 1;
+ }
+
+ /*
+ * Get Future.result() and it raises an exception, if coroutine exited
+ * with exception.
+ */
+ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_log(NULL, NXT_UNIT_LOG_INFO,
+ "ASGI Lifespan processing exception");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+
+ if (lifespan->startup_future != NULL) {
+ future = lifespan->startup_future;
+ lifespan->startup_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str,
+ Py_None, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Failed to call 'future.set_result'");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+ }
+
+ if (lifespan->shutdown_future != NULL) {
+ future = lifespan->shutdown_future;
+ lifespan->shutdown_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str,
+ Py_None, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Failed to call 'future.set_result'");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+ }
+
+ Py_RETURN_NONE;
+}
+
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/python/nxt_python_asgi_str.c b/src/python/nxt_python_asgi_str.c
new file mode 100644
index 00000000..37fa7f04
--- /dev/null
+++ b/src/python/nxt_python_asgi_str.c
@@ -0,0 +1,141 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+PyObject *nxt_py_1_0_str;
+PyObject *nxt_py_1_1_str;
+PyObject *nxt_py_2_0_str;
+PyObject *nxt_py_2_1_str;
+PyObject *nxt_py_3_0_str;
+PyObject *nxt_py_add_done_callback_str;
+PyObject *nxt_py_asgi_str;
+PyObject *nxt_py_bad_state_str;
+PyObject *nxt_py_body_str;
+PyObject *nxt_py_bytes_str;
+PyObject *nxt_py_client_str;
+PyObject *nxt_py_code_str;
+PyObject *nxt_py_done_str;
+PyObject *nxt_py_exception_str;
+PyObject *nxt_py_failed_to_send_body_str;
+PyObject *nxt_py_headers_str;
+PyObject *nxt_py_http_str;
+PyObject *nxt_py_http_disconnect_str;
+PyObject *nxt_py_http_request_str;
+PyObject *nxt_py_http_version_str;
+PyObject *nxt_py_https_str;
+PyObject *nxt_py_lifespan_str;
+PyObject *nxt_py_lifespan_shutdown_str;
+PyObject *nxt_py_lifespan_startup_str;
+PyObject *nxt_py_method_str;
+PyObject *nxt_py_message_str;
+PyObject *nxt_py_message_too_big_str;
+PyObject *nxt_py_more_body_str;
+PyObject *nxt_py_path_str;
+PyObject *nxt_py_query_string_str;
+PyObject *nxt_py_raw_path_str;
+PyObject *nxt_py_result_str;
+PyObject *nxt_py_root_path_str;
+PyObject *nxt_py_scheme_str;
+PyObject *nxt_py_server_str;
+PyObject *nxt_py_set_exception_str;
+PyObject *nxt_py_set_result_str;
+PyObject *nxt_py_spec_version_str;
+PyObject *nxt_py_status_str;
+PyObject *nxt_py_subprotocol_str;
+PyObject *nxt_py_subprotocols_str;
+PyObject *nxt_py_text_str;
+PyObject *nxt_py_type_str;
+PyObject *nxt_py_version_str;
+PyObject *nxt_py_websocket_str;
+PyObject *nxt_py_websocket_accept_str;
+PyObject *nxt_py_websocket_close_str;
+PyObject *nxt_py_websocket_connect_str;
+PyObject *nxt_py_websocket_disconnect_str;
+PyObject *nxt_py_websocket_receive_str;
+PyObject *nxt_py_websocket_send_str;
+PyObject *nxt_py_ws_str;
+PyObject *nxt_py_wss_str;
+
+static nxt_python_string_t nxt_py_asgi_strings[] = {
+ { nxt_string("1.0"), &nxt_py_1_0_str },
+ { nxt_string("1.1"), &nxt_py_1_1_str },
+ { nxt_string("2.0"), &nxt_py_2_0_str },
+ { nxt_string("2.1"), &nxt_py_2_1_str },
+ { nxt_string("3.0"), &nxt_py_3_0_str },
+ { nxt_string("add_done_callback"), &nxt_py_add_done_callback_str },
+ { nxt_string("asgi"), &nxt_py_asgi_str },
+ { nxt_string("bad state"), &nxt_py_bad_state_str },
+ { nxt_string("body"), &nxt_py_body_str },
+ { nxt_string("bytes"), &nxt_py_bytes_str },
+ { nxt_string("client"), &nxt_py_client_str },
+ { nxt_string("code"), &nxt_py_code_str },
+ { nxt_string("done"), &nxt_py_done_str },
+ { nxt_string("exception"), &nxt_py_exception_str },
+ { nxt_string("failed to send body"), &nxt_py_failed_to_send_body_str },
+ { nxt_string("headers"), &nxt_py_headers_str },
+ { nxt_string("http"), &nxt_py_http_str },
+ { nxt_string("http.disconnect"), &nxt_py_http_disconnect_str },
+ { nxt_string("http.request"), &nxt_py_http_request_str },
+ { nxt_string("http_version"), &nxt_py_http_version_str },
+ { nxt_string("https"), &nxt_py_https_str },
+ { nxt_string("lifespan"), &nxt_py_lifespan_str },
+ { nxt_string("lifespan.shutdown"), &nxt_py_lifespan_shutdown_str },
+ { nxt_string("lifespan.startup"), &nxt_py_lifespan_startup_str },
+ { nxt_string("message"), &nxt_py_message_str },
+ { nxt_string("message too big"), &nxt_py_message_too_big_str },
+ { nxt_string("method"), &nxt_py_method_str },
+ { nxt_string("more_body"), &nxt_py_more_body_str },
+ { nxt_string("path"), &nxt_py_path_str },
+ { nxt_string("query_string"), &nxt_py_query_string_str },
+ { nxt_string("raw_path"), &nxt_py_raw_path_str },
+ { nxt_string("result"), &nxt_py_result_str },
+ { nxt_string("root_path"), &nxt_py_root_path_str }, // not used
+ { nxt_string("scheme"), &nxt_py_scheme_str },
+ { nxt_string("server"), &nxt_py_server_str },
+ { nxt_string("set_exception"), &nxt_py_set_exception_str },
+ { nxt_string("set_result"), &nxt_py_set_result_str },
+ { nxt_string("spec_version"), &nxt_py_spec_version_str },
+ { nxt_string("status"), &nxt_py_status_str },
+ { nxt_string("subprotocol"), &nxt_py_subprotocol_str },
+ { nxt_string("subprotocols"), &nxt_py_subprotocols_str },
+ { nxt_string("text"), &nxt_py_text_str },
+ { nxt_string("type"), &nxt_py_type_str },
+ { nxt_string("version"), &nxt_py_version_str },
+ { nxt_string("websocket"), &nxt_py_websocket_str },
+ { nxt_string("websocket.accept"), &nxt_py_websocket_accept_str },
+ { nxt_string("websocket.close"), &nxt_py_websocket_close_str },
+ { nxt_string("websocket.connect"), &nxt_py_websocket_connect_str },
+ { nxt_string("websocket.disconnect"), &nxt_py_websocket_disconnect_str },
+ { nxt_string("websocket.receive"), &nxt_py_websocket_receive_str },
+ { nxt_string("websocket.send"), &nxt_py_websocket_send_str },
+ { nxt_string("ws"), &nxt_py_ws_str },
+ { nxt_string("wss"), &nxt_py_wss_str },
+ { nxt_null_string, NULL },
+};
+
+
+nxt_int_t
+nxt_py_asgi_str_init(void)
+{
+ return nxt_python_init_strings(nxt_py_asgi_strings);
+}
+
+
+void
+nxt_py_asgi_str_done(void)
+{
+ nxt_python_done_strings(nxt_py_asgi_strings);
+}
+
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/python/nxt_python_asgi_str.h b/src/python/nxt_python_asgi_str.h
new file mode 100644
index 00000000..3f389c62
--- /dev/null
+++ b/src/python/nxt_python_asgi_str.h
@@ -0,0 +1,69 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PYTHON_ASGI_STR_H_INCLUDED_
+#define _NXT_PYTHON_ASGI_STR_H_INCLUDED_
+
+
+extern PyObject *nxt_py_1_0_str;
+extern PyObject *nxt_py_1_1_str;
+extern PyObject *nxt_py_2_0_str;
+extern PyObject *nxt_py_2_1_str;
+extern PyObject *nxt_py_3_0_str;
+extern PyObject *nxt_py_add_done_callback_str;
+extern PyObject *nxt_py_asgi_str;
+extern PyObject *nxt_py_bad_state_str;
+extern PyObject *nxt_py_body_str;
+extern PyObject *nxt_py_bytes_str;
+extern PyObject *nxt_py_client_str;
+extern PyObject *nxt_py_code_str;
+extern PyObject *nxt_py_done_str;
+extern PyObject *nxt_py_exception_str;
+extern PyObject *nxt_py_failed_to_send_body_str;
+extern PyObject *nxt_py_headers_str;
+extern PyObject *nxt_py_http_str;
+extern PyObject *nxt_py_http_disconnect_str;
+extern PyObject *nxt_py_http_request_str;
+extern PyObject *nxt_py_http_version_str;
+extern PyObject *nxt_py_https_str;
+extern PyObject *nxt_py_lifespan_str;
+extern PyObject *nxt_py_lifespan_shutdown_str;
+extern PyObject *nxt_py_lifespan_startup_str;
+extern PyObject *nxt_py_method_str;
+extern PyObject *nxt_py_message_str;
+extern PyObject *nxt_py_message_too_big_str;
+extern PyObject *nxt_py_more_body_str;
+extern PyObject *nxt_py_path_str;
+extern PyObject *nxt_py_query_string_str;
+extern PyObject *nxt_py_result_str;
+extern PyObject *nxt_py_raw_path_str;
+extern PyObject *nxt_py_root_path_str;
+extern PyObject *nxt_py_scheme_str;
+extern PyObject *nxt_py_server_str;
+extern PyObject *nxt_py_set_exception_str;
+extern PyObject *nxt_py_set_result_str;
+extern PyObject *nxt_py_spec_version_str;
+extern PyObject *nxt_py_status_str;
+extern PyObject *nxt_py_subprotocol_str;
+extern PyObject *nxt_py_subprotocols_str;
+extern PyObject *nxt_py_text_str;
+extern PyObject *nxt_py_type_str;
+extern PyObject *nxt_py_version_str;
+extern PyObject *nxt_py_websocket_str;
+extern PyObject *nxt_py_websocket_accept_str;
+extern PyObject *nxt_py_websocket_close_str;
+extern PyObject *nxt_py_websocket_connect_str;
+extern PyObject *nxt_py_websocket_disconnect_str;
+extern PyObject *nxt_py_websocket_receive_str;
+extern PyObject *nxt_py_websocket_send_str;
+extern PyObject *nxt_py_ws_str;
+extern PyObject *nxt_py_wss_str;
+
+
+nxt_int_t nxt_py_asgi_str_init(void);
+void nxt_py_asgi_str_done(void);
+
+
+#endif /* _NXT_PYTHON_ASGI_STR_H_INCLUDED_ */
diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c
new file mode 100644
index 00000000..5a27b588
--- /dev/null
+++ b/src/python/nxt_python_asgi_websocket.c
@@ -0,0 +1,1084 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <nxt_unit_websocket.h>
+#include <nxt_websocket_header.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+enum {
+ NXT_WS_INIT,
+ NXT_WS_CONNECT,
+ NXT_WS_ACCEPTED,
+ NXT_WS_DISCONNECTED,
+ NXT_WS_CLOSED,
+};
+
+
+typedef struct {
+ nxt_queue_link_t link;
+ nxt_unit_websocket_frame_t *frame;
+} nxt_py_asgi_penging_frame_t;
+
+
+typedef struct {
+ PyObject_HEAD
+ nxt_unit_request_info_t *req;
+ PyObject *receive_future;
+ PyObject *receive_exc_str;
+ int state;
+ nxt_queue_t pending_frames;
+ uint64_t pending_payload_len;
+ uint64_t pending_frame_len;
+ int pending_fins;
+} nxt_py_asgi_websocket_t;
+
+
+static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none);
+static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict);
+static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws,
+ PyObject *dict);
+static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws,
+ PyObject *msg);
+static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws,
+ PyObject *exc);
+static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f);
+static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
+ nxt_unit_websocket_frame_t *frame);
+static uint64_t nxt_py_asgi_websocket_pending_len(
+ nxt_py_asgi_websocket_t *ws);
+static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame(
+ nxt_py_asgi_websocket_t *ws);
+static PyObject *nxt_py_asgi_websocket_disconnect_msg(
+ nxt_py_asgi_websocket_t *ws);
+static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future);
+
+
+static PyMethodDef nxt_py_asgi_websocket_methods[] = {
+ { "receive", nxt_py_asgi_websocket_receive, METH_NOARGS, 0 },
+ { "send", nxt_py_asgi_websocket_send, METH_O, 0 },
+ { "_done", nxt_py_asgi_websocket_done, METH_O, 0 },
+ { NULL, NULL, 0, 0 }
+};
+
+static PyAsyncMethods nxt_py_asgi_async_methods = {
+ .am_await = nxt_py_asgi_await,
+};
+
+static PyTypeObject nxt_py_asgi_websocket_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+
+ .tp_name = "unit._asgi_websocket",
+ .tp_basicsize = sizeof(nxt_py_asgi_websocket_t),
+ .tp_dealloc = nxt_py_asgi_dealloc,
+ .tp_as_async = &nxt_py_asgi_async_methods,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit ASGI WebSocket connection object",
+ .tp_iter = nxt_py_asgi_iter,
+ .tp_iternext = nxt_py_asgi_next,
+ .tp_methods = nxt_py_asgi_websocket_methods,
+};
+
+static uint64_t nxt_py_asgi_ws_max_frame_size = 1024 * 1024;
+static uint64_t nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024;
+
+
+nxt_int_t
+nxt_py_asgi_websocket_init(nxt_task_t *task)
+{
+ if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) {
+ nxt_alert(task,
+ "Python failed to initialize the \"asgi_websocket\" type object");
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+}
+
+
+PyObject *
+nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req)
+{
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type);
+
+ if (nxt_fast_path(ws != NULL)) {
+ ws->req = req;
+ ws->receive_future = NULL;
+ ws->receive_exc_str = NULL;
+ ws->state = NXT_WS_INIT;
+ nxt_queue_init(&ws->pending_frames);
+ ws->pending_payload_len = 0;
+ ws->pending_frame_len = 0;
+ ws->pending_fins = 0;
+ }
+
+ return (PyObject *) ws;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none)
+{
+ PyObject *future, *msg;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = (nxt_py_asgi_websocket_t *) self;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_receive");
+
+ /* If exception happened out of receive() call, raise it now. */
+ if (nxt_slow_path(ws->receive_exc_str != NULL)) {
+ PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str);
+
+ ws->receive_exc_str = NULL;
+
+ return NULL;
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
+ nxt_unit_req_error(ws->req,
+ "receive() called for closed WebSocket");
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket already closed");
+ }
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_req_alert(ws->req, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
+ ws->state = NXT_WS_CONNECT;
+
+ msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str);
+
+ return nxt_py_asgi_set_result_soon(ws->req, future, msg);
+ }
+
+ if (ws->pending_fins > 0) {
+ msg = nxt_py_asgi_websocket_pop_msg(ws, NULL);
+
+ return nxt_py_asgi_set_result_soon(ws->req, future, msg);
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
+ msg = nxt_py_asgi_websocket_disconnect_msg(ws);
+
+ return nxt_py_asgi_set_result_soon(ws->req, future, msg);
+ }
+
+ ws->receive_future = future;
+ Py_INCREF(ws->receive_future);
+
+ return future;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict)
+{
+ PyObject *type;
+ const char *type_str;
+ Py_ssize_t type_len;
+ nxt_py_asgi_websocket_t *ws;
+
+ static const nxt_str_t websocket_accept = nxt_string("websocket.accept");
+ static const nxt_str_t websocket_close = nxt_string("websocket.close");
+ static const nxt_str_t websocket_send = nxt_string("websocket.send");
+
+ ws = (nxt_py_asgi_websocket_t *) self;
+
+ type = PyDict_GetItem(dict, nxt_py_type_str);
+ if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
+ nxt_unit_req_error(ws->req, "asgi_websocket_send: "
+ "'type' is not a unicode string");
+ return PyErr_Format(PyExc_TypeError,
+ "'type' is not a unicode string");
+ }
+
+ type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'",
+ (int) type_len, type_str);
+
+ if (type_len == (Py_ssize_t) websocket_accept.length
+ && memcmp(type_str, websocket_accept.start, type_len) == 0)
+ {
+ return nxt_py_asgi_websocket_accept(ws, dict);
+ }
+
+ if (type_len == (Py_ssize_t) websocket_close.length
+ && memcmp(type_str, websocket_close.start, type_len) == 0)
+ {
+ return nxt_py_asgi_websocket_close(ws, dict);
+ }
+
+ if (type_len == (Py_ssize_t) websocket_send.length
+ && memcmp(type_str, websocket_send.start, type_len) == 0)
+ {
+ return nxt_py_asgi_websocket_send_frame(ws, dict);
+ }
+
+ nxt_unit_req_error(ws->req, "asgi_websocket_send: "
+ "unexpected 'type': '%.*s'", (int) type_len, type_str);
+ return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict)
+{
+ int rc;
+ char *subprotocol_str;
+ PyObject *res, *headers, *subprotocol;
+ Py_ssize_t subprotocol_len;
+ nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
+ nxt_py_asgi_add_field_ctx_t add_field_ctx;
+
+ static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol");
+
+ switch(ws->state) {
+ case NXT_WS_INIT:
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket connect not received");
+ case NXT_WS_CONNECT:
+ break;
+
+ case NXT_WS_ACCEPTED:
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
+
+ case NXT_WS_DISCONNECTED:
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
+
+ case NXT_WS_CLOSED:
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
+ }
+
+ if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
+ }
+
+ if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) {
+ return PyErr_Format(PyExc_RuntimeError, "response already sent");
+ }
+
+ calc_size_ctx.fields_size = 0;
+ calc_size_ctx.fields_count = 0;
+
+ headers = PyDict_GetItem(dict, nxt_py_headers_str);
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
+ &calc_size_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+ }
+
+ subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str);
+ if (subprotocol != NULL && PyUnicode_Check(subprotocol)) {
+ subprotocol_str = PyUnicode_DATA(subprotocol);
+ subprotocol_len = PyUnicode_GET_LENGTH(subprotocol);
+
+ calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len;
+ calc_size_ctx.fields_count++;
+
+ } else {
+ subprotocol_str = NULL;
+ subprotocol_len = 0;
+ }
+
+ rc = nxt_unit_response_init(ws->req, 101,
+ calc_size_ctx.fields_count,
+ calc_size_ctx.fields_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to allocate response object");
+ }
+
+ add_field_ctx.req = ws->req;
+ add_field_ctx.content_length = -1;
+
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
+ &add_field_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+ }
+
+ if (subprotocol_len > 0) {
+ rc = nxt_unit_response_add_field(ws->req,
+ (const char *) ws_protocol.start,
+ ws_protocol.length,
+ subprotocol_str, subprotocol_len);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to add header");
+ }
+ }
+
+ rc = nxt_unit_response_send(ws->req);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError, "failed to send response");
+ }
+
+ ws->state = NXT_WS_ACCEPTED;
+
+ Py_INCREF(ws);
+
+ return (PyObject *) ws;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict)
+{
+ int rc;
+ uint16_t status_code;
+ PyObject *code;
+
+ if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket connect not received");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
+ }
+
+ if (nxt_unit_response_is_websocket(ws->req)) {
+ code = PyDict_GetItem(dict, nxt_py_code_str);
+ if (nxt_slow_path(code != NULL && !PyLong_Check(code))) {
+ return PyErr_Format(PyExc_TypeError, "'code' is not integer");
+ }
+
+ status_code = (code != NULL) ? htons(PyLong_AsLong(code))
+ : htons(NXT_WEBSOCKET_CR_NORMAL);
+
+ rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
+ 1, &status_code, 2);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to send close frame");
+ }
+
+ } else {
+ rc = nxt_unit_response_init(ws->req, 403, 0, 0);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to allocate response object");
+ }
+
+ rc = nxt_unit_response_send(ws->req);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to send response");
+ }
+ }
+
+ ws->state = NXT_WS_CLOSED;
+
+ Py_INCREF(ws);
+
+ return (PyObject *) ws;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict)
+{
+ int rc;
+ uint8_t opcode;
+ PyObject *bytes, *text;
+ const void *buf;
+ Py_ssize_t buf_size;
+
+ if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket connect not received");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket not accepted yet");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
+ }
+
+ bytes = PyDict_GetItem(dict, nxt_py_bytes_str);
+ if (bytes == Py_None) {
+ bytes = NULL;
+ }
+
+ if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) {
+ return PyErr_Format(PyExc_TypeError,
+ "'bytes' is not a byte string");
+ }
+
+ text = PyDict_GetItem(dict, nxt_py_text_str);
+ if (text == Py_None) {
+ text = NULL;
+ }
+
+ if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) {
+ return PyErr_Format(PyExc_TypeError,
+ "'text' is not a unicode string");
+ }
+
+ if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) {
+ return PyErr_Format(PyExc_ValueError,
+ "Exactly one of 'bytes' or 'text' must be non-None");
+ }
+
+ if (bytes != NULL) {
+ buf = PyBytes_AS_STRING(bytes);
+ buf_size = PyBytes_GET_SIZE(bytes);
+ opcode = NXT_WEBSOCKET_OP_BINARY;
+
+ } else {
+ buf = PyUnicode_AsUTF8AndSize(text, &buf_size);
+ opcode = NXT_WEBSOCKET_OP_TEXT;
+ }
+
+ rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError, "failed to send close frame");
+ }
+
+ Py_INCREF(ws);
+ return (PyObject *) ws;
+}
+
+
+void
+nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame)
+{
+ uint8_t opcode;
+ uint16_t status_code;
+ uint64_t rest;
+ PyObject *msg, *exc;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = frame->req->data;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_handler");
+
+ opcode = frame->header->opcode;
+ if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT
+ && opcode != NXT_WEBSOCKET_OP_TEXT
+ && opcode != NXT_WEBSOCKET_OP_BINARY
+ && opcode != NXT_WEBSOCKET_OP_CLOSE))
+ {
+ nxt_unit_websocket_done(frame);
+
+ nxt_unit_req_debug(ws->req,
+ "asgi_websocket_handler: ignore frame with opcode %d",
+ opcode);
+
+ return;
+ }
+
+ if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) {
+ nxt_unit_websocket_done(frame);
+
+ goto bad_state;
+ }
+
+ rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len;
+
+ if (nxt_slow_path(frame->payload_len > rest)) {
+ nxt_unit_websocket_done(frame);
+
+ goto too_big;
+ }
+
+ rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len;
+
+ if (nxt_slow_path(frame->payload_len > rest)) {
+ nxt_unit_websocket_done(frame);
+
+ goto too_big;
+ }
+
+ if (ws->receive_future == NULL || frame->header->fin == 0) {
+ nxt_py_asgi_websocket_suspend_frame(frame);
+
+ return;
+ }
+
+ if (!nxt_queue_is_empty(&ws->pending_frames)) {
+ if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT
+ || opcode == NXT_WEBSOCKET_OP_BINARY))
+ {
+ nxt_unit_req_alert(ws->req,
+ "Invalid state: pending frames with active receiver. "
+ "CONT frame expected. (%d)", opcode);
+
+ PyErr_SetString(PyExc_AssertionError,
+ "Invalid state: pending frames with active receiver. "
+ "CONT frame expected.");
+
+ nxt_unit_websocket_done(frame);
+
+ return;
+ }
+ }
+
+ msg = nxt_py_asgi_websocket_pop_msg(ws, frame);
+ if (nxt_slow_path(msg == NULL)) {
+ exc = PyErr_Occurred();
+ Py_INCREF(exc);
+
+ goto raise;
+ }
+
+ nxt_py_asgi_websocket_receive_done(ws, msg);
+
+ return;
+
+bad_state:
+
+ if (ws->receive_future == NULL) {
+ ws->receive_exc_str = nxt_py_bad_state_str;
+
+ return;
+ }
+
+ exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
+ nxt_py_bad_state_str,
+ NULL);
+ if (nxt_slow_path(exc == NULL)) {
+ nxt_unit_req_alert(ws->req, "RuntimeError create failed");
+ nxt_python_print_exception();
+
+ exc = Py_None;
+ Py_INCREF(exc);
+ }
+
+ goto raise;
+
+too_big:
+
+ status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG);
+
+ (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
+ 1, &status_code, 2);
+
+ ws->state = NXT_WS_CLOSED;
+
+ if (ws->receive_future == NULL) {
+ ws->receive_exc_str = nxt_py_message_too_big_str;
+
+ return;
+ }
+
+ exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
+ nxt_py_message_too_big_str,
+ NULL);
+ if (nxt_slow_path(exc == NULL)) {
+ nxt_unit_req_alert(ws->req, "RuntimeError create failed");
+ nxt_python_print_exception();
+
+ exc = Py_None;
+ Py_INCREF(exc);
+ }
+
+raise:
+
+ nxt_py_asgi_websocket_receive_fail(ws, exc);
+}
+
+
+static void
+nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg)
+{
+ PyObject *future, *res;
+
+ future = ws->receive_future;
+ ws->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(ws->req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(msg);
+}
+
+
+static void
+nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc)
+{
+ PyObject *future, *res;
+
+ future = ws->receive_future;
+ ws->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(ws->req, "'set_exception' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(exc);
+}
+
+
+static void
+nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame)
+{
+ int rc;
+ nxt_py_asgi_websocket_t *ws;
+ nxt_py_asgi_penging_frame_t *p;
+
+ nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: "
+ "%d, %"PRIu64", %d",
+ frame->header->opcode, frame->payload_len,
+ frame->header->fin);
+
+ ws = frame->req->data;
+
+ rc = nxt_unit_websocket_retain(frame);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension.");
+
+ nxt_unit_websocket_done(frame);
+
+ PyErr_SetString(PyExc_RuntimeError,
+ "Failed to retain frame for suspension.");
+
+ return;
+ }
+
+ p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t));
+ if (nxt_slow_path(p == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to allocate buffer to suspend frame.");
+
+ nxt_unit_websocket_done(frame);
+
+ PyErr_SetString(PyExc_RuntimeError,
+ "Failed to allocate buffer to suspend frame.");
+
+ return;
+ }
+
+ p->frame = frame;
+ nxt_queue_insert_tail(&ws->pending_frames, &p->link);
+
+ ws->pending_payload_len += frame->payload_len;
+ ws->pending_fins += frame->header->fin;
+
+ if (frame->header->fin) {
+ ws->pending_frame_len = 0;
+
+ } else {
+ if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) {
+ ws->pending_frame_len += frame->payload_len;
+
+ } else {
+ ws->pending_frame_len = frame->payload_len;
+ }
+ }
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
+ nxt_unit_websocket_frame_t *frame)
+{
+ int fin;
+ char *buf;
+ uint8_t code_buf[2], opcode;
+ uint16_t code;
+ PyObject *msg, *data, *type, *data_key;
+ uint64_t payload_len;
+ nxt_unit_websocket_frame_t *fin_frame;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg");
+
+ fin_frame = NULL;
+
+ if (nxt_queue_is_empty(&ws->pending_frames)
+ || (frame != NULL
+ && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE))
+ {
+ payload_len = frame->payload_len;
+
+ } else {
+ if (frame != NULL) {
+ payload_len = ws->pending_payload_len + frame->payload_len;
+ fin_frame = frame;
+
+ } else {
+ payload_len = nxt_py_asgi_websocket_pending_len(ws);
+ }
+
+ frame = nxt_py_asgi_websocket_pop_frame(ws);
+ }
+
+ opcode = frame->header->opcode;
+
+ if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) {
+ nxt_unit_req_alert(ws->req,
+ "Invalid state: attempt to process CONT frame.");
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_AssertionError,
+ "Invalid state: attempt to process CONT frame.");
+ }
+
+ type = nxt_py_websocket_receive_str;
+
+ switch (opcode) {
+ case NXT_WEBSOCKET_OP_TEXT:
+ buf = nxt_unit_malloc(frame->req->ctx, payload_len);
+ if (nxt_slow_path(buf == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to allocate buffer for payload (%d).",
+ (int) payload_len);
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to allocate buffer for payload (%d).",
+ (int) payload_len);
+ }
+
+ data = NULL;
+ data_key = nxt_py_text_str;
+
+ break;
+
+ case NXT_WEBSOCKET_OP_BINARY:
+ data = PyBytes_FromStringAndSize(NULL, payload_len);
+ if (nxt_slow_path(data == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to create Bytes for payload (%d).",
+ (int) payload_len);
+ nxt_python_print_exception();
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to create Bytes for payload.");
+ }
+
+ buf = (char *) PyBytes_AS_STRING(data);
+ data_key = nxt_py_bytes_str;
+
+ break;
+
+ case NXT_WEBSOCKET_OP_CLOSE:
+ if (frame->payload_len >= 2) {
+ nxt_unit_websocket_read(frame, code_buf, 2);
+ code = ((uint16_t) code_buf[0]) << 8 | code_buf[1];
+
+ } else {
+ code = NXT_WEBSOCKET_CR_NORMAL;
+ }
+
+ nxt_unit_websocket_done(frame);
+
+ data = PyLong_FromLong(code);
+ if (nxt_slow_path(data == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to create Long from code %d.",
+ (int) code);
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to create Long from code %d.",
+ (int) code);
+ }
+
+ buf = NULL;
+ type = nxt_py_websocket_disconnect_str;
+ data_key = nxt_py_code_str;
+
+ break;
+
+ default:
+ nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode);
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d",
+ opcode);
+ }
+
+ if (buf != NULL) {
+ fin = frame->header->fin;
+ buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
+
+ nxt_unit_websocket_done(frame);
+
+ if (!fin) {
+ while (!nxt_queue_is_empty(&ws->pending_frames)) {
+ frame = nxt_py_asgi_websocket_pop_frame(ws);
+ fin = frame->header->fin;
+
+ buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
+
+ nxt_unit_websocket_done(frame);
+
+ if (fin) {
+ break;
+ }
+ }
+
+ if (fin_frame != NULL) {
+ buf += nxt_unit_websocket_read(fin_frame, buf,
+ fin_frame->payload_len);
+ nxt_unit_websocket_done(fin_frame);
+ }
+ }
+
+ if (opcode == NXT_WEBSOCKET_OP_TEXT) {
+ buf -= payload_len;
+
+ data = PyUnicode_DecodeUTF8(buf, payload_len, NULL);
+
+ nxt_unit_free(ws->req->ctx, buf);
+
+ if (nxt_slow_path(data == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to create Unicode for payload (%d).",
+ (int) payload_len);
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to create Unicode.");
+ }
+ }
+ }
+
+ msg = nxt_py_asgi_new_msg(ws->req, type);
+ if (nxt_slow_path(msg == NULL)) {
+ Py_DECREF(data);
+ return NULL;
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) {
+ nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item");
+
+ Py_DECREF(msg);
+ Py_DECREF(data);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Python failed to set 'msg.data' item");
+ }
+
+ Py_DECREF(data);
+
+ return msg;
+}
+
+
+static uint64_t
+nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws)
+{
+ uint64_t res;
+ nxt_py_asgi_penging_frame_t *p;
+
+ res = 0;
+
+ nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) {
+ res += p->frame->payload_len;
+
+ if (p->frame->header->fin) {
+ nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d",
+ (int) res);
+ return res;
+ }
+ } nxt_queue_loop;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)",
+ (int) res);
+ return res;
+}
+
+
+static nxt_unit_websocket_frame_t *
+nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws)
+{
+ nxt_queue_link_t *lnk;
+ nxt_unit_websocket_frame_t *frame;
+ nxt_py_asgi_penging_frame_t *p;
+
+ lnk = nxt_queue_first(&ws->pending_frames);
+ nxt_queue_remove(lnk);
+
+ p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link);
+
+ frame = p->frame;
+ ws->pending_payload_len -= frame->payload_len;
+ ws->pending_fins -= frame->header->fin;
+
+ nxt_unit_free(frame->req->ctx, p);
+
+ nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: "
+ "%d, %"PRIu64", %d",
+ frame->header->opcode, frame->payload_len,
+ frame->header->fin);
+
+ return frame;
+}
+
+
+void
+nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req)
+{
+ PyObject *msg, *exc;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = req->data;
+
+ nxt_unit_req_debug(req, "asgi_websocket_close_handler");
+
+ if (ws->receive_future == NULL) {
+ ws->state = NXT_WS_DISCONNECTED;
+
+ return;
+ }
+
+ msg = nxt_py_asgi_websocket_disconnect_msg(ws);
+ if (nxt_slow_path(msg == NULL)) {
+ exc = PyErr_Occurred();
+ Py_INCREF(exc);
+
+ nxt_py_asgi_websocket_receive_fail(ws, exc);
+
+ } else {
+ nxt_py_asgi_websocket_receive_done(ws, msg);
+ }
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws)
+{
+ PyObject *msg, *code;
+
+ msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str);
+ if (nxt_slow_path(msg == NULL)) {
+ return NULL;
+ }
+
+ code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY);
+ if (nxt_slow_path(code == NULL)) {
+ nxt_unit_req_alert(ws->req, "Python failed to create long");
+ nxt_python_print_exception();
+
+ Py_DECREF(msg);
+
+ return PyErr_Format(PyExc_RuntimeError, "failed to create long");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) {
+ nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item");
+
+ Py_DECREF(msg);
+ Py_DECREF(code);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Python failed to set 'msg.code' item");
+ }
+
+ Py_DECREF(code);
+
+ return msg;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_done(PyObject *self, PyObject *future)
+{
+ int rc;
+ uint16_t status_code;
+ PyObject *res;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = (nxt_py_asgi_websocket_t *) self;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self);
+
+ /*
+ * Get Future.result() and it raises an exception, if coroutine exited
+ * with exception.
+ */
+ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(ws->req,
+ "Python failed to call 'future.result()'");
+ nxt_python_print_exception();
+
+ rc = NXT_UNIT_ERROR;
+
+ } else {
+ Py_DECREF(res);
+
+ rc = NXT_UNIT_OK;
+ }
+
+ if (ws->state == NXT_WS_ACCEPTED) {
+ status_code = (rc == NXT_UNIT_OK)
+ ? htons(NXT_WEBSOCKET_CR_NORMAL)
+ : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR);
+
+ rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
+ 1, &status_code, 2);
+ }
+
+ while (!nxt_queue_is_empty(&ws->pending_frames)) {
+ nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws));
+ }
+
+ nxt_unit_request_done(ws->req, rc);
+
+ Py_RETURN_NONE;
+}
+
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/nxt_python_wsgi.c b/src/python/nxt_python_wsgi.c
index c4b7702e..97030cd3 100644
--- a/src/nxt_python_wsgi.c
+++ b/src/python/nxt_python_wsgi.c
@@ -8,17 +8,15 @@
#include <Python.h>
-#include <compile.h>
-#include <node.h>
-
#include <nxt_main.h>
-#include <nxt_runtime.h>
#include <nxt_router.h>
#include <nxt_unit.h>
#include <nxt_unit_field.h>
#include <nxt_unit_request.h>
#include <nxt_unit_response.h>
+#include <python/nxt_python.h>
+
#include NXT_PYTHON_MOUNTS_H
/*
@@ -40,23 +38,6 @@
*/
-#if PY_MAJOR_VERSION == 3
-#define NXT_PYTHON_BYTES_TYPE "bytestring"
-
-#define PyString_FromStringAndSize(str, size) \
- PyUnicode_DecodeLatin1((str), (size), "strict")
-
-#else
-#define NXT_PYTHON_BYTES_TYPE "string"
-
-#define PyBytes_FromStringAndSize PyString_FromStringAndSize
-#define PyBytes_Check PyString_Check
-#define PyBytes_GET_SIZE PyString_GET_SIZE
-#define PyBytes_AS_STRING PyString_AS_STRING
-#define PyUnicode_InternInPlace PyString_InternInPlace
-#define PyUnicode_AsUTF8 PyString_AS_STRING
-#endif
-
typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t;
typedef struct {
@@ -68,18 +49,17 @@ typedef struct {
PyObject_HEAD
} nxt_py_error_t;
-static nxt_int_t nxt_python_start(nxt_task_t *task,
- nxt_process_data_t *data);
-static nxt_int_t nxt_python_init_strings(void);
static void nxt_python_request_handler(nxt_unit_request_info_t *req);
-static void nxt_python_atexit(void);
static PyObject *nxt_python_create_environ(nxt_task_t *task);
static PyObject *nxt_python_get_environ(nxt_python_run_ctx_t *ctx);
static int nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name,
nxt_unit_sptr_t *sptr, uint32_t size);
static int nxt_python_add_field(nxt_python_run_ctx_t *ctx,
- nxt_unit_field_t *field);
+ nxt_unit_field_t *field, int n, uint32_t vl);
+static PyObject *nxt_python_field_name(const char *name, uint8_t len);
+static PyObject *nxt_python_field_value(nxt_unit_field_t *f, int n,
+ uint32_t vl);
static int nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name,
PyObject *value);
@@ -99,7 +79,6 @@ static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_iter(PyObject *self);
static PyObject *nxt_py_input_next(PyObject *self);
-static void nxt_python_print_exception(void);
static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes);
struct nxt_python_run_ctx_s {
@@ -109,22 +88,6 @@ struct nxt_python_run_ctx_s {
nxt_unit_request_info_t *req;
};
-static uint32_t compat[] = {
- NXT_VERNUM, NXT_DEBUG,
-};
-
-
-NXT_EXPORT nxt_app_module_t nxt_app_module = {
- sizeof(compat),
- compat,
- nxt_string("python"),
- PY_VERSION,
- nxt_python_mounts,
- nxt_nitems(nxt_python_mounts),
- NULL,
- nxt_python_start,
-};
-
static PyMethodDef nxt_py_start_resp_method[] = {
{"unit_start_response", nxt_py_start_resp, METH_VARARGS, ""}
@@ -158,22 +121,13 @@ static PyTypeObject nxt_py_input_type = {
};
-static PyObject *nxt_py_stderr_flush;
-static PyObject *nxt_py_application;
static PyObject *nxt_py_start_resp_obj;
static PyObject *nxt_py_write_obj;
static PyObject *nxt_py_environ_ptyp;
-#if PY_MAJOR_VERSION == 3
-static wchar_t *nxt_py_home;
-#else
-static char *nxt_py_home;
-#endif
-
static PyThreadState *nxt_python_thread_state;
static nxt_python_run_ctx_t *nxt_python_run_ctx;
-
static PyObject *nxt_py_80_str;
static PyObject *nxt_py_close_str;
static PyObject *nxt_py_content_length_str;
@@ -191,11 +145,6 @@ static PyObject *nxt_py_server_port_str;
static PyObject *nxt_py_server_protocol_str;
static PyObject *nxt_py_wsgi_uri_scheme_str;
-typedef struct {
- nxt_str_t string;
- PyObject **object_p;
-} nxt_python_string_t;
-
static nxt_python_string_t nxt_python_strings[] = {
{ nxt_string("80"), &nxt_py_80_str },
{ nxt_string("close"), &nxt_py_close_str },
@@ -213,136 +162,22 @@ static nxt_python_string_t nxt_python_strings[] = {
{ nxt_string("SERVER_PORT"), &nxt_py_server_port_str },
{ nxt_string("SERVER_PROTOCOL"), &nxt_py_server_protocol_str },
{ nxt_string("wsgi.url_scheme"), &nxt_py_wsgi_uri_scheme_str },
+ { nxt_null_string, NULL },
};
-static nxt_int_t
-nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
+nxt_int_t
+nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init)
{
- int rc;
- char *nxt_py_module;
- size_t len;
- PyObject *obj, *pypath, *module;
- nxt_unit_ctx_t *unit_ctx;
- nxt_unit_init_t python_init;
- nxt_common_app_conf_t *app_conf;
- nxt_python_app_conf_t *c;
-#if PY_MAJOR_VERSION == 3
- char *path;
- size_t size;
- nxt_int_t pep405;
-
- static const char pyvenv[] = "/pyvenv.cfg";
- static const char bin_python[] = "/bin/python";
-#endif
-
- app_conf = data->app;
- c = &app_conf->u.python;
-
- if (c->home != NULL) {
- len = nxt_strlen(c->home);
+ PyObject *obj;
-#if PY_MAJOR_VERSION == 3
-
- path = nxt_malloc(len + sizeof(pyvenv));
- if (nxt_slow_path(path == NULL)) {
- nxt_alert(task, "Failed to allocate memory");
- return NXT_ERROR;
- }
-
- nxt_memcpy(path, c->home, len);
- nxt_memcpy(path + len, pyvenv, sizeof(pyvenv));
-
- pep405 = (access(path, R_OK) == 0);
-
- nxt_free(path);
-
- if (pep405) {
- size = (len + sizeof(bin_python)) * sizeof(wchar_t);
-
- } else {
- size = (len + 1) * sizeof(wchar_t);
- }
-
- nxt_py_home = nxt_malloc(size);
- if (nxt_slow_path(nxt_py_home == NULL)) {
- nxt_alert(task, "Failed to allocate memory");
- return NXT_ERROR;
- }
-
- if (pep405) {
- mbstowcs(nxt_py_home, c->home, len);
- mbstowcs(nxt_py_home + len, bin_python, sizeof(bin_python));
- Py_SetProgramName(nxt_py_home);
-
- } else {
- mbstowcs(nxt_py_home, c->home, len + 1);
- Py_SetPythonHome(nxt_py_home);
- }
-
-#else
- nxt_py_home = nxt_malloc(len + 1);
- if (nxt_slow_path(nxt_py_home == NULL)) {
- nxt_alert(task, "Failed to allocate memory");
- return NXT_ERROR;
- }
-
- nxt_memcpy(nxt_py_home, c->home, len + 1);
- Py_SetPythonHome(nxt_py_home);
-#endif
- }
-
- Py_InitializeEx(0);
-
- module = NULL;
obj = NULL;
- if (nxt_slow_path(nxt_python_init_strings() != NXT_OK)) {
+ if (nxt_slow_path(nxt_python_init_strings(nxt_python_strings) != NXT_OK)) {
nxt_alert(task, "Python failed to init string objects");
goto fail;
}
- obj = PySys_GetObject((char *) "stderr");
- if (nxt_slow_path(obj == NULL)) {
- nxt_alert(task, "Python failed to get \"sys.stderr\" object");
- goto fail;
- }
-
- nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush");
- if (nxt_slow_path(nxt_py_stderr_flush == NULL)) {
- nxt_alert(task, "Python failed to get \"flush\" attribute of "
- "\"sys.stderr\" object");
- goto fail;
- }
-
- Py_DECREF(obj);
-
- if (c->path.length > 0) {
- obj = PyString_FromStringAndSize((char *) c->path.start,
- c->path.length);
-
- if (nxt_slow_path(obj == NULL)) {
- nxt_alert(task, "Python failed to create string object \"%V\"",
- &c->path);
- goto fail;
- }
-
- pypath = PySys_GetObject((char *) "path");
-
- if (nxt_slow_path(pypath == NULL)) {
- nxt_alert(task, "Python failed to get \"sys.path\" list");
- goto fail;
- }
-
- if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) {
- nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"",
- &c->path);
- goto fail;
- }
-
- Py_DECREF(obj);
- }
-
obj = PyCFunction_New(nxt_py_start_resp_method, NULL);
if (nxt_slow_path(obj == NULL)) {
nxt_alert(task,
@@ -366,107 +201,43 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
}
nxt_py_environ_ptyp = obj;
-
- obj = Py_BuildValue("[s]", "unit");
- if (nxt_slow_path(obj == NULL)) {
- nxt_alert(task, "Python failed to create the \"sys.argv\" list");
- goto fail;
- }
-
- if (nxt_slow_path(PySys_SetObject((char *) "argv", obj) != 0)) {
- nxt_alert(task, "Python failed to set the \"sys.argv\" list");
- goto fail;
- }
-
- Py_CLEAR(obj);
-
- nxt_py_module = nxt_alloca(c->module.length + 1);
- nxt_memcpy(nxt_py_module, c->module.start, c->module.length);
- nxt_py_module[c->module.length] = '\0';
-
- module = PyImport_ImportModule(nxt_py_module);
- if (nxt_slow_path(module == NULL)) {
- nxt_alert(task, "Python failed to import module \"%s\"", nxt_py_module);
- nxt_python_print_exception();
- goto fail;
- }
-
- obj = PyDict_GetItemString(PyModule_GetDict(module), "application");
- if (nxt_slow_path(obj == NULL)) {
- nxt_alert(task, "Python failed to get \"application\" "
- "from module \"%s\"", nxt_py_module);
- goto fail;
- }
-
- if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
- nxt_alert(task, "\"application\" in module \"%s\" "
- "is not a callable object", nxt_py_module);
- goto fail;
- }
-
- Py_INCREF(obj);
- Py_CLEAR(module);
-
- nxt_py_application = obj;
obj = NULL;
- nxt_unit_default_init(task, &python_init);
-
- python_init.callbacks.request_handler = nxt_python_request_handler;
- python_init.shm_limit = data->app->shm_limit;
-
- unit_ctx = nxt_unit_init(&python_init);
- if (nxt_slow_path(unit_ctx == NULL)) {
- goto fail;
- }
-
- nxt_python_thread_state = PyEval_SaveThread();
-
- rc = nxt_unit_run(unit_ctx);
-
- nxt_unit_done(unit_ctx);
-
- PyEval_RestoreThread(nxt_python_thread_state);
-
- nxt_python_atexit();
-
- exit(rc);
+ init->callbacks.request_handler = nxt_python_request_handler;
return NXT_OK;
fail:
Py_XDECREF(obj);
- Py_XDECREF(module);
-
- nxt_python_atexit();
return NXT_ERROR;
}
-static nxt_int_t
-nxt_python_init_strings(void)
+int
+nxt_python_wsgi_run(nxt_unit_ctx_t *ctx)
{
- PyObject *obj;
- nxt_uint_t i;
- nxt_python_string_t *pstr;
+ int rc;
- for (i = 0; i < nxt_nitems(nxt_python_strings); i++) {
- pstr = &nxt_python_strings[i];
+ nxt_python_thread_state = PyEval_SaveThread();
- obj = PyString_FromStringAndSize((char *) pstr->string.start,
- pstr->string.length);
- if (nxt_slow_path(obj == NULL)) {
- return NXT_ERROR;
- }
+ rc = nxt_unit_run(ctx);
- PyUnicode_InternInPlace(&obj);
+ PyEval_RestoreThread(nxt_python_thread_state);
- *pstr->object_p = obj;
- }
+ return rc;
+}
- return NXT_OK;
+
+void
+nxt_python_wsgi_done(void)
+{
+ nxt_python_done_strings(nxt_python_strings);
+
+ Py_XDECREF(nxt_py_start_resp_obj);
+ Py_XDECREF(nxt_py_write_obj);
+ Py_XDECREF(nxt_py_environ_ptyp);
}
@@ -597,29 +368,6 @@ done:
}
-static void
-nxt_python_atexit(void)
-{
- nxt_uint_t i;
-
- for (i = 0; i < nxt_nitems(nxt_python_strings); i++) {
- Py_XDECREF(*nxt_python_strings[i].object_p);
- }
-
- Py_XDECREF(nxt_py_stderr_flush);
- Py_XDECREF(nxt_py_application);
- Py_XDECREF(nxt_py_start_resp_obj);
- Py_XDECREF(nxt_py_write_obj);
- Py_XDECREF(nxt_py_environ_ptyp);
-
- Py_Finalize();
-
- if (nxt_py_home != NULL) {
- nxt_free(nxt_py_home);
- }
-}
-
-
static PyObject *
nxt_python_create_environ(nxt_task_t *task)
{
@@ -749,9 +497,9 @@ static PyObject *
nxt_python_get_environ(nxt_python_run_ctx_t *ctx)
{
int rc;
- uint32_t i;
+ uint32_t i, j, vl;
PyObject *environ;
- nxt_unit_field_t *f;
+ nxt_unit_field_t *f, *f2;
nxt_unit_request_t *r;
environ = PyDict_Copy(nxt_py_environ_ptyp);
@@ -803,10 +551,27 @@ nxt_python_get_environ(nxt_python_run_ctx_t *ctx)
r->server_name_length));
RC(nxt_python_add_obj(ctx, nxt_py_server_port_str, nxt_py_80_str));
- for (i = 0; i < r->fields_count; i++) {
+ nxt_unit_request_group_dup_fields(ctx->req);
+
+ for (i = 0; i < r->fields_count;) {
f = r->fields + i;
+ vl = f->value_length;
+
+ for (j = i + 1; j < r->fields_count; j++) {
+ f2 = r->fields + j;
+
+ if (f2->hash != f->hash
+ || nxt_unit_sptr_get(&f2->name) != nxt_unit_sptr_get(&f->name))
+ {
+ break;
+ }
- RC(nxt_python_add_field(ctx, f));
+ vl += 2 + f2->value_length;
+ }
+
+ RC(nxt_python_add_field(ctx, f, j - i, vl));
+
+ i = j;
}
if (r->content_length_field != NXT_UNIT_NONE_FIELD) {
@@ -870,14 +635,15 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name,
static int
-nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field)
+nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field, int n,
+ uint32_t vl)
{
char *src;
PyObject *name, *value;
src = nxt_unit_sptr_get(&field->name);
- name = PyString_FromStringAndSize(src, field->name_length);
+ name = nxt_python_field_name(src, field->name_length);
if (nxt_slow_path(name == NULL)) {
nxt_unit_req_error(ctx->req,
"Python failed to create name string \"%.*s\"",
@@ -887,13 +653,13 @@ nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field)
return NXT_UNIT_ERROR;
}
- src = nxt_unit_sptr_get(&field->value);
+ value = nxt_python_field_value(field, n, vl);
- value = PyString_FromStringAndSize(src, field->value_length);
if (nxt_slow_path(value == NULL)) {
nxt_unit_req_error(ctx->req,
"Python failed to create value string \"%.*s\"",
- (int) field->value_length, src);
+ (int) field->value_length,
+ (char *) nxt_unit_sptr_get(&field->value));
nxt_python_print_exception();
goto fail;
@@ -920,6 +686,80 @@ fail:
}
+static PyObject *
+nxt_python_field_name(const char *name, uint8_t len)
+{
+ char *p, c;
+ uint8_t i;
+ PyObject *res;
+
+#if PY_MAJOR_VERSION == 3
+ res = PyUnicode_New(len + 5, 255);
+#else
+ res = PyString_FromStringAndSize(NULL, len + 5);
+#endif
+
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ p = PyString_AS_STRING(res);
+
+ p = nxt_cpymem(p, "HTTP_", 5);
+
+ for (i = 0; i < len; i++) {
+ c = name[i];
+
+ if (c >= 'a' && c <= 'z') {
+ *p++ = (c & ~0x20);
+ continue;
+ }
+
+ if (c == '-') {
+ *p++ = '_';
+ continue;
+ }
+
+ *p++ = c;
+ }
+
+ return res;
+}
+
+
+static PyObject *
+nxt_python_field_value(nxt_unit_field_t *f, int n, uint32_t vl)
+{
+ int i;
+ char *p, *src;
+ PyObject *res;
+
+#if PY_MAJOR_VERSION == 3
+ res = PyUnicode_New(vl, 255);
+#else
+ res = PyString_FromStringAndSize(NULL, vl);
+#endif
+
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ p = PyString_AS_STRING(res);
+
+ src = nxt_unit_sptr_get(&f->value);
+ p = nxt_cpymem(p, src, f->value_length);
+
+ for (i = 1; i < n; i++) {
+ p = nxt_cpymem(p, ", ", 2);
+
+ src = nxt_unit_sptr_get(&f[i].value);
+ p = nxt_cpymem(p, src, f[i].value_length);
+ }
+
+ return res;
+}
+
+
static int
nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, PyObject *value)
{
@@ -1386,28 +1226,6 @@ nxt_py_input_next(PyObject *self)
}
-static void
-nxt_python_print_exception(void)
-{
- PyErr_Print();
-
-#if PY_MAJOR_VERSION == 3
- /* The backtrace may be buffered in sys.stderr file object. */
- {
- PyObject *result;
-
- result = PyObject_CallFunction(nxt_py_stderr_flush, NULL);
- if (nxt_slow_path(result == NULL)) {
- PyErr_Clear();
- return;
- }
-
- Py_DECREF(result);
- }
-#endif
-}
-
-
static int
nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes)
{