diff options
author | Andrei Belov <defan@nginx.com> | 2020-10-08 19:19:31 +0300 |
---|---|---|
committer | Andrei Belov <defan@nginx.com> | 2020-10-08 19:19:31 +0300 |
commit | d586ac9fdc4a86c142b06a75dde4cdacad5b52f6 (patch) | |
tree | 9817282396f9d2cf5333050e4b5bf807d3617e40 /src | |
parent | 9be35d9b7418c041e5177f273c20f0fd2d3f00ad (diff) | |
parent | ad516735a65fe109773b60e26214a071411f1734 (diff) | |
download | unit-d586ac9fdc4a86c142b06a75dde4cdacad5b52f6.tar.gz unit-d586ac9fdc4a86c142b06a75dde4cdacad5b52f6.tar.bz2 |
Merged with the default branch.1.20.0-1
Diffstat (limited to 'src')
38 files changed, 5837 insertions, 1531 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index 57e4615e..6935346c 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -14,6 +14,7 @@ #include <nxt_application.h> #include <nxt_unit.h> #include <nxt_port_memory_int.h> +#include <nxt_isolation.h> #include <glob.h> @@ -41,45 +42,10 @@ static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, const char *name); -static nxt_int_t nxt_app_main_prefork(nxt_task_t *task, nxt_process_t *process, - nxt_mp_t *mp); static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment); static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src); -#if (NXT_HAVE_ISOLATION_ROOTFS) -static nxt_int_t nxt_app_set_isolation_mounts(nxt_task_t *task, - nxt_process_t *process, nxt_str_t *app_type); -static nxt_int_t nxt_app_set_lang_mounts(nxt_task_t *task, - nxt_process_t *process, nxt_array_t *syspaths); -static nxt_int_t nxt_app_set_isolation_rootfs(nxt_task_t *task, - nxt_conf_value_t *isolation, nxt_process_t *process); -static nxt_int_t nxt_app_prepare_rootfs(nxt_task_t *task, - nxt_process_t *process); -#endif - -static nxt_int_t nxt_app_set_isolation(nxt_task_t *task, - nxt_conf_value_t *isolation, nxt_process_t *process); - -#if (NXT_HAVE_CLONE) -static nxt_int_t nxt_app_set_isolation_namespaces(nxt_task_t *task, - nxt_conf_value_t *isolation, nxt_process_t *process); -static nxt_int_t nxt_app_clone_flags(nxt_task_t *task, - nxt_conf_value_t *namespaces, nxt_clone_t *clone); -#endif - -#if (NXT_HAVE_CLONE_NEWUSER) -static nxt_int_t nxt_app_set_isolation_creds(nxt_task_t *task, - nxt_conf_value_t *isolation, nxt_process_t *process); -static nxt_int_t nxt_app_isolation_credential_map(nxt_task_t *task, - nxt_mp_t *mem_pool, nxt_conf_value_t *map_array, - nxt_clone_credential_map_t *map); -#endif - -#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS) -static nxt_int_t nxt_app_set_isolation_new_privs(nxt_task_t *task, - nxt_conf_value_t *isolation, nxt_process_t *process); -#endif nxt_str_t nxt_server = nxt_string(NXT_SERVER); @@ -126,7 +92,7 @@ const nxt_process_init_t nxt_discovery_process = { const nxt_process_init_t nxt_app_process = { .type = NXT_PROCESS_APP, .setup = nxt_app_setup, - .prefork = nxt_app_main_prefork, + .prefork = nxt_isolation_main_prefork, .restart = 0, .start = NULL, /* set to module->start */ .port_handlers = &nxt_app_process_port_handlers, @@ -474,81 +440,6 @@ nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) static nxt_int_t -nxt_app_main_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) -{ - nxt_int_t cap_setid; - nxt_int_t ret; - nxt_runtime_t *rt; - nxt_common_app_conf_t *app_conf; - - rt = task->thread->runtime; - app_conf = process->data.app; - cap_setid = rt->capabilities.setid; - - if (app_conf->isolation != NULL) { - ret = nxt_app_set_isolation(task, app_conf->isolation, process); - if (nxt_slow_path(ret != NXT_OK)) { - return ret; - } - } - -#if (NXT_HAVE_CLONE_NEWUSER) - if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { - cap_setid = 1; - } -#endif - -#if (NXT_HAVE_ISOLATION_ROOTFS) - if (process->isolation.rootfs != NULL) { - ret = nxt_app_set_isolation_mounts(task, process, &app_conf->type); - if (nxt_slow_path(ret != NXT_OK)) { - return ret; - } - } -#endif - - if (cap_setid) { - ret = nxt_process_creds_set(task, process, &app_conf->user, - &app_conf->group); - - if (nxt_slow_path(ret != NXT_OK)) { - return ret; - } - - } else { - if (!nxt_str_eq(&app_conf->user, (u_char *) rt->user_cred.user, - nxt_strlen(rt->user_cred.user))) - { - nxt_alert(task, "cannot set user \"%V\" for app \"%V\": " - "missing capabilities", &app_conf->user, &app_conf->name); - - return NXT_ERROR; - } - - if (app_conf->group.length > 0 - && !nxt_str_eq(&app_conf->group, (u_char *) rt->group, - nxt_strlen(rt->group))) - { - nxt_alert(task, "cannot set group \"%V\" for app \"%V\": " - "missing capabilities", &app_conf->group, - &app_conf->name); - - return NXT_ERROR; - } - } - -#if (NXT_HAVE_CLONE_NEWUSER) - ret = nxt_process_vldt_isolation_creds(task, process); - if (nxt_slow_path(ret != NXT_OK)) { - return ret; - } -#endif - - return NXT_OK; -} - - -static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; @@ -594,13 +485,13 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process) #if (NXT_HAVE_ISOLATION_ROOTFS) if (process->isolation.rootfs != NULL) { if (process->isolation.mounts != NULL) { - ret = nxt_app_prepare_rootfs(task, process); + ret = nxt_isolation_prepare_rootfs(task, process); if (nxt_slow_path(ret != NXT_OK)) { return ret; } } - ret = nxt_process_change_root(task, process); + ret = nxt_isolation_change_root(task, process); if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -686,474 +577,6 @@ nxt_app_set_environment(nxt_conf_value_t *environment) } -static nxt_int_t -nxt_app_set_isolation(nxt_task_t *task, nxt_conf_value_t *isolation, - nxt_process_t *process) -{ -#if (NXT_HAVE_CLONE) - if (nxt_slow_path(nxt_app_set_isolation_namespaces(task, isolation, process) - != NXT_OK)) - { - return NXT_ERROR; - } -#endif - -#if (NXT_HAVE_CLONE_NEWUSER) - if (nxt_slow_path(nxt_app_set_isolation_creds(task, isolation, process) - != NXT_OK)) - { - return NXT_ERROR; - } -#endif - -#if (NXT_HAVE_ISOLATION_ROOTFS) - if (nxt_slow_path(nxt_app_set_isolation_rootfs(task, isolation, process) - != NXT_OK)) - { - return NXT_ERROR; - } -#endif - -#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS) - if (nxt_slow_path(nxt_app_set_isolation_new_privs(task, isolation, process) - != NXT_OK)) - { - return NXT_ERROR; - } -#endif - - return NXT_OK; -} - - -#if (NXT_HAVE_CLONE) - -static nxt_int_t -nxt_app_set_isolation_namespaces(nxt_task_t *task, nxt_conf_value_t *isolation, - nxt_process_t *process) -{ - nxt_int_t ret; - nxt_conf_value_t *obj; - - static nxt_str_t nsname = nxt_string("namespaces"); - - obj = nxt_conf_get_object_member(isolation, &nsname, NULL); - if (obj != NULL) { - ret = nxt_app_clone_flags(task, obj, &process->isolation.clone); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - - return NXT_OK; -} - -#endif - - -#if (NXT_HAVE_CLONE_NEWUSER) - -static nxt_int_t -nxt_app_set_isolation_creds(nxt_task_t *task, nxt_conf_value_t *isolation, - nxt_process_t *process) -{ - nxt_int_t ret; - nxt_clone_t *clone; - nxt_conf_value_t *array; - - static nxt_str_t uidname = nxt_string("uidmap"); - static nxt_str_t gidname = nxt_string("gidmap"); - - clone = &process->isolation.clone; - - array = nxt_conf_get_object_member(isolation, &uidname, NULL); - if (array != NULL) { - ret = nxt_app_isolation_credential_map(task, process->mem_pool, array, - &clone->uidmap); - - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - - array = nxt_conf_get_object_member(isolation, &gidname, NULL); - if (array != NULL) { - ret = nxt_app_isolation_credential_map(task, process->mem_pool, array, - &clone->gidmap); - - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_app_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mp, - nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map) -{ - nxt_int_t ret; - nxt_uint_t i; - nxt_conf_value_t *obj; - - static nxt_conf_map_t nxt_clone_map_entry_conf[] = { - { - nxt_string("container"), - NXT_CONF_MAP_INT, - offsetof(nxt_clone_map_entry_t, container), - }, - - { - nxt_string("host"), - NXT_CONF_MAP_INT, - offsetof(nxt_clone_map_entry_t, host), - }, - - { - nxt_string("size"), - NXT_CONF_MAP_INT, - offsetof(nxt_clone_map_entry_t, size), - }, - }; - - map->size = nxt_conf_array_elements_count(map_array); - - if (map->size == 0) { - return NXT_OK; - } - - map->map = nxt_mp_alloc(mp, map->size * sizeof(nxt_clone_map_entry_t)); - if (nxt_slow_path(map->map == NULL)) { - return NXT_ERROR; - } - - for (i = 0; i < map->size; i++) { - obj = nxt_conf_get_array_element(map_array, i); - - ret = nxt_conf_map_object(mp, obj, nxt_clone_map_entry_conf, - nxt_nitems(nxt_clone_map_entry_conf), - map->map + i); - if (nxt_slow_path(ret != NXT_OK)) { - nxt_alert(task, "clone map entry map error"); - return NXT_ERROR; - } - } - - return NXT_OK; -} - -#endif - -#if (NXT_HAVE_CLONE) - -static nxt_int_t -nxt_app_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces, - nxt_clone_t *clone) -{ - uint32_t index; - nxt_str_t name; - nxt_int_t flag; - nxt_conf_value_t *value; - - index = 0; - - for ( ;; ) { - value = nxt_conf_next_object_member(namespaces, &name, &index); - - if (value == NULL) { - break; - } - - flag = 0; - -#if (NXT_HAVE_CLONE_NEWUSER) - if (nxt_str_eq(&name, "credential", 10)) { - flag = CLONE_NEWUSER; - } -#endif - -#if (NXT_HAVE_CLONE_NEWPID) - if (nxt_str_eq(&name, "pid", 3)) { - flag = CLONE_NEWPID; - } -#endif - -#if (NXT_HAVE_CLONE_NEWNET) - if (nxt_str_eq(&name, "network", 7)) { - flag = CLONE_NEWNET; - } -#endif - -#if (NXT_HAVE_CLONE_NEWUTS) - if (nxt_str_eq(&name, "uname", 5)) { - flag = CLONE_NEWUTS; - } -#endif - -#if (NXT_HAVE_CLONE_NEWNS) - if (nxt_str_eq(&name, "mount", 5)) { - flag = CLONE_NEWNS; - } -#endif - -#if (NXT_HAVE_CLONE_NEWCGROUP) - if (nxt_str_eq(&name, "cgroup", 6)) { - flag = CLONE_NEWCGROUP; - } -#endif - - if (!flag) { - nxt_alert(task, "unknown namespace flag: \"%V\"", &name); - return NXT_ERROR; - } - - if (nxt_conf_get_boolean(value)) { - clone->flags |= flag; - } - } - - return NXT_OK; -} - -#endif - - -#if (NXT_HAVE_ISOLATION_ROOTFS) - -static nxt_int_t -nxt_app_set_isolation_rootfs(nxt_task_t *task, nxt_conf_value_t *isolation, - nxt_process_t *process) -{ - nxt_str_t str; - nxt_conf_value_t *obj; - - static nxt_str_t rootfs_name = nxt_string("rootfs"); - - obj = nxt_conf_get_object_member(isolation, &rootfs_name, NULL); - if (obj != NULL) { - nxt_conf_get_string(obj, &str); - - if (nxt_slow_path(str.length <= 1 || str.start[0] != '/')) { - nxt_log(task, NXT_LOG_ERR, "rootfs requires an absolute path other " - "than \"/\" but given \"%V\"", &str); - - return NXT_ERROR; - } - - if (str.start[str.length - 1] == '/') { - str.length--; - } - - process->isolation.rootfs = nxt_mp_alloc(process->mem_pool, - str.length + 1); - - if (nxt_slow_path(process->isolation.rootfs == NULL)) { - return NXT_ERROR; - } - - nxt_memcpy(process->isolation.rootfs, str.start, str.length); - - process->isolation.rootfs[str.length] = '\0'; - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_app_set_isolation_mounts(nxt_task_t *task, nxt_process_t *process, - nxt_str_t *app_type) -{ - nxt_int_t ret, cap_chroot; - nxt_runtime_t *rt; - nxt_app_lang_module_t *lang; - - rt = task->thread->runtime; - cap_chroot = rt->capabilities.chroot; - lang = nxt_app_lang_module(rt, app_type); - - nxt_assert(lang != NULL); - -#if (NXT_HAVE_CLONE_NEWUSER) - if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { - cap_chroot = 1; - } -#endif - - if (!cap_chroot) { - nxt_log(task, NXT_LOG_ERR, "The \"rootfs\" field requires privileges"); - return NXT_ERROR; - } - - if (lang->mounts != NULL && lang->mounts->nelts > 0) { - ret = nxt_app_set_lang_mounts(task, process, lang->mounts); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_app_set_lang_mounts(nxt_task_t *task, nxt_process_t *process, - nxt_array_t *lang_mounts) -{ - u_char *p; - size_t i, n, rootfs_len, len; - nxt_mp_t *mp; - nxt_array_t *mounts; - const u_char *rootfs; - nxt_fs_mount_t *mnt, *lang_mnt; - - rootfs = process->isolation.rootfs; - rootfs_len = nxt_strlen(rootfs); - mp = process->mem_pool; - - /* copy to init mem pool */ - mounts = nxt_array_copy(mp, NULL, lang_mounts); - if (mounts == NULL) { - return NXT_ERROR; - } - - n = mounts->nelts; - mnt = mounts->elts; - lang_mnt = lang_mounts->elts; - - for (i = 0; i < n; i++) { - len = nxt_strlen(lang_mnt[i].dst); - - mnt[i].dst = nxt_mp_alloc(mp, rootfs_len + len + 1); - if (mnt[i].dst == NULL) { - return NXT_ERROR; - } - - p = nxt_cpymem(mnt[i].dst, rootfs, rootfs_len); - p = nxt_cpymem(p, lang_mnt[i].dst, len); - *p = '\0'; - } - - process->isolation.mounts = mounts; - - return NXT_OK; -} - - -static nxt_int_t -nxt_app_prepare_rootfs(nxt_task_t *task, nxt_process_t *process) -{ - size_t i, n; - nxt_int_t ret, hasproc; - struct stat st; - nxt_array_t *mounts; - const u_char *dst; - nxt_fs_mount_t *mnt; - - hasproc = 0; - -#if (NXT_HAVE_CLONE_NEWPID) && (NXT_HAVE_CLONE_NEWNS) - nxt_fs_mount_t mount; - - if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID) - && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWNS)) - { - /* - * This mount point will automatically be gone when the namespace is - * destroyed. - */ - - mount.fstype = (u_char *) "proc"; - mount.src = (u_char *) "proc"; - mount.dst = (u_char *) "/proc"; - mount.data = (u_char *) ""; - mount.flags = 0; - - ret = nxt_fs_mkdir_all(mount.dst, S_IRWXU | S_IRWXG | S_IRWXO); - if (nxt_fast_path(ret == NXT_OK)) { - ret = nxt_fs_mount(task, &mount); - if (nxt_fast_path(ret == NXT_OK)) { - hasproc = 1; - } - - } else { - nxt_log(task, NXT_LOG_WARN, "mkdir(%s) %E", mount.dst, nxt_errno); - } - } -#endif - - mounts = process->isolation.mounts; - - n = mounts->nelts; - mnt = mounts->elts; - - for (i = 0; i < n; i++) { - dst = mnt[i].dst; - - if (nxt_slow_path(nxt_memcmp(mnt[i].fstype, "bind", 4) == 0 - && stat((const char *) mnt[i].src, &st) != 0)) - { - nxt_log(task, NXT_LOG_WARN, "host path not found: %s", mnt[i].src); - continue; - } - - if (hasproc && nxt_memcmp(mnt[i].fstype, "proc", 4) == 0 - && nxt_memcmp(mnt[i].dst, "/proc", 5) == 0) - { - continue; - } - - ret = nxt_fs_mkdir_all(dst, S_IRWXU | S_IRWXG | S_IRWXO); - if (nxt_slow_path(ret != NXT_OK)) { - nxt_alert(task, "mkdir(%s) %E", dst, nxt_errno); - goto undo; - } - - ret = nxt_fs_mount(task, &mnt[i]); - if (nxt_slow_path(ret != NXT_OK)) { - goto undo; - } - } - - return NXT_OK; - -undo: - - n = i + 1; - - for (i = 0; i < n; i++) { - nxt_fs_unmount(mnt[i].dst); - } - - return NXT_ERROR; -} - -#endif - - -#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS) - -static nxt_int_t -nxt_app_set_isolation_new_privs(nxt_task_t *task, nxt_conf_value_t *isolation, - nxt_process_t *process) -{ - nxt_conf_value_t *obj; - - static nxt_str_t new_privs_name = nxt_string("new_privs"); - - obj = nxt_conf_get_object_member(isolation, &new_privs_name, NULL); - if (obj != NULL) { - process->isolation.new_privs = nxt_conf_get_boolean(obj); - } - - return NXT_OK; -} - -#endif - - static u_char * nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src) { diff --git a/src/nxt_application.h b/src/nxt_application.h index 3144dc3f..cb49a033 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -50,6 +50,7 @@ typedef struct { char *home; nxt_str_t path; nxt_str_t module; + char *callable; } nxt_python_app_conf_t; diff --git a/src/nxt_clone.h b/src/nxt_clone.h index e89fd82d..c2066ce6 100644 --- a/src/nxt_clone.h +++ b/src/nxt_clone.h @@ -42,9 +42,6 @@ pid_t nxt_clone(nxt_int_t flags); #if (NXT_HAVE_CLONE_NEWUSER) -#define NXT_CLONE_MNT(flags) \ - ((flags & CLONE_NEWNS) == CLONE_NEWNS) - NXT_EXPORT nxt_int_t nxt_clone_credential_map(nxt_task_t *task, pid_t pid, nxt_credential_t *creds, nxt_clone_t *clone); NXT_EXPORT nxt_int_t nxt_clone_vldt_credential_uidmap(nxt_task_t *task, diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index b5530b85..4364057b 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -496,12 +496,6 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = { NULL, NULL }, - { nxt_string("reschedule_timeout"), - NXT_CONF_VLDT_INTEGER, - 0, - NULL, - NULL }, - { nxt_string("requests"), NXT_CONF_VLDT_INTEGER, 0, @@ -622,6 +616,21 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_procmap_members[] = { #endif +#if (NXT_HAVE_ISOLATION_ROOTFS) + +static nxt_conf_vldt_object_t nxt_conf_vldt_app_automount_members[] = { + { nxt_string("language_deps"), + NXT_CONF_VLDT_BOOLEAN, + 0, + NULL, + NULL }, + + NXT_CONF_VLDT_END +}; + +#endif + + static nxt_conf_vldt_object_t nxt_conf_vldt_app_isolation_members[] = { { nxt_string("namespaces"), NXT_CONF_VLDT_OBJECT, @@ -653,6 +662,12 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_isolation_members[] = { NULL, NULL }, + { nxt_string("automount"), + NXT_CONF_VLDT_OBJECT, + 0, + &nxt_conf_vldt_object, + (void *) &nxt_conf_vldt_app_automount_members }, + #endif #if (NXT_HAVE_PR_SET_NO_NEW_PRIVS) @@ -758,6 +773,12 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = { NULL, NULL }, + { nxt_string("callable"), + NXT_CONF_VLDT_STRING, + 0, + NULL, + NULL }, + NXT_CONF_VLDT_NEXT(&nxt_conf_vldt_common_members) }; @@ -1188,10 +1209,17 @@ static nxt_int_t nxt_conf_vldt_listener(nxt_conf_validation_t *vldt, nxt_str_t *name, nxt_conf_value_t *value) { - nxt_int_t ret; + nxt_int_t ret; + nxt_sockaddr_t *sa; - ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT); + sa = nxt_sockaddr_parse(vldt->pool, name); + if (nxt_slow_path(sa == NULL)) { + return nxt_conf_vldt_error(vldt, + "The listener address \"%V\" is invalid.", + name); + } + ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT); if (ret != NXT_OK) { return ret; } diff --git a/src/nxt_conn_write.c b/src/nxt_conn_write.c index d7a6a8da..bcf9e8fa 100644 --- a/src/nxt_conn_write.c +++ b/src/nxt_conn_write.c @@ -246,24 +246,47 @@ nxt_sendfile(int fd, int s, off_t pos, size_t size) { ssize_t res; -#ifdef NXT_HAVE_MACOSX_SENDFILE +#if (NXT_HAVE_MACOSX_SENDFILE) + off_t sent = size; int rc = sendfile(fd, s, pos, &sent, NULL, 0); res = (rc == 0 || sent > 0) ? sent : -1; -#endif -#ifdef NXT_HAVE_FREEBSD_SENDFILE +#elif (NXT_HAVE_FREEBSD_SENDFILE) + off_t sent = 0; int rc = sendfile(fd, s, pos, size, NULL, &sent, 0); res = (rc == 0 || sent > 0) ? sent : -1; -#endif -#ifdef NXT_HAVE_LINUX_SENDFILE +#elif (NXT_HAVE_LINUX_SENDFILE) + res = sendfile(s, fd, &pos, size); + +#else + + int err; + void *map; + off_t page_off; + + page_off = pos % nxt_pagesize; + + map = nxt_mem_mmap(NULL, size + page_off, PROT_READ, MAP_SHARED, fd, + pos - page_off); + if (nxt_slow_path(map == MAP_FAILED)) { + return -1; + } + + res = write(s, nxt_pointer_to(map, page_off), size); + + /* Backup and restore errno to catch socket errors in the upper level. */ + err = errno; + nxt_mem_munmap(map, size + page_off); + errno = err; + #endif return res; diff --git a/src/nxt_fs.c b/src/nxt_fs.c index fe271802..0228c25a 100644 --- a/src/nxt_fs.c +++ b/src/nxt_fs.c @@ -40,30 +40,31 @@ nxt_fs_mount(nxt_task_t *task, nxt_fs_mount_t *mnt) nxt_int_t nxt_fs_mount(nxt_task_t *task, nxt_fs_mount_t *mnt) { + u_char *data, *p, *end; + size_t iovlen; + nxt_int_t ret; const char *fstype; - uint8_t is_bind, is_proc; - struct iovec iov[8]; + struct iovec iov[128]; char errmsg[256]; - is_bind = nxt_strncmp(mnt->fstype, "bind", 4) == 0; - is_proc = nxt_strncmp(mnt->fstype, "proc", 4) == 0; + if (nxt_strncmp(mnt->fstype, "bind", 4) == 0) { + fstype = "nullfs"; - if (nxt_slow_path(!is_bind && !is_proc)) { - nxt_alert(task, "mount type \"%s\" not implemented.", mnt->fstype); - return NXT_ERROR; - } + } else if (nxt_strncmp(mnt->fstype, "proc", 4) == 0) { + fstype = "procfs"; - if (is_bind) { - fstype = "nullfs"; + } else if (nxt_strncmp(mnt->fstype, "tmpfs", 5) == 0) { + fstype = "tmpfs"; } else { - fstype = "procfs"; + nxt_alert(task, "mount type \"%s\" not implemented.", mnt->fstype); + return NXT_ERROR; } iov[0].iov_base = (void *) "fstype"; iov[0].iov_len = 7; iov[1].iov_base = (void *) fstype; - iov[1].iov_len = strlen(fstype) + 1; + iov[1].iov_len = nxt_strlen(fstype) + 1; iov[2].iov_base = (void *) "fspath"; iov[2].iov_len = 7; iov[3].iov_base = (void *) mnt->dst; @@ -77,12 +78,55 @@ nxt_fs_mount(nxt_task_t *task, nxt_fs_mount_t *mnt) iov[7].iov_base = (void *) errmsg; iov[7].iov_len = sizeof(errmsg); - if (nxt_slow_path(nmount(iov, 8, 0) < 0)) { - nxt_alert(task, "nmount(%p, 8, 0) %s", errmsg); - return NXT_ERROR; + iovlen = 8; + + data = NULL; + + if (mnt->data != NULL) { + data = (u_char *) nxt_strdup(mnt->data); + if (nxt_slow_path(data == NULL)) { + return NXT_ERROR; + } + + end = data - 1; + + do { + p = end + 1; + end = nxt_strchr(p, '='); + if (end == NULL) { + break; + } + + *end = '\0'; + + iov[iovlen++].iov_base = (void *) p; + iov[iovlen++].iov_len = (end - p) + 1; + + p = end + 1; + + end = nxt_strchr(p, ','); + if (end != NULL) { + *end = '\0'; + } + + iov[iovlen++].iov_base = (void *) p; + iov[iovlen++].iov_len = nxt_strlen(p) + 1; + + } while (end != NULL && nxt_nitems(iov) > (iovlen + 2)); } - return NXT_OK; + ret = NXT_OK; + + if (nxt_slow_path(nmount(iov, iovlen, 0) < 0)) { + nxt_alert(task, "nmount(%p, %d, 0) %s", iov, iovlen, errmsg); + ret = NXT_ERROR; + } + + if (data != NULL) { + free(data); + } + + return ret; } #endif diff --git a/src/nxt_fs.h b/src/nxt_fs.h index 85c78b27..bbd7ab9f 100644 --- a/src/nxt_fs.h +++ b/src/nxt_fs.h @@ -18,13 +18,38 @@ #define NXT_MS_REC 0 #endif +#ifdef MS_NOSUID +#define NXT_MS_NOSUID MS_NOSUID +#else +#define NXT_MS_NOSUID 0 +#endif + +#ifdef MS_NOEXEC +#define NXT_MS_NOEXEC MS_NOEXEC +#else +#define NXT_MS_NOEXEC 0 +#endif + +#ifdef MS_RELATIME +#define NXT_MS_RELATIME MS_RELATIME +#else +#define NXT_MS_RELATIME 0 +#endif + +#ifdef MS_NODEV +#define NXT_MS_NODEV MS_NODEV +#else +#define NXT_MS_NODEV 0 +#endif + typedef struct { - u_char *src; - u_char *dst; - u_char *fstype; - nxt_int_t flags; - u_char *data; + u_char *src; + u_char *dst; + u_char *fstype; + nxt_int_t flags; + u_char *data; + nxt_uint_t builtin; /* 1-bit */ } nxt_fs_mount_t; diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index b34be019..dc23d7c4 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -503,7 +503,10 @@ nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data) joint->count++; r->conf = joint; - c->local = joint->socket_conf->sockaddr; + + if (c->local == NULL) { + c->local = joint->socket_conf->sockaddr; + } nxt_h1p_conn_request_header_parse(task, c, h1p); return; @@ -734,9 +737,16 @@ nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data) r = ctx; field->hopbyhop = 1; - if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) { + if (field->value_length == 5 + && nxt_memcasecmp(field->value, "close", 5) == 0) + { r->proto.h1->keepalive = 0; + } else if (field->value_length == 10 + && nxt_memcasecmp(field->value, "keep-alive", 10) == 0) + { + r->proto.h1->keepalive = 1; + } else if (field->value_length == 7 && nxt_memcasecmp(field->value, "upgrade", 7) == 0) { @@ -1749,7 +1759,15 @@ nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data) joint = c->listen->socket.data; - return nxt_value_at(nxt_msec_t, joint->socket_conf, data); + if (nxt_fast_path(joint != NULL)) { + return nxt_value_at(nxt_msec_t, joint->socket_conf, data); + } + + /* + * Listening socket had been closed while + * connection was in keep-alive state. + */ + return 1; } @@ -1829,6 +1847,8 @@ nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "h1p idle close"); + nxt_queue_remove(&c->link); + nxt_h1p_idle_response(task, c); } @@ -1863,10 +1883,9 @@ nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data) static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c) { - u_char *p; - size_t size; - nxt_buf_t *out, *last; - nxt_h1proto_t *h1p; + u_char *p; + size_t size; + nxt_buf_t *out, *last; size = nxt_length(NXT_H1P_IDLE_TIMEOUT) + nxt_http_date_cache.size @@ -1896,9 +1915,6 @@ nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c) last->completion_handler = nxt_h1p_idle_response_sent; last->parent = c; - h1p = c->socket.data; - h1p->conn_write_tail = &last->next; - c->write = out; c->write_state = &nxt_h1p_timeout_response_state; diff --git a/src/nxt_http_chunk_parse.c b/src/nxt_http_chunk_parse.c index 2164524b..be3a2023 100644 --- a/src/nxt_http_chunk_parse.c +++ b/src/nxt_http_chunk_parse.c @@ -74,7 +74,7 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp, goto next; } - /* ret == NXT_HTTP_CHUNK_END_ON_BORDER */ + /* ret == NXT_HTTP_CHUNK_END */ } ch = *hcp->pos++; diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c index 34d0f36e..338d9fce 100644 --- a/src/nxt_http_proxy.c +++ b/src/nxt_http_proxy.c @@ -27,7 +27,6 @@ static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data); static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data); static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data); -static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data); static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data); static void nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data); @@ -275,39 +274,16 @@ nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) } -static void -nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *out; - nxt_http_peer_t *peer; - nxt_http_request_t *r; - - r = obj; - peer = data; - out = peer->body; - - if (out != NULL) { - peer->body = NULL; - nxt_http_request_send(task, r, out); - - } - - if (!peer->closed) { - nxt_http_proto[peer->protocol].peer_read(task, peer); - } -} - - static const nxt_http_request_state_t nxt_http_proxy_read_state nxt_aligned(64) = { - .ready_handler = nxt_http_proxy_read, + .ready_handler = nxt_http_proxy_send_body, .error_handler = nxt_http_proxy_error, }; static void -nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) +nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *out; nxt_http_peer_t *peer; @@ -316,9 +292,9 @@ nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) r = obj; peer = data; out = peer->body; - peer->body = NULL; if (out != NULL) { + peer->body = NULL; nxt_http_request_send(task, r, out); } diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index 0b2103cd..ae91076a 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -1085,10 +1085,6 @@ nxt_http_route_pattern_create(nxt_task_t *task, nxt_mp_t *mp, pattern->negative = 1; pattern->any = 0; - - if (test.length == 0) { - return NXT_OK; - } } if (test.length == 0) { @@ -1594,6 +1590,7 @@ nxt_http_action_t * nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name) { + nxt_int_t ret; nxt_http_action_t *action; action = nxt_mp_alloc(tmcf->router_conf->mem_pool, @@ -1605,7 +1602,10 @@ nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, action->name = *name; action->handler = NULL; - nxt_http_action_resolve(task, tmcf, action); + ret = nxt_http_action_resolve(task, tmcf, action); + if (nxt_slow_path(ret != NXT_OK)) { + return NULL; + } return action; } diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c index ee18be1b..5687ef2c 100644 --- a/src/nxt_http_static.c +++ b/src/nxt_http_static.c @@ -470,14 +470,17 @@ nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash) { nxt_string("text/css"), ".css" }, { nxt_string("image/svg+xml"), ".svg" }, - { nxt_string("image/svg+xml"), ".svg" }, { nxt_string("image/webp"), ".webp" }, { nxt_string("image/png"), ".png" }, + { nxt_string("image/apng"), ".apng" }, { nxt_string("image/jpeg"), ".jpeg" }, { nxt_string("image/jpeg"), ".jpg" }, { nxt_string("image/gif"), ".gif" }, { nxt_string("image/x-icon"), ".ico" }, + { nxt_string("image/avif"), ".avif" }, + { nxt_string("image/avif-sequence"), ".avifs" }, + { nxt_string("font/woff"), ".woff" }, { nxt_string("font/woff2"), ".woff2" }, { nxt_string("font/otf"), ".otf" }, diff --git a/src/nxt_http_variables.c b/src/nxt_http_variables.c index 222d717c..1c0b561d 100644 --- a/src/nxt_http_variables.c +++ b/src/nxt_http_variables.c @@ -11,6 +11,8 @@ static nxt_int_t nxt_http_var_method(nxt_task_t *task, nxt_var_query_t *query, nxt_str_t *str, void *ctx); static nxt_int_t nxt_http_var_uri(nxt_task_t *task, nxt_var_query_t *query, nxt_str_t *str, void *ctx); +static nxt_int_t nxt_http_var_host(nxt_task_t *task, nxt_var_query_t *query, + nxt_str_t *str, void *ctx); static nxt_var_decl_t nxt_http_vars[] = { @@ -21,6 +23,10 @@ static nxt_var_decl_t nxt_http_vars[] = { { nxt_string("uri"), &nxt_http_var_uri, 0 }, + + { nxt_string("host"), + &nxt_http_var_host, + 0 }, }; @@ -57,3 +63,17 @@ nxt_http_var_uri(nxt_task_t *task, nxt_var_query_t *query, nxt_str_t *str, return NXT_OK; } + + +static nxt_int_t +nxt_http_var_host(nxt_task_t *task, nxt_var_query_t *query, nxt_str_t *str, + void *ctx) +{ + nxt_http_request_t *r; + + r = ctx; + + *str = r->host; + + return NXT_OK; +} diff --git a/src/nxt_isolation.c b/src/nxt_isolation.c new file mode 100644 index 00000000..ac7a37e8 --- /dev/null +++ b/src/nxt_isolation.c @@ -0,0 +1,1012 @@ +/* + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_application.h> +#include <nxt_process.h> +#include <nxt_isolation.h> + +#if (NXT_HAVE_PIVOT_ROOT) +#include <mntent.h> +#endif + + +static nxt_int_t nxt_isolation_set(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_t *process); + +#if (NXT_HAVE_CLONE) +static nxt_int_t nxt_isolation_set_namespaces(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_t *process); +static nxt_int_t nxt_isolation_clone_flags(nxt_task_t *task, + nxt_conf_value_t *namespaces, nxt_clone_t *clone); +#endif + +#if (NXT_HAVE_CLONE_NEWUSER) +static nxt_int_t nxt_isolation_set_creds(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_t *process); +static nxt_int_t nxt_isolation_credential_map(nxt_task_t *task, + nxt_mp_t *mem_pool, nxt_conf_value_t *map_array, + nxt_clone_credential_map_t *map); +static nxt_int_t nxt_isolation_vldt_creds(nxt_task_t *task, + nxt_process_t *process); +#endif + +#if (NXT_HAVE_ISOLATION_ROOTFS) +static nxt_int_t nxt_isolation_set_rootfs(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_t *process); +static nxt_int_t nxt_isolation_set_automount(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_t *process); +static nxt_int_t nxt_isolation_set_mounts(nxt_task_t *task, + nxt_process_t *process, nxt_str_t *app_type); +static nxt_int_t nxt_isolation_set_lang_mounts(nxt_task_t *task, + nxt_process_t *process, nxt_array_t *syspaths); +static void nxt_isolation_unmount_all(nxt_task_t *task, nxt_process_t *process); + +#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS) +static nxt_int_t nxt_isolation_pivot_root(nxt_task_t *task, const char *rootfs); +static nxt_int_t nxt_isolation_make_private_mount(nxt_task_t *task, + const char *rootfs); +nxt_inline int nxt_pivot_root(const char *new_root, const char *old_root); +#endif + +static nxt_int_t nxt_isolation_chroot(nxt_task_t *task, const char *path); +#endif + +#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS) +static nxt_int_t nxt_isolation_set_new_privs(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_t *process); +#endif + + +nxt_int_t +nxt_isolation_main_prefork(nxt_task_t *task, nxt_process_t *process, + nxt_mp_t *mp) +{ + nxt_int_t cap_setid; + nxt_int_t ret; + nxt_runtime_t *rt; + nxt_common_app_conf_t *app_conf; + + rt = task->thread->runtime; + app_conf = process->data.app; + cap_setid = rt->capabilities.setid; + + if (app_conf->isolation != NULL) { + ret = nxt_isolation_set(task, app_conf->isolation, process); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + } + +#if (NXT_HAVE_CLONE_NEWUSER) + if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { + cap_setid = 1; + } +#endif + +#if (NXT_HAVE_ISOLATION_ROOTFS) + if (process->isolation.rootfs != NULL) { + ret = nxt_isolation_set_mounts(task, process, &app_conf->type); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + } +#endif + + if (cap_setid) { + ret = nxt_process_creds_set(task, process, &app_conf->user, + &app_conf->group); + + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + } else { + if (!nxt_str_eq(&app_conf->user, (u_char *) rt->user_cred.user, + nxt_strlen(rt->user_cred.user))) + { + nxt_alert(task, "cannot set user \"%V\" for app \"%V\": " + "missing capabilities", &app_conf->user, &app_conf->name); + + return NXT_ERROR; + } + + if (app_conf->group.length > 0 + && !nxt_str_eq(&app_conf->group, (u_char *) rt->group, + nxt_strlen(rt->group))) + { + nxt_alert(task, "cannot set group \"%V\" for app \"%V\": " + "missing capabilities", &app_conf->group, + &app_conf->name); + + return NXT_ERROR; + } + } + +#if (NXT_HAVE_CLONE_NEWUSER) + ret = nxt_isolation_vldt_creds(task, process); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } +#endif + + return NXT_OK; +} + + +static nxt_int_t +nxt_isolation_set(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_t *process) +{ +#if (NXT_HAVE_CLONE) + if (nxt_slow_path(nxt_isolation_set_namespaces(task, isolation, process) + != NXT_OK)) + { + return NXT_ERROR; + } +#endif + +#if (NXT_HAVE_CLONE_NEWUSER) + if (nxt_slow_path(nxt_isolation_set_creds(task, isolation, process) + != NXT_OK)) + { + return NXT_ERROR; + } +#endif + +#if (NXT_HAVE_ISOLATION_ROOTFS) + if (nxt_slow_path(nxt_isolation_set_rootfs(task, isolation, process) + != NXT_OK)) + { + return NXT_ERROR; + } + + if (nxt_slow_path(nxt_isolation_set_automount(task, isolation, process) + != NXT_OK)) + { + return NXT_ERROR; + } +#endif + +#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS) + if (nxt_slow_path(nxt_isolation_set_new_privs(task, isolation, process) + != NXT_OK)) + { + return NXT_ERROR; + } +#endif + + return NXT_OK; +} + + +#if (NXT_HAVE_CLONE) + +static nxt_int_t +nxt_isolation_set_namespaces(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_t *process) +{ + nxt_int_t ret; + nxt_conf_value_t *obj; + + static nxt_str_t nsname = nxt_string("namespaces"); + + obj = nxt_conf_get_object_member(isolation, &nsname, NULL); + if (obj != NULL) { + ret = nxt_isolation_clone_flags(task, obj, &process->isolation.clone); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + +#endif + + +#if (NXT_HAVE_CLONE_NEWUSER) + +static nxt_int_t +nxt_isolation_set_creds(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_t *process) +{ + nxt_int_t ret; + nxt_clone_t *clone; + nxt_conf_value_t *array; + + static nxt_str_t uidname = nxt_string("uidmap"); + static nxt_str_t gidname = nxt_string("gidmap"); + + clone = &process->isolation.clone; + + array = nxt_conf_get_object_member(isolation, &uidname, NULL); + if (array != NULL) { + ret = nxt_isolation_credential_map(task, process->mem_pool, array, + &clone->uidmap); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + array = nxt_conf_get_object_member(isolation, &gidname, NULL); + if (array != NULL) { + ret = nxt_isolation_credential_map(task, process->mem_pool, array, + &clone->gidmap); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mp, + nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map) +{ + nxt_int_t ret; + nxt_uint_t i; + nxt_conf_value_t *obj; + + static nxt_conf_map_t nxt_clone_map_entry_conf[] = { + { + nxt_string("container"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, container), + }, + + { + nxt_string("host"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, host), + }, + + { + nxt_string("size"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, size), + }, + }; + + map->size = nxt_conf_array_elements_count(map_array); + + if (map->size == 0) { + return NXT_OK; + } + + map->map = nxt_mp_alloc(mp, map->size * sizeof(nxt_clone_map_entry_t)); + if (nxt_slow_path(map->map == NULL)) { + return NXT_ERROR; + } + + for (i = 0; i < map->size; i++) { + obj = nxt_conf_get_array_element(map_array, i); + + ret = nxt_conf_map_object(mp, obj, nxt_clone_map_entry_conf, + nxt_nitems(nxt_clone_map_entry_conf), + map->map + i); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "clone map entry map error"); + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_isolation_vldt_creds(nxt_task_t *task, nxt_process_t *process) +{ + nxt_int_t ret; + nxt_clone_t *clone; + nxt_credential_t *creds; + + clone = &process->isolation.clone; + creds = process->user_cred; + + if (clone->uidmap.size == 0 && clone->gidmap.size == 0) { + return NXT_OK; + } + + if (!nxt_is_clone_flag_set(clone->flags, NEWUSER)) { + if (nxt_slow_path(clone->uidmap.size > 0)) { + nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but " + "\"isolation.namespaces.credential\" is false or unset"); + + return NXT_ERROR; + } + + if (nxt_slow_path(clone->gidmap.size > 0)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but " + "\"isolation.namespaces.credential\" is false or unset"); + + return NXT_ERROR; + } + + return NXT_OK; + } + + ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, creds); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, creds); +} + +#endif + + +#if (NXT_HAVE_CLONE) + +static nxt_int_t +nxt_isolation_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces, + nxt_clone_t *clone) +{ + uint32_t index; + nxt_str_t name; + nxt_int_t flag; + nxt_conf_value_t *value; + + index = 0; + + for ( ;; ) { + value = nxt_conf_next_object_member(namespaces, &name, &index); + + if (value == NULL) { + break; + } + + flag = 0; + +#if (NXT_HAVE_CLONE_NEWUSER) + if (nxt_str_eq(&name, "credential", 10)) { + flag = CLONE_NEWUSER; + } +#endif + +#if (NXT_HAVE_CLONE_NEWPID) + if (nxt_str_eq(&name, "pid", 3)) { + flag = CLONE_NEWPID; + } +#endif + +#if (NXT_HAVE_CLONE_NEWNET) + if (nxt_str_eq(&name, "network", 7)) { + flag = CLONE_NEWNET; + } +#endif + +#if (NXT_HAVE_CLONE_NEWUTS) + if (nxt_str_eq(&name, "uname", 5)) { + flag = CLONE_NEWUTS; + } +#endif + +#if (NXT_HAVE_CLONE_NEWNS) + if (nxt_str_eq(&name, "mount", 5)) { + flag = CLONE_NEWNS; + } +#endif + +#if (NXT_HAVE_CLONE_NEWCGROUP) + if (nxt_str_eq(&name, "cgroup", 6)) { + flag = CLONE_NEWCGROUP; + } +#endif + + if (!flag) { + nxt_alert(task, "unknown namespace flag: \"%V\"", &name); + return NXT_ERROR; + } + + if (nxt_conf_get_boolean(value)) { + clone->flags |= flag; + } + } + + return NXT_OK; +} + +#endif + + +#if (NXT_HAVE_ISOLATION_ROOTFS) + +static nxt_int_t +nxt_isolation_set_rootfs(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_t *process) +{ + nxt_str_t str; + nxt_conf_value_t *obj; + + static nxt_str_t rootfs_name = nxt_string("rootfs"); + + obj = nxt_conf_get_object_member(isolation, &rootfs_name, NULL); + if (obj != NULL) { + nxt_conf_get_string(obj, &str); + + if (nxt_slow_path(str.length <= 1 || str.start[0] != '/')) { + nxt_log(task, NXT_LOG_ERR, "rootfs requires an absolute path other " + "than \"/\" but given \"%V\"", &str); + + return NXT_ERROR; + } + + if (str.start[str.length - 1] == '/') { + str.length--; + } + + process->isolation.rootfs = nxt_mp_alloc(process->mem_pool, + str.length + 1); + + if (nxt_slow_path(process->isolation.rootfs == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(process->isolation.rootfs, str.start, str.length); + + process->isolation.rootfs[str.length] = '\0'; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_isolation_set_automount(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_t *process) +{ + nxt_conf_value_t *conf, *value; + nxt_process_automount_t *automount; + + static nxt_str_t automount_name = nxt_string("automount"); + static nxt_str_t langdeps_name = nxt_string("language_deps"); + + automount = &process->isolation.automount; + + automount->language_deps = 1; + + conf = nxt_conf_get_object_member(isolation, &automount_name, NULL); + if (conf != NULL) { + value = nxt_conf_get_object_member(conf, &langdeps_name, NULL); + if (value != NULL) { + automount->language_deps = nxt_conf_get_boolean(value); + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_isolation_set_mounts(nxt_task_t *task, nxt_process_t *process, + nxt_str_t *app_type) +{ + nxt_int_t ret, cap_chroot; + nxt_runtime_t *rt; + nxt_app_lang_module_t *lang; + + rt = task->thread->runtime; + cap_chroot = rt->capabilities.chroot; + lang = nxt_app_lang_module(rt, app_type); + + nxt_assert(lang != NULL); + +#if (NXT_HAVE_CLONE_NEWUSER) + if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { + cap_chroot = 1; + } +#endif + + if (!cap_chroot) { + nxt_log(task, NXT_LOG_ERR, "The \"rootfs\" field requires privileges"); + return NXT_ERROR; + } + + ret = nxt_isolation_set_lang_mounts(task, process, lang->mounts); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + process->isolation.cleanup = nxt_isolation_unmount_all; + + return NXT_OK; +} + + +static nxt_int_t +nxt_isolation_set_lang_mounts(nxt_task_t *task, nxt_process_t *process, + nxt_array_t *lang_mounts) +{ + u_char *p; + size_t i, n, rootfs_len, len; + nxt_mp_t *mp; + nxt_array_t *mounts; + const u_char *rootfs; + nxt_fs_mount_t *mnt, *lang_mnt; + + mp = process->mem_pool; + + /* copy to init mem pool */ + mounts = nxt_array_copy(mp, NULL, lang_mounts); + if (mounts == NULL) { + return NXT_ERROR; + } + + n = mounts->nelts; + mnt = mounts->elts; + lang_mnt = lang_mounts->elts; + + rootfs = process->isolation.rootfs; + rootfs_len = nxt_strlen(rootfs); + + for (i = 0; i < n; i++) { + len = nxt_strlen(lang_mnt[i].dst); + + mnt[i].dst = nxt_mp_alloc(mp, rootfs_len + len + 1); + if (nxt_slow_path(mnt[i].dst == NULL)) { + return NXT_ERROR; + } + + p = nxt_cpymem(mnt[i].dst, rootfs, rootfs_len); + p = nxt_cpymem(p, lang_mnt[i].dst, len); + *p = '\0'; + } + + mnt = nxt_array_add(mounts); + if (nxt_slow_path(mnt == NULL)) { + return NXT_ERROR; + } + + mnt->src = (u_char *) "tmpfs"; + mnt->fstype = (u_char *) "tmpfs"; + mnt->flags = NXT_MS_NOSUID | NXT_MS_NODEV | NXT_MS_NOEXEC | NXT_MS_RELATIME; + mnt->data = (u_char *) "size=1m,mode=777"; + mnt->builtin = 1; + + mnt->dst = nxt_mp_nget(mp, rootfs_len + nxt_length("/tmp") + 1); + if (nxt_slow_path(mnt->dst == NULL)) { + return NXT_ERROR; + } + + p = nxt_cpymem(mnt->dst, rootfs, rootfs_len); + p = nxt_cpymem(p, "/tmp", 4); + *p = '\0'; + +#if (NXT_HAVE_CLONE_NEWPID) && (NXT_HAVE_CLONE_NEWNS) + + if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID) + && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWNS)) + { + mnt = nxt_array_add(mounts); + if (nxt_slow_path(mnt == NULL)) { + return NXT_ERROR; + } + + mnt->fstype = (u_char *) "proc"; + mnt->src = (u_char *) "proc"; + + mnt->dst = nxt_mp_nget(mp, rootfs_len + nxt_length("/proc") + 1); + if (nxt_slow_path(mnt->dst == NULL)) { + return NXT_ERROR; + } + + p = nxt_cpymem(mnt->dst, rootfs, rootfs_len); + p = nxt_cpymem(p, "/proc", 5); + *p = '\0'; + + mnt->data = (u_char *) ""; + mnt->flags = 0; + } +#endif + + process->isolation.mounts = mounts; + + return NXT_OK; +} + + +void +nxt_isolation_unmount_all(nxt_task_t *task, nxt_process_t *process) +{ + size_t i, n; + nxt_array_t *mounts; + nxt_fs_mount_t *mnt; + nxt_process_automount_t *automount; + + nxt_debug(task, "unmount all (%s)", process->name); + + automount = &process->isolation.automount; + mounts = process->isolation.mounts; + n = mounts->nelts; + mnt = mounts->elts; + + for (i = 0; i < n; i++) { + if (mnt[i].builtin && !automount->language_deps) { + continue; + } + + nxt_fs_unmount(mnt[i].dst); + } +} + + +nxt_int_t +nxt_isolation_prepare_rootfs(nxt_task_t *task, nxt_process_t *process) +{ + size_t i, n; + nxt_int_t ret; + struct stat st; + nxt_array_t *mounts; + const u_char *dst; + nxt_fs_mount_t *mnt; + nxt_process_automount_t *automount; + + automount = &process->isolation.automount; + mounts = process->isolation.mounts; + + n = mounts->nelts; + mnt = mounts->elts; + + for (i = 0; i < n; i++) { + dst = mnt[i].dst; + + if (mnt[i].builtin && !automount->language_deps) { + continue; + } + + if (nxt_slow_path(nxt_memcmp(mnt[i].fstype, "bind", 4) == 0 + && stat((const char *) mnt[i].src, &st) != 0)) + { + nxt_log(task, NXT_LOG_WARN, "host path not found: %s", mnt[i].src); + continue; + } + + ret = nxt_fs_mkdir_all(dst, S_IRWXU | S_IRWXG | S_IRWXO); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "mkdir(%s) %E", dst, nxt_errno); + goto undo; + } + + ret = nxt_fs_mount(task, &mnt[i]); + if (nxt_slow_path(ret != NXT_OK)) { + goto undo; + } + } + + return NXT_OK; + +undo: + + n = i + 1; + + for (i = 0; i < n; i++) { + nxt_fs_unmount(mnt[i].dst); + } + + return NXT_ERROR; +} + + +#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS) + +nxt_int_t +nxt_isolation_change_root(nxt_task_t *task, nxt_process_t *process) +{ + char *rootfs; + nxt_int_t ret; + + rootfs = (char *) process->isolation.rootfs; + + nxt_debug(task, "change root: %s", rootfs); + + if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWNS)) { + ret = nxt_isolation_pivot_root(task, rootfs); + + } else { + ret = nxt_isolation_chroot(task, rootfs); + } + + if (nxt_fast_path(ret == NXT_OK)) { + if (nxt_slow_path(chdir("/") < 0)) { + nxt_alert(task, "chdir(\"/\") %E", nxt_errno); + return NXT_ERROR; + } + } + + return ret; +} + + +/* + * pivot_root(2) can only be safely used with containers, otherwise it can + * umount(2) the global root filesystem and screw up the machine. + */ + +static nxt_int_t +nxt_isolation_pivot_root(nxt_task_t *task, const char *path) +{ + /* + * This implementation makes use of a kernel trick that works for ages + * and now documented in Linux kernel 5. + * https://lore.kernel.org/linux-man/87r24piwhm.fsf@x220.int.ebiederm.org/T/ + */ + + if (nxt_slow_path(mount("", "/", "", MS_SLAVE|MS_REC, "") != 0)) { + nxt_alert(task, "mount(\"/\", MS_SLAVE|MS_REC) failed: %E", nxt_errno); + return NXT_ERROR; + } + + if (nxt_slow_path(nxt_isolation_make_private_mount(task, path) != NXT_OK)) { + return NXT_ERROR; + } + + if (nxt_slow_path(mount(path, path, "bind", MS_BIND|MS_REC, "") != 0)) { + nxt_alert(task, "error bind mounting rootfs %E", nxt_errno); + return NXT_ERROR; + } + + if (nxt_slow_path(chdir(path) != 0)) { + nxt_alert(task, "failed to chdir(%s) %E", path, nxt_errno); + return NXT_ERROR; + } + + if (nxt_slow_path(nxt_pivot_root(".", ".") != 0)) { + nxt_alert(task, "failed to pivot_root %E", nxt_errno); + return NXT_ERROR; + } + + /* + * Demote the oldroot mount to avoid unmounts getting propagated to + * the host. + */ + if (nxt_slow_path(mount("", ".", "", MS_SLAVE | MS_REC, NULL) != 0)) { + nxt_alert(task, "failed to bind mount rootfs %E", nxt_errno); + return NXT_ERROR; + } + + if (nxt_slow_path(umount2(".", MNT_DETACH) != 0)) { + nxt_alert(task, "failed to umount old root directory %E", nxt_errno); + return NXT_ERROR; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_isolation_make_private_mount(nxt_task_t *task, const char *rootfs) +{ + char *parent_mnt; + FILE *procfile; + u_char **mounts; + size_t len; + uint8_t *shared; + nxt_int_t ret, index, nmounts; + struct mntent *ent; + + static const char *mount_path = "/proc/self/mounts"; + + ret = NXT_ERROR; + ent = NULL; + shared = NULL; + procfile = NULL; + parent_mnt = NULL; + + nmounts = 256; + + mounts = nxt_malloc(nmounts * sizeof(uintptr_t)); + if (nxt_slow_path(mounts == NULL)) { + goto fail; + } + + shared = nxt_malloc(nmounts); + if (nxt_slow_path(shared == NULL)) { + goto fail; + } + + procfile = setmntent(mount_path, "r"); + if (nxt_slow_path(procfile == NULL)) { + nxt_alert(task, "failed to open %s %E", mount_path, nxt_errno); + + goto fail; + } + + index = 0; + +again: + + for ( ; index < nmounts; index++) { + ent = getmntent(procfile); + if (ent == NULL) { + nmounts = index; + break; + } + + mounts[index] = (u_char *) strdup(ent->mnt_dir); + shared[index] = hasmntopt(ent, "shared") != NULL; + } + + if (ent != NULL) { + /* there are still entries to be read */ + + nmounts *= 2; + mounts = nxt_realloc(mounts, nmounts); + if (nxt_slow_path(mounts == NULL)) { + goto fail; + } + + shared = nxt_realloc(shared, nmounts); + if (nxt_slow_path(shared == NULL)) { + goto fail; + } + + goto again; + } + + for (index = 0; index < nmounts; index++) { + if (nxt_strcmp(mounts[index], rootfs) == 0) { + parent_mnt = (char *) rootfs; + break; + } + } + + if (parent_mnt == NULL) { + len = nxt_strlen(rootfs); + + parent_mnt = nxt_malloc(len + 1); + if (parent_mnt == NULL) { + goto fail; + } + + nxt_memcpy(parent_mnt, rootfs, len); + parent_mnt[len] = '\0'; + + if (parent_mnt[len - 1] == '/') { + parent_mnt[len - 1] = '\0'; + len--; + } + + for ( ;; ) { + for (index = 0; index < nmounts; index++) { + if (nxt_strcmp(mounts[index], parent_mnt) == 0) { + goto found; + } + } + + if (len == 1 && parent_mnt[0] == '/') { + nxt_alert(task, "parent mount not found"); + goto fail; + } + + /* parent dir */ + while (parent_mnt[len - 1] != '/' && len > 0) { + len--; + } + + if (nxt_slow_path(len == 0)) { + nxt_alert(task, "parent mount not found"); + goto fail; + } + + if (len == 1) { + parent_mnt[len] = '\0'; /* / */ + } else { + parent_mnt[len - 1] = '\0'; /* /<path> */ + } + } + } + +found: + + if (shared[index]) { + if (nxt_slow_path(mount("", parent_mnt, "", MS_PRIVATE, "") != 0)) { + nxt_alert(task, "mount(\"\", \"%s\", MS_PRIVATE) %E", parent_mnt, + nxt_errno); + + goto fail; + } + } + + ret = NXT_OK; + +fail: + + if (procfile != NULL) { + endmntent(procfile); + } + + if (mounts != NULL) { + for (index = 0; index < nmounts; index++) { + nxt_free(mounts[index]); + } + + nxt_free(mounts); + } + + if (shared != NULL) { + nxt_free(shared); + } + + if (parent_mnt != NULL && parent_mnt != rootfs) { + nxt_free(parent_mnt); + } + + return ret; +} + + +nxt_inline int +nxt_pivot_root(const char *new_root, const char *old_root) +{ + return syscall(__NR_pivot_root, new_root, old_root); +} + + +#else /* !(NXT_HAVE_PIVOT_ROOT) || !(NXT_HAVE_CLONE_NEWNS) */ + + +nxt_int_t +nxt_isolation_change_root(nxt_task_t *task, nxt_process_t *process) +{ + char *rootfs; + + rootfs = (char *) process->isolation.rootfs; + + nxt_debug(task, "change root: %s", rootfs); + + if (nxt_fast_path(nxt_isolation_chroot(task, rootfs) == NXT_OK)) { + if (nxt_slow_path(chdir("/") < 0)) { + nxt_alert(task, "chdir(\"/\") %E", nxt_errno); + return NXT_ERROR; + } + + return NXT_OK; + } + + return NXT_ERROR; +} + +#endif + + +static nxt_int_t +nxt_isolation_chroot(nxt_task_t *task, const char *path) +{ + if (nxt_slow_path(chroot(path) < 0)) { + nxt_alert(task, "chroot(%s) %E", path, nxt_errno); + return NXT_ERROR; + } + + return NXT_OK; +} + +#endif /* NXT_HAVE_ISOLATION_ROOTFS */ + + +#if (NXT_HAVE_PR_SET_NO_NEW_PRIVS) + +static nxt_int_t +nxt_isolation_set_new_privs(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_t *process) +{ + nxt_conf_value_t *obj; + + static nxt_str_t new_privs_name = nxt_string("new_privs"); + + obj = nxt_conf_get_object_member(isolation, &new_privs_name, NULL); + if (obj != NULL) { + process->isolation.new_privs = nxt_conf_get_boolean(obj); + } + + return NXT_OK; +} + +#endif diff --git a/src/nxt_isolation.h b/src/nxt_isolation.h new file mode 100644 index 00000000..88a5f9e1 --- /dev/null +++ b/src/nxt_isolation.h @@ -0,0 +1,18 @@ +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_ISOLATION_H_ +#define _NXT_ISOLATION_H_ + + +nxt_int_t nxt_isolation_main_prefork(nxt_task_t *task, nxt_process_t *process, + nxt_mp_t *mp); + +#if (NXT_HAVE_ISOLATION_ROOTFS) +nxt_int_t nxt_isolation_prepare_rootfs(nxt_task_t *task, + nxt_process_t *process); +nxt_int_t nxt_isolation_change_root(nxt_task_t *task, nxt_process_t *process); +#endif + +#endif /* _NXT_ISOLATION_H_ */ diff --git a/src/nxt_lvlhsh.c b/src/nxt_lvlhsh.c index ec433341..d10dbc58 100644 --- a/src/nxt_lvlhsh.c +++ b/src/nxt_lvlhsh.c @@ -1015,17 +1015,3 @@ nxt_lvlhsh_retrieve(nxt_lvlhsh_t *lh, const nxt_lvlhsh_proto_t *proto, return NULL; } - - -void * -nxt_lvlhsh_alloc(void *data, size_t size) -{ - return nxt_memalign(size, size); -} - - -void -nxt_lvlhsh_free(void *data, void *p) -{ - nxt_free(p); -} diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 48eb2abb..d2edab1d 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -191,6 +191,12 @@ static nxt_conf_map_t nxt_python_app_conf[] = { NXT_CONF_MAP_STR, offsetof(nxt_common_app_conf_t, u.python.module), }, + + { + nxt_string("callable"), + NXT_CONF_MAP_CSTRZ, + offsetof(nxt_common_app_conf_t, u.python.callable), + }, }; @@ -878,11 +884,9 @@ nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) return; } -#if (NXT_HAVE_ISOLATION_ROOTFS) - if (process->isolation.rootfs != NULL && process->isolation.mounts) { - (void) nxt_process_unmount_all(task, process); + if (process->isolation.cleanup != NULL) { + process->isolation.cleanup(task, process); } -#endif name = process->name; stream = process->stream; @@ -1292,6 +1296,8 @@ nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) goto fail; } + mnt->builtin = 1; + ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_mounts_map, nxt_nitems(nxt_app_lang_mounts_map), mnt); diff --git a/src/nxt_malloc.c b/src/nxt_malloc.c index 910ef7cd..fed58e96 100644 --- a/src/nxt_malloc.c +++ b/src/nxt_malloc.c @@ -81,6 +81,22 @@ nxt_realloc(void *p, size_t size) } +/* nxt_lvlhsh_* functions moved here to avoid references from nxt_lvlhsh.c. */ + +void * +nxt_lvlhsh_alloc(void *data, size_t size) +{ + return nxt_memalign(size, size); +} + + +void +nxt_lvlhsh_free(void *data, void *p) +{ + nxt_free(p); +} + + #if (NXT_DEBUG) void diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index bc8341f4..234ceef8 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -77,13 +77,20 @@ typedef void (*zif_handler)(INTERNAL_FUNCTION_PARAMETERS); #endif +static nxt_int_t nxt_php_setup(nxt_task_t *task, nxt_process_t *process, + nxt_common_app_conf_t *conf); static nxt_int_t nxt_php_start(nxt_task_t *task, nxt_process_data_t *data); static nxt_int_t nxt_php_set_target(nxt_task_t *task, nxt_php_target_t *target, nxt_conf_value_t *conf); +static nxt_int_t nxt_php_set_ini_path(nxt_task_t *task, nxt_str_t *path, + char *workdir); static void nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, int type); static nxt_int_t nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, int type); +#ifdef NXT_PHP8 +static void nxt_php_disable_functions(nxt_str_t *str); +#endif static void nxt_php_disable(nxt_task_t *task, const char *type, nxt_str_t *value, char **ptr, nxt_php_disable_t disable); @@ -252,7 +259,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = { PHP_VERSION, NULL, 0, - NULL, + nxt_php_setup, nxt_php_start, }; @@ -267,55 +274,20 @@ static void ***tsrm_ls; static nxt_int_t -nxt_php_start(nxt_task_t *task, nxt_process_data_t *data) +nxt_php_setup(nxt_task_t *task, nxt_process_t *process, + nxt_common_app_conf_t *conf) { - u_char *p; - uint32_t next; - nxt_str_t ini_path, name; - nxt_int_t ret; - nxt_uint_t n; - nxt_unit_ctx_t *unit_ctx; - nxt_unit_init_t php_init; - nxt_conf_value_t *value; - nxt_php_app_conf_t *c; - nxt_common_app_conf_t *conf; + nxt_str_t ini_path; + nxt_int_t ret; + nxt_conf_value_t *value; + nxt_php_app_conf_t *c; static nxt_str_t file_str = nxt_string("file"); static nxt_str_t user_str = nxt_string("user"); static nxt_str_t admin_str = nxt_string("admin"); - conf = data->app; c = &conf->u.php; - n = (c->targets != NULL) ? nxt_conf_object_members_count(c->targets) : 1; - - nxt_php_targets = nxt_zalloc(sizeof(nxt_php_target_t) * n); - if (nxt_slow_path(nxt_php_targets == NULL)) { - return NXT_ERROR; - } - - if (c->targets != NULL) { - next = 0; - - for (n = 0; /* void */; n++) { - value = nxt_conf_next_object_member(c->targets, &name, &next); - if (value == NULL) { - break; - } - - ret = nxt_php_set_target(task, &nxt_php_targets[n], value); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - - } else { - ret = nxt_php_set_target(task, &nxt_php_targets[0], conf->self); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - #ifdef ZTS #if PHP_VERSION_ID >= 70400 @@ -345,15 +317,12 @@ nxt_php_start(nxt_task_t *task, nxt_process_data_t *data) if (value != NULL) { nxt_conf_get_string(value, &ini_path); - p = nxt_malloc(ini_path.length + 1); - if (nxt_slow_path(p == NULL)) { + ret = nxt_php_set_ini_path(task, &ini_path, + conf->working_directory); + + if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } - - nxt_php_sapi_module.php_ini_path_override = (char *) p; - - p = nxt_cpymem(p, ini_path.start, ini_path.length); - *p = '\0'; } } @@ -370,6 +339,55 @@ nxt_php_start(nxt_task_t *task, nxt_process_data_t *data) nxt_php_set_options(task, value, ZEND_INI_USER); } + return NXT_OK; +} + + +static nxt_int_t +nxt_php_start(nxt_task_t *task, nxt_process_data_t *data) +{ + uint32_t next; + nxt_int_t ret; + nxt_str_t name; + nxt_uint_t n; + nxt_unit_ctx_t *unit_ctx; + nxt_unit_init_t php_init; + nxt_conf_value_t *value; + nxt_php_app_conf_t *c; + nxt_common_app_conf_t *conf; + + conf = data->app; + c = &conf->u.php; + + n = (c->targets != NULL) ? nxt_conf_object_members_count(c->targets) : 1; + + nxt_php_targets = nxt_zalloc(sizeof(nxt_php_target_t) * n); + if (nxt_slow_path(nxt_php_targets == NULL)) { + return NXT_ERROR; + } + + if (c->targets != NULL) { + next = 0; + + for (n = 0; /* void */; n++) { + value = nxt_conf_next_object_member(c->targets, &name, &next); + if (value == NULL) { + break; + } + + ret = nxt_php_set_target(task, &nxt_php_targets[n], value); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + } else { + ret = nxt_php_set_target(task, &nxt_php_targets[0], conf->self); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + ret = nxt_unit_default_init(task, &php_init); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "nxt_unit_default_init() failed"); @@ -386,9 +404,8 @@ nxt_php_start(nxt_task_t *task, nxt_process_data_t *data) nxt_php_unit_ctx = unit_ctx; - nxt_unit_run(unit_ctx); - - nxt_unit_done(unit_ctx); + nxt_unit_run(nxt_php_unit_ctx); + nxt_unit_done(nxt_php_unit_ctx); exit(0); @@ -510,6 +527,46 @@ nxt_php_set_target(nxt_task_t *task, nxt_php_target_t *target, } +static nxt_int_t +nxt_php_set_ini_path(nxt_task_t *task, nxt_str_t *ini_path, char *workdir) +{ + size_t wdlen; + u_char *p, *start; + + if (ini_path->start[0] == '/' || workdir == NULL) { + p = nxt_malloc(ini_path->length + 1); + if (nxt_slow_path(p == NULL)) { + return NXT_ERROR; + } + + start = p; + + } else { + wdlen = nxt_strlen(workdir); + + p = nxt_malloc(wdlen + ini_path->length + 2); + if (nxt_slow_path(p == NULL)) { + return NXT_ERROR; + } + + start = p; + + p = nxt_cpymem(p, workdir, wdlen); + + if (workdir[wdlen - 1] != '/') { + *p++ = '/'; + } + } + + p = nxt_cpymem(p, ini_path->start, ini_path->length); + *p = '\0'; + + nxt_php_sapi_module.php_ini_path_override = (char *) start; + + return NXT_OK; +} + + static void nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, int type) { @@ -535,9 +592,13 @@ nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, int type) } if (nxt_str_eq(&name, "disable_functions", 17)) { +#ifdef NXT_PHP8 + nxt_php_disable_functions(&value); +#else nxt_php_disable(task, "function", &value, &PG(disable_functions), zend_disable_function); +#endif continue; } @@ -626,6 +687,29 @@ nxt_php_alter_option(nxt_str_t *name, nxt_str_t *value, int type) #endif +#ifdef NXT_PHP8 + +static void +nxt_php_disable_functions(nxt_str_t *str) +{ + char *p; + + p = nxt_malloc(str->length + 1); + if (nxt_slow_path(p == NULL)) { + return; + } + + nxt_memcpy(p, str->start, str->length); + p[str->length] = '\0'; + + zend_disable_functions(p); + + nxt_free(p); +} + +#endif + + static void nxt_php_disable(nxt_task_t *task, const char *type, nxt_str_t *value, char **ptr, nxt_php_disable_t disable) diff --git a/src/nxt_process.c b/src/nxt_process.c index 9bfae395..9be7974f 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -17,10 +17,6 @@ #include <sys/prctl.h> #endif -#if (NXT_HAVE_PIVOT_ROOT) -#include <mntent.h> -#endif - static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process); @@ -33,16 +29,6 @@ static void nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, static void nxt_process_created_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); -#if (NXT_HAVE_ISOLATION_ROOTFS) -static nxt_int_t nxt_process_chroot(nxt_task_t *task, const char *path); - -#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS) -static nxt_int_t nxt_process_pivot_root(nxt_task_t *task, const char *rootfs); -static nxt_int_t nxt_process_private_mount(nxt_task_t *task, - const char *rootfs); -static int nxt_pivot_root(const char *new_root, const char *old_root); -#endif -#endif /* A cached process pid. */ nxt_pid_t nxt_pid; @@ -398,51 +384,6 @@ nxt_process_core_setup(nxt_task_t *task, nxt_process_t *process) } -#if (NXT_HAVE_CLONE_NEWUSER) - -nxt_int_t -nxt_process_vldt_isolation_creds(nxt_task_t *task, nxt_process_t *process) -{ - nxt_int_t ret; - nxt_clone_t *clone; - nxt_credential_t *creds; - - clone = &process->isolation.clone; - creds = process->user_cred; - - if (clone->uidmap.size == 0 && clone->gidmap.size == 0) { - return NXT_OK; - } - - if (!nxt_is_clone_flag_set(clone->flags, NEWUSER)) { - if (nxt_slow_path(clone->uidmap.size > 0)) { - nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but " - "\"isolation.namespaces.credential\" is false or unset"); - - return NXT_ERROR; - } - - if (nxt_slow_path(clone->gidmap.size > 0)) { - nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but " - "\"isolation.namespaces.credential\" is false or unset"); - - return NXT_ERROR; - } - - return NXT_OK; - } - - ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, creds); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - - return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, creds); -} - -#endif - - nxt_int_t nxt_process_creds_set(nxt_task_t *task, nxt_process_t *process, nxt_str_t *user, nxt_str_t *group) @@ -525,329 +466,6 @@ nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process) } -#if (NXT_HAVE_ISOLATION_ROOTFS) - - -#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS) - - -nxt_int_t -nxt_process_change_root(nxt_task_t *task, nxt_process_t *process) -{ - char *rootfs; - nxt_int_t ret; - - rootfs = (char *) process->isolation.rootfs; - - nxt_debug(task, "change root: %s", rootfs); - - if (NXT_CLONE_MNT(process->isolation.clone.flags)) { - ret = nxt_process_pivot_root(task, rootfs); - } else { - ret = nxt_process_chroot(task, rootfs); - } - - if (nxt_fast_path(ret == NXT_OK)) { - if (nxt_slow_path(chdir("/") < 0)) { - nxt_alert(task, "chdir(\"/\") %E", nxt_errno); - return NXT_ERROR; - } - } - - return ret; -} - - -#else - - -nxt_int_t -nxt_process_change_root(nxt_task_t *task, nxt_process_t *process) -{ - char *rootfs; - - rootfs = (char *) process->isolation.rootfs; - - nxt_debug(task, "change root: %s", rootfs); - - if (nxt_fast_path(nxt_process_chroot(task, rootfs) == NXT_OK)) { - if (nxt_slow_path(chdir("/") < 0)) { - nxt_alert(task, "chdir(\"/\") %E", nxt_errno); - return NXT_ERROR; - } - - return NXT_OK; - } - - return NXT_ERROR; -} - - -#endif - - -static nxt_int_t -nxt_process_chroot(nxt_task_t *task, const char *path) -{ - if (nxt_slow_path(chroot(path) < 0)) { - nxt_alert(task, "chroot(%s) %E", path, nxt_errno); - return NXT_ERROR; - } - - return NXT_OK; -} - - -void -nxt_process_unmount_all(nxt_task_t *task, nxt_process_t *process) -{ - size_t i, n; - nxt_array_t *mounts; - nxt_fs_mount_t *mnt; - - nxt_debug(task, "unmount all (%s)", process->name); - - mounts = process->isolation.mounts; - n = mounts->nelts; - mnt = mounts->elts; - - for (i = 0; i < n; i++) { - nxt_fs_unmount(mnt[i].dst); - } -} - - -#if (NXT_HAVE_PIVOT_ROOT) && (NXT_HAVE_CLONE_NEWNS) - -/* - * pivot_root(2) can only be safely used with containers, otherwise it can - * umount(2) the global root filesystem and screw up the machine. - */ - -static nxt_int_t -nxt_process_pivot_root(nxt_task_t *task, const char *path) -{ - /* - * This implementation makes use of a kernel trick that works for ages - * and now documented in Linux kernel 5. - * https://lore.kernel.org/linux-man/87r24piwhm.fsf@x220.int.ebiederm.org/T/ - */ - - if (nxt_slow_path(mount("", "/", "", MS_SLAVE|MS_REC, "") != 0)) { - nxt_alert(task, "failed to make / a slave mount %E", nxt_errno); - return NXT_ERROR; - } - - if (nxt_slow_path(nxt_process_private_mount(task, path) != NXT_OK)) { - return NXT_ERROR; - } - - if (nxt_slow_path(mount(path, path, "bind", MS_BIND|MS_REC, "") != 0)) { - nxt_alert(task, "error bind mounting rootfs %E", nxt_errno); - return NXT_ERROR; - } - - if (nxt_slow_path(chdir(path) != 0)) { - nxt_alert(task, "failed to chdir(%s) %E", path, nxt_errno); - return NXT_ERROR; - } - - if (nxt_slow_path(nxt_pivot_root(".", ".") != 0)) { - nxt_alert(task, "failed to pivot_root %E", nxt_errno); - return NXT_ERROR; - } - - /* - * Make oldroot a slave mount to avoid unmounts getting propagated to the - * host. - */ - if (nxt_slow_path(mount("", ".", "", MS_SLAVE | MS_REC, NULL) != 0)) { - nxt_alert(task, "failed to bind mount rootfs %E", nxt_errno); - return NXT_ERROR; - } - - if (nxt_slow_path(umount2(".", MNT_DETACH) != 0)) { - nxt_alert(task, "failed to umount old root directory %E", nxt_errno); - return NXT_ERROR; - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_process_private_mount(nxt_task_t *task, const char *rootfs) -{ - char *parent_mnt; - FILE *procfile; - u_char **mounts; - size_t len; - uint8_t *shared; - nxt_int_t ret, index, nmounts; - struct mntent *ent; - - static const char *mount_path = "/proc/self/mounts"; - - ret = NXT_ERROR; - ent = NULL; - shared = NULL; - procfile = NULL; - parent_mnt = NULL; - - nmounts = 256; - - mounts = nxt_malloc(nmounts * sizeof(uintptr_t)); - if (nxt_slow_path(mounts == NULL)) { - goto fail; - } - - shared = nxt_malloc(nmounts); - if (nxt_slow_path(shared == NULL)) { - goto fail; - } - - procfile = setmntent(mount_path, "r"); - if (nxt_slow_path(procfile == NULL)) { - nxt_alert(task, "failed to open %s %E", mount_path, nxt_errno); - - goto fail; - } - - index = 0; - -again: - - for ( ; index < nmounts; index++) { - ent = getmntent(procfile); - if (ent == NULL) { - nmounts = index; - break; - } - - mounts[index] = (u_char *) strdup(ent->mnt_dir); - shared[index] = hasmntopt(ent, "shared") != NULL; - } - - if (ent != NULL) { - /* there are still entries to be read */ - - nmounts *= 2; - mounts = nxt_realloc(mounts, nmounts); - if (nxt_slow_path(mounts == NULL)) { - goto fail; - } - - shared = nxt_realloc(shared, nmounts); - if (nxt_slow_path(shared == NULL)) { - goto fail; - } - - goto again; - } - - for (index = 0; index < nmounts; index++) { - if (nxt_strcmp(mounts[index], rootfs) == 0) { - parent_mnt = (char *) rootfs; - break; - } - } - - if (parent_mnt == NULL) { - len = nxt_strlen(rootfs); - - parent_mnt = nxt_malloc(len + 1); - if (parent_mnt == NULL) { - goto fail; - } - - nxt_memcpy(parent_mnt, rootfs, len); - parent_mnt[len] = '\0'; - - if (parent_mnt[len - 1] == '/') { - parent_mnt[len - 1] = '\0'; - len--; - } - - for ( ;; ) { - for (index = 0; index < nmounts; index++) { - if (nxt_strcmp(mounts[index], parent_mnt) == 0) { - goto found; - } - } - - if (len == 1 && parent_mnt[0] == '/') { - nxt_alert(task, "parent mount not found"); - goto fail; - } - - /* parent dir */ - while (parent_mnt[len - 1] != '/' && len > 0) { - len--; - } - - if (nxt_slow_path(len == 0)) { - nxt_alert(task, "parent mount not found"); - goto fail; - } - - if (len == 1) { - parent_mnt[len] = '\0'; /* / */ - } else { - parent_mnt[len - 1] = '\0'; /* /<path> */ - } - } - } - -found: - - if (shared[index]) { - if (nxt_slow_path(mount("", parent_mnt, "", MS_PRIVATE, "") != 0)) { - nxt_alert(task, "mount(\"\", \"%s\", MS_PRIVATE) %E", parent_mnt, - nxt_errno); - - goto fail; - } - } - - ret = NXT_OK; - -fail: - - if (procfile != NULL) { - endmntent(procfile); - } - - if (mounts != NULL) { - for (index = 0; index < nmounts; index++) { - nxt_free(mounts[index]); - } - - nxt_free(mounts); - } - - if (shared != NULL) { - nxt_free(shared); - } - - if (parent_mnt != NULL && parent_mnt != rootfs) { - nxt_free(parent_mnt); - } - - return ret; -} - - -static int -nxt_pivot_root(const char *new_root, const char *old_root) -{ - return syscall(__NR_pivot_root, new_root, old_root); -} - -#endif - -#endif - - static nxt_int_t nxt_process_send_ready(nxt_task_t *task, nxt_process_t *process) { diff --git a/src/nxt_process.h b/src/nxt_process.h index ecd813e2..d9b4dff1 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -60,6 +60,9 @@ typedef enum { typedef struct nxt_port_mmap_s nxt_port_mmap_t; +typedef struct nxt_process_s nxt_process_t; +typedef void (*nxt_isolation_cleanup_t)(nxt_task_t *task, + nxt_process_t *process); typedef struct { @@ -69,21 +72,30 @@ typedef struct { nxt_port_mmap_t *elts; } nxt_port_mmaps_t; + typedef struct { - u_char *rootfs; - nxt_array_t *mounts; /* of nxt_mount_t */ + uint8_t language_deps; /* 1-byte */ +} nxt_process_automount_t; + + +typedef struct { + u_char *rootfs; + nxt_process_automount_t automount; + nxt_array_t *mounts; /* of nxt_mount_t */ + + nxt_isolation_cleanup_t cleanup; #if (NXT_HAVE_CLONE) - nxt_clone_t clone; + nxt_clone_t clone; #endif #if (NXT_HAVE_PR_SET_NO_NEW_PRIVS) - uint8_t new_privs; /* 1 bit */ + uint8_t new_privs; /* 1 bit */ #endif } nxt_process_isolation_t; -typedef struct { +struct nxt_process_s { nxt_pid_t pid; const char *name; nxt_queue_t ports; /* of nxt_port_t */ @@ -103,7 +115,7 @@ typedef struct { nxt_process_data_t data; nxt_process_isolation_t isolation; -} nxt_process_t; +}; typedef nxt_int_t (*nxt_process_prefork_t)(nxt_task_t *task, @@ -178,17 +190,6 @@ nxt_int_t nxt_process_creds_set(nxt_task_t *task, nxt_process_t *process, nxt_str_t *user, nxt_str_t *group); nxt_int_t nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process); -#if (NXT_HAVE_CLONE_NEWUSER) -nxt_int_t nxt_process_vldt_isolation_creds(nxt_task_t *task, - nxt_process_t *process); -#endif - -nxt_int_t nxt_process_change_root(nxt_task_t *task, nxt_process_t *process); - -#if (NXT_HAVE_ISOLATION_ROOTFS) -void nxt_process_unmount_all(nxt_task_t *task, nxt_process_t *process); -#endif - #if (NXT_HAVE_SETPROCTITLE) #define nxt_process_title(task, fmt, ...) \ diff --git a/src/nxt_router.c b/src/nxt_router.c index 0e1de6fa..a3218047 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -24,7 +24,6 @@ typedef struct { uint32_t max_processes; uint32_t spare_processes; nxt_msec_t timeout; - nxt_msec_t res_timeout; nxt_msec_t idle_timeout; uint32_t requests; nxt_conf_value_t *limits_value; @@ -260,7 +259,7 @@ static const nxt_str_t empty_prefix = nxt_string(""); static const nxt_str_t *nxt_app_msg_prefix[] = { &empty_prefix, - &http_prefix, + &empty_prefix, &http_prefix, &http_prefix, &http_prefix, @@ -1158,12 +1157,6 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = { }, { - nxt_string("reschedule_timeout"), - NXT_CONF_MAP_MSEC, - offsetof(nxt_router_app_conf_t, res_timeout), - }, - - { nxt_string("requests"), NXT_CONF_MAP_INT32, offsetof(nxt_router_app_conf_t, requests), @@ -1423,7 +1416,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, apcf.max_processes = 1; apcf.spare_processes = 0; apcf.timeout = 0; - apcf.res_timeout = 1000; apcf.idle_timeout = 15000; apcf.requests = 0; apcf.limits_value = NULL; @@ -1505,8 +1497,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application processes: %D", apcf.processes); nxt_debug(task, "application request timeout: %M", apcf.timeout); - nxt_debug(task, "application reschedule timeout: %M", - apcf.res_timeout); nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -1537,7 +1527,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->max_pending_processes = apcf.spare_processes ? apcf.spare_processes : 1; app->timeout = apcf.timeout; - app->res_timeout = apcf.res_timeout * 1000000; app->idle_timeout = apcf.idle_timeout; app->max_requests = apcf.requests; @@ -1736,6 +1725,10 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tmcf->router_conf, &lscf.application); } + + if (nxt_slow_path(skcf->action == NULL)) { + goto fail; + } } } @@ -4948,6 +4941,9 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_debug(task, "queue is not empty"); } + buf->is_port_mmap_sent = 1; + buf->mem.pos = buf->mem.free; + } else { nxt_alert(task, "stream #%uD, app '%V': failed to send app message", req_rpc_data->stream, &app->name); diff --git a/src/nxt_router.h b/src/nxt_router.h index 81b3538c..512f1810 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -126,7 +126,6 @@ struct nxt_app_s { uint32_t max_requests; nxt_msec_t timeout; - nxt_nsec_t res_timeout; nxt_msec_t idle_timeout; nxt_str_t *targets; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 435276a0..44970b34 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -84,7 +84,11 @@ nxt_runtime_create(nxt_task_t *task) lang->version = (u_char *) ""; lang->file = NULL; lang->module = &nxt_external_module; - lang->mounts = NULL; + + lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t)); + if (nxt_slow_path(lang->mounts == NULL)) { + goto fail; + } listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t)); if (nxt_slow_path(listen_sockets == NULL)) { diff --git a/src/nxt_string.h b/src/nxt_string.h index 3f9192e2..7e02f59a 100644 --- a/src/nxt_string.h +++ b/src/nxt_string.h @@ -31,6 +31,16 @@ nxt_strlen(s) \ #define \ +nxt_strdup(s) \ + strdup((char *) s) + + +#define \ +nxt_strchr(buf, delim) \ + (u_char *) strchr((char *) buf, delim) + + +#define \ nxt_memzero(buf, length) \ (void) memset(buf, 0, length) diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 6b7d631d..f75d61bc 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -74,7 +74,8 @@ static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( nxt_unit_ctx_t *ctx); static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); -static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws); +static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, + nxt_unit_websocket_frame_impl_t *ws); static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, @@ -119,8 +120,7 @@ static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); -static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, - pid_t pid); +static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); @@ -184,6 +184,9 @@ static nxt_unit_request_info_t *nxt_unit_request_hash_find( nxt_unit_ctx_t *ctx, uint32_t stream, int remove); static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); +static void *nxt_unit_lvlhsh_alloc(void *data, size_t size); +static void nxt_unit_lvlhsh_free(void *data, void *p); +static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length); struct nxt_unit_mmap_buf_s { @@ -531,7 +534,8 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_unit_impl_t *lib; nxt_unit_callbacks_t *cb; - lib = malloc(sizeof(nxt_unit_impl_t) + init->request_data_size); + lib = nxt_unit_malloc(NULL, + sizeof(nxt_unit_impl_t) + init->request_data_size); if (nxt_slow_path(lib == NULL)) { nxt_unit_alert(NULL, "failed to allocate unit struct"); @@ -586,7 +590,7 @@ nxt_unit_create(nxt_unit_init_t *init) fail: - free(lib); + nxt_unit_free(NULL, lib); return NULL; } @@ -710,7 +714,7 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib) nxt_unit_mmaps_destroy(&lib->incoming); nxt_unit_mmaps_destroy(&lib->outgoing); - free(lib); + nxt_unit_free(NULL, lib); } } @@ -1388,7 +1392,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, return NXT_UNIT_AGAIN; } - port_impl = malloc(sizeof(nxt_unit_port_impl_t)); + port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t)); if (nxt_slow_path(port_impl == NULL)) { nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", (int) sizeof(nxt_unit_port_impl_t)); @@ -1412,7 +1416,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, pthread_mutex_unlock(&lib->mutex); - free(port); + nxt_unit_free(ctx, port); return NXT_UNIT_ERROR; } @@ -1426,7 +1430,7 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, pthread_mutex_unlock(&lib->mutex); - free(port); + nxt_unit_free(ctx, port); return NXT_UNIT_ERROR; } @@ -1634,8 +1638,8 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) if (nxt_queue_is_empty(&ctx_impl->free_req)) { pthread_mutex_unlock(&ctx_impl->mutex); - req_impl = malloc(sizeof(nxt_unit_request_info_impl_t) - + lib->request_data_size); + req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t) + + lib->request_data_size); if (nxt_slow_path(req_impl == NULL)) { return NULL; } @@ -1722,7 +1726,7 @@ nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) nxt_queue_remove(&req_impl->link); if (req_impl != &ctx_impl->req) { - free(req_impl); + nxt_unit_free(&ctx_impl->ctx, req_impl); } } @@ -1741,7 +1745,7 @@ nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) if (nxt_queue_is_empty(&ctx_impl->free_ws)) { pthread_mutex_unlock(&ctx_impl->mutex); - ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t)); + ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t)); if (nxt_slow_path(ws_impl == NULL)) { return NULL; } @@ -1783,11 +1787,12 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) static void -nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl) +nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, + nxt_unit_websocket_frame_impl_t *ws_impl) { nxt_queue_remove(&ws_impl->link); - free(ws_impl); + nxt_unit_free(ctx, ws_impl); } @@ -1815,42 +1820,66 @@ nxt_unit_field_hash(const char *name, size_t name_length) void nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req) { + char *name; uint32_t i, j; nxt_unit_field_t *fields, f; nxt_unit_request_t *r; + static nxt_str_t content_length = nxt_string("content-length"); + static nxt_str_t content_type = nxt_string("content-type"); + static nxt_str_t cookie = nxt_string("cookie"); + nxt_unit_req_debug(req, "group_dup_fields"); r = req->request; fields = r->fields; for (i = 0; i < r->fields_count; i++) { + name = nxt_unit_sptr_get(&fields[i].name); switch (fields[i].hash) { case NXT_UNIT_HASH_CONTENT_LENGTH: - r->content_length_field = i; + if (fields[i].name_length == content_length.length + && nxt_unit_memcasecmp(name, content_length.start, + content_length.length) == 0) + { + r->content_length_field = i; + } + break; case NXT_UNIT_HASH_CONTENT_TYPE: - r->content_type_field = i; + if (fields[i].name_length == content_type.length + && nxt_unit_memcasecmp(name, content_type.start, + content_type.length) == 0) + { + r->content_type_field = i; + } + break; case NXT_UNIT_HASH_COOKIE: - r->cookie_field = i; + if (fields[i].name_length == cookie.length + && nxt_unit_memcasecmp(name, cookie.start, + cookie.length) == 0) + { + r->cookie_field = i; + } + break; - }; + } for (j = i + 1; j < r->fields_count; j++) { - if (fields[i].hash != fields[j].hash) { - continue; - } - - if (j == i + 1) { + if (fields[i].hash != fields[j].hash + || fields[i].name_length != fields[j].name_length + || nxt_unit_memcasecmp(name, + nxt_unit_sptr_get(&fields[j].name), + fields[j].name_length) != 0) + { continue; } f = fields[j]; - f.name.offset += (j - (i + 1)) * sizeof(f); f.value.offset += (j - (i + 1)) * sizeof(f); while (j > i + 1) { @@ -1862,6 +1891,9 @@ nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req) fields[j] = f; + /* Assign the same name pointer for further grouping simplicity. */ + nxt_unit_sptr_set(&fields[j].name, name); + i++; } } @@ -2297,7 +2329,7 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) if (ctx_impl->free_buf == NULL) { pthread_mutex_unlock(&ctx_impl->mutex); - mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t)); + mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t)); if (nxt_slow_path(mmap_buf == NULL)) { return NULL; } @@ -2615,7 +2647,7 @@ nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) } if (mmap_buf->free_ptr != NULL) { - free(mmap_buf->free_ptr); + nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr); mmap_buf->free_ptr = NULL; } @@ -2657,7 +2689,7 @@ nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) return rbuf; } - rbuf = malloc(sizeof(nxt_unit_read_buf_t)); + rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t)); if (nxt_fast_path(rbuf != NULL)) { rbuf->ctx_impl = ctx_impl; @@ -3016,7 +3048,7 @@ nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size) return NULL; } - mmap_buf->free_ptr = malloc(size); + mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size); if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { nxt_unit_req_alert(req, "preread: failed to allocate buf memory"); nxt_unit_mmap_buf_release(mmap_buf); @@ -3288,7 +3320,7 @@ int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) { char *b; - size_t size; + size_t size, hsize; nxt_unit_websocket_frame_impl_t *ws_impl; ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); @@ -3299,19 +3331,30 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) size = ws_impl->buf->buf.end - ws_impl->buf->buf.start; - b = malloc(size); + b = nxt_unit_malloc(ws->req->ctx, size); if (nxt_slow_path(b == NULL)) { return NXT_UNIT_ERROR; } memcpy(b, ws_impl->buf->buf.start, size); + hsize = nxt_websocket_frame_header_size(b); + ws_impl->buf->buf.start = b; - ws_impl->buf->buf.free = b; + ws_impl->buf->buf.free = b + hsize; ws_impl->buf->buf.end = b + size; ws_impl->buf->free_ptr = b; + ws_impl->ws.header = (nxt_websocket_header_t *) b; + + if (ws_impl->ws.header->mask) { + ws_impl->ws.mask = (uint8_t *) b + hsize - 4; + + } else { + ws_impl->ws.mask = NULL; + } + return NXT_UNIT_OK; } @@ -3796,7 +3839,8 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, mmap_buf->plain_ptr = local_buf; } else { - mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t)); + mmap_buf->free_ptr = nxt_unit_malloc(ctx, + size + sizeof(nxt_port_msg_t)); if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { return NXT_UNIT_ERROR; } @@ -3966,7 +4010,7 @@ nxt_unit_process_release(nxt_unit_process_t *process) if (c == 1) { nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid); - free(process); + nxt_unit_free(NULL, process); } } @@ -3983,7 +4027,7 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) munmap(mm->hdr, PORT_MMAP_SIZE); } - free(mmaps->elts); + nxt_unit_free(NULL, mmaps->elts); } pthread_mutex_destroy(&mmaps->mutex); @@ -4255,8 +4299,8 @@ nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { NXT_LVLHSH_DEFAULT, nxt_unit_lvlhsh_pid_test, - nxt_lvlhsh_alloc, - nxt_lvlhsh_free, + nxt_unit_lvlhsh_alloc, + nxt_unit_lvlhsh_free, }; @@ -4271,11 +4315,14 @@ nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid) static nxt_unit_process_t * -nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) +nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid) { + nxt_unit_impl_t *lib; nxt_unit_process_t *process; nxt_lvlhsh_query_t lhq; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + nxt_unit_process_lhq_pid(&lhq, &pid); if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) { @@ -4285,9 +4332,9 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) return process; } - process = malloc(sizeof(nxt_unit_process_t)); + process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t)); if (nxt_slow_path(process == NULL)) { - nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid); + nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid); return NULL; } @@ -4308,9 +4355,9 @@ nxt_unit_process_get(nxt_unit_impl_t *lib, pid_t pid) break; default: - nxt_unit_alert(NULL, "process %d insert failed", (int) pid); + nxt_unit_alert(ctx, "process %d insert failed", (int) pid); - free(process); + nxt_unit_free(ctx, process); process = NULL; break; } @@ -4881,7 +4928,8 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size); + new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t) + + lib->request_data_size); if (nxt_slow_path(new_ctx == NULL)) { nxt_unit_alert(ctx, "failed to allocate context"); @@ -4890,7 +4938,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data) rc = nxt_unit_ctx_init(lib, new_ctx, data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - free(new_ctx); + nxt_unit_free(ctx, new_ctx); return NULL; } @@ -4969,7 +5017,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) while (ctx_impl->free_buf != NULL) { mmap_buf = ctx_impl->free_buf; nxt_unit_mmap_buf_unlink(mmap_buf); - free(mmap_buf); + nxt_unit_free(&ctx_impl->ctx, mmap_buf); } nxt_queue_each(req_impl, &ctx_impl->free_req, @@ -4982,7 +5030,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) nxt_queue_each(ws_impl, &ctx_impl->free_ws, nxt_unit_websocket_frame_impl_t, link) { - nxt_unit_websocket_frame_free(ws_impl); + nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl); } nxt_queue_loop; @@ -4996,7 +5044,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) } if (ctx_impl != &lib->main_ctx) { - free(ctx_impl); + nxt_unit_free(&lib->main_ctx.ctx, ctx_impl); } nxt_unit_lib_release(lib); @@ -5048,7 +5096,7 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) pthread_mutex_lock(&lib->mutex); - process = nxt_unit_process_get(lib, lib->pid); + process = nxt_unit_process_get(ctx, lib->pid); if (nxt_slow_path(process == NULL)) { pthread_mutex_unlock(&lib->mutex); @@ -5181,7 +5229,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port) : sizeof(nxt_port_queue_t)); } - free(port_impl); + nxt_unit_free(NULL, port_impl); } } @@ -5288,7 +5336,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) port->id.pid, port->id.id, port->in_fd, port->out_fd, queue); - process = nxt_unit_process_get(lib, port->id.pid); + process = nxt_unit_process_get(ctx, port->id.pid); if (nxt_slow_path(process == NULL)) { goto unlock; } @@ -5297,7 +5345,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) process->next_port_id = port->id.id + 1; } - new_port = malloc(sizeof(nxt_unit_port_impl_t)); + new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t)); if (nxt_slow_path(new_port == NULL)) { nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed", port->id.pid, port->id.id); @@ -5312,7 +5360,7 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue) nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed", port->id.pid, port->id.id); - free(new_port); + nxt_unit_free(ctx, new_port); new_port = NULL; @@ -5716,10 +5764,6 @@ retry: nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue", (int) port->id.pid, (int) port->id.id, (int) rbuf->size); - if (port_impl->from_socket) { - nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET"); - } - goto retry; } @@ -5977,8 +6021,8 @@ nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data) static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = { NXT_LVLHSH_DEFAULT, nxt_unit_port_hash_test, - nxt_lvlhsh_alloc, - nxt_lvlhsh_free, + nxt_unit_lvlhsh_alloc, + nxt_unit_lvlhsh_free, }; @@ -6076,8 +6120,8 @@ nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data) static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = { NXT_LVLHSH_DEFAULT, nxt_unit_request_hash_test, - nxt_lvlhsh_alloc, - nxt_lvlhsh_free, + nxt_unit_lvlhsh_alloc, + nxt_unit_lvlhsh_free, }; @@ -6158,7 +6202,9 @@ nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove) case NXT_OK: req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t, req); - req_impl->in_hash = 0; + if (remove) { + req_impl->in_hash = 0; + } return lhq.value; @@ -6307,29 +6353,84 @@ nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level) } -/* The function required by nxt_lvlhsh_alloc() and nxt_lvlvhsh_free(). */ - -void * -nxt_memalign(size_t alignment, size_t size) +static void * +nxt_unit_lvlhsh_alloc(void *data, size_t size) { - void *p; - nxt_err_t err; + int err; + void *p; - err = posix_memalign(&p, alignment, size); + err = posix_memalign(&p, size, size); if (nxt_fast_path(err == 0)) { + nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p", + (int) size, (int) size, p); return p; } + nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)", + (int) size, (int) size, strerror(err), err); return NULL; } -#if (NXT_DEBUG) + +static void +nxt_unit_lvlhsh_free(void *data, void *p) +{ + nxt_unit_free(NULL, p); +} + + +void * +nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) +{ + void *p; + + p = malloc(size); + + if (nxt_fast_path(p != NULL)) { + nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p); + + } else { + nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)", + (int) size, strerror(errno), errno); + } + + return p; +} + void -nxt_free(void *p) +nxt_unit_free(nxt_unit_ctx_t *ctx, void *p) { + nxt_unit_debug(ctx, "free(%p)", p); + free(p); } -#endif + +static int +nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length) +{ + u_char c1, c2; + nxt_int_t n; + const u_char *s1, *s2; + + s1 = p1; + s2 = p2; + + while (length-- != 0) { + c1 = *s1++; + c2 = *s2++; + + c1 = nxt_lowcase(c1); + c2 = nxt_lowcase(c2); + + n = c1 - c2; + + if (n != 0) { + return n; + } + } + + return 0; +} diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 67244cf4..e90f0781 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -187,7 +187,7 @@ struct nxt_unit_read_info_s { /* * Initialize Unit application library with necessary callbacks and - * ready/reply port parameters, send 'READY' response to master. + * ready/reply port parameters, send 'READY' response to main. */ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); @@ -322,6 +322,10 @@ int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws); void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws); +void *nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size); + +void nxt_unit_free(nxt_unit_ctx_t *ctx, void *p); + #if defined __has_attribute #if __has_attribute(format) diff --git a/src/nxt_upstream.c b/src/nxt_upstream.c index c8ecbbe6..9f81b286 100644 --- a/src/nxt_upstream.c +++ b/src/nxt_upstream.c @@ -86,11 +86,11 @@ nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name, action->u.upstream_number = i; action->handler = nxt_upstream_handler; - return NXT_DECLINED; + return NXT_OK; } } - return NXT_OK; + return NXT_DECLINED; } diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c new file mode 100644 index 00000000..01534a47 --- /dev/null +++ b/src/python/nxt_python.c @@ -0,0 +1,340 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <Python.h> + +#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_unit.h> + +#include <python/nxt_python.h> + +#include NXT_PYTHON_MOUNTS_H + + +static nxt_int_t nxt_python_start(nxt_task_t *task, + nxt_process_data_t *data); +static void nxt_python_atexit(void); + +static uint32_t compat[] = { + NXT_VERNUM, NXT_DEBUG, +}; + + +NXT_EXPORT nxt_app_module_t nxt_app_module = { + sizeof(compat), + compat, + nxt_string("python"), + PY_VERSION, + nxt_python_mounts, + nxt_nitems(nxt_python_mounts), + NULL, + nxt_python_start, +}; + +static PyObject *nxt_py_stderr_flush; +PyObject *nxt_py_application; + +#if PY_MAJOR_VERSION == 3 +static wchar_t *nxt_py_home; +#else +static char *nxt_py_home; +#endif + + +static nxt_int_t +nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) +{ + int rc, asgi; + char *nxt_py_module; + size_t len; + PyObject *obj, *pypath, *module; + const char *callable; + nxt_unit_ctx_t *unit_ctx; + nxt_unit_init_t python_init; + nxt_common_app_conf_t *app_conf; + nxt_python_app_conf_t *c; +#if PY_MAJOR_VERSION == 3 + char *path; + size_t size; + nxt_int_t pep405; + + static const char pyvenv[] = "/pyvenv.cfg"; + static const char bin_python[] = "/bin/python"; +#endif + + app_conf = data->app; + c = &app_conf->u.python; + + if (c->home != NULL) { + len = nxt_strlen(c->home); + +#if PY_MAJOR_VERSION == 3 + + path = nxt_malloc(len + sizeof(pyvenv)); + if (nxt_slow_path(path == NULL)) { + nxt_alert(task, "Failed to allocate memory"); + return NXT_ERROR; + } + + nxt_memcpy(path, c->home, len); + nxt_memcpy(path + len, pyvenv, sizeof(pyvenv)); + + pep405 = (access(path, R_OK) == 0); + + nxt_free(path); + + if (pep405) { + size = (len + sizeof(bin_python)) * sizeof(wchar_t); + + } else { + size = (len + 1) * sizeof(wchar_t); + } + + nxt_py_home = nxt_malloc(size); + if (nxt_slow_path(nxt_py_home == NULL)) { + nxt_alert(task, "Failed to allocate memory"); + return NXT_ERROR; + } + + if (pep405) { + mbstowcs(nxt_py_home, c->home, len); + mbstowcs(nxt_py_home + len, bin_python, sizeof(bin_python)); + Py_SetProgramName(nxt_py_home); + + } else { + mbstowcs(nxt_py_home, c->home, len + 1); + Py_SetPythonHome(nxt_py_home); + } + +#else + nxt_py_home = nxt_malloc(len + 1); + if (nxt_slow_path(nxt_py_home == NULL)) { + nxt_alert(task, "Failed to allocate memory"); + return NXT_ERROR; + } + + nxt_memcpy(nxt_py_home, c->home, len + 1); + Py_SetPythonHome(nxt_py_home); +#endif + } + + Py_InitializeEx(0); + + module = NULL; + obj = NULL; + + obj = PySys_GetObject((char *) "stderr"); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to get \"sys.stderr\" object"); + goto fail; + } + + nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush"); + if (nxt_slow_path(nxt_py_stderr_flush == NULL)) { + nxt_alert(task, "Python failed to get \"flush\" attribute of " + "\"sys.stderr\" object"); + goto fail; + } + + /* obj is a Borrowed reference. */ + + if (c->path.length > 0) { + obj = PyString_FromStringAndSize((char *) c->path.start, + c->path.length); + + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to create string object \"%V\"", + &c->path); + goto fail; + } + + pypath = PySys_GetObject((char *) "path"); + + if (nxt_slow_path(pypath == NULL)) { + nxt_alert(task, "Python failed to get \"sys.path\" list"); + goto fail; + } + + if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) { + nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"", + &c->path); + goto fail; + } + + Py_DECREF(obj); + } + + obj = Py_BuildValue("[s]", "unit"); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to create the \"sys.argv\" list"); + goto fail; + } + + if (nxt_slow_path(PySys_SetObject((char *) "argv", obj) != 0)) { + nxt_alert(task, "Python failed to set the \"sys.argv\" list"); + goto fail; + } + + Py_CLEAR(obj); + + nxt_py_module = nxt_alloca(c->module.length + 1); + nxt_memcpy(nxt_py_module, c->module.start, c->module.length); + nxt_py_module[c->module.length] = '\0'; + + module = PyImport_ImportModule(nxt_py_module); + if (nxt_slow_path(module == NULL)) { + nxt_alert(task, "Python failed to import module \"%s\"", nxt_py_module); + nxt_python_print_exception(); + goto fail; + } + + callable = (c->callable != NULL) ? c->callable : "application"; + + obj = PyDict_GetItemString(PyModule_GetDict(module), callable); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to get \"%s\" " + "from module \"%s\"", callable, nxt_py_module); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(obj) == 0)) { + nxt_alert(task, "\"%s\" in module \"%s\" " + "is not a callable object", callable, nxt_py_module); + goto fail; + } + + nxt_py_application = obj; + obj = NULL; + + Py_INCREF(nxt_py_application); + + Py_CLEAR(module); + + nxt_unit_default_init(task, &python_init); + + python_init.shm_limit = data->app->shm_limit; + + asgi = nxt_python_asgi_check(nxt_py_application); + + if (asgi) { + rc = nxt_python_asgi_init(task, &python_init); + + } else { + rc = nxt_python_wsgi_init(task, &python_init); + } + + if (nxt_slow_path(rc == NXT_ERROR)) { + goto fail; + } + + unit_ctx = nxt_unit_init(&python_init); + if (nxt_slow_path(unit_ctx == NULL)) { + goto fail; + } + + if (asgi) { + rc = nxt_python_asgi_run(unit_ctx); + + } else { + rc = nxt_python_wsgi_run(unit_ctx); + } + + nxt_unit_done(unit_ctx); + + nxt_python_atexit(); + + exit(rc); + + return NXT_OK; + +fail: + + Py_XDECREF(obj); + Py_XDECREF(module); + + nxt_python_atexit(); + + return NXT_ERROR; +} + + +nxt_int_t +nxt_python_init_strings(nxt_python_string_t *pstr) +{ + PyObject *obj; + + while (pstr->string.start != NULL) { + obj = PyString_FromStringAndSize((char *) pstr->string.start, + pstr->string.length); + if (nxt_slow_path(obj == NULL)) { + return NXT_ERROR; + } + + PyUnicode_InternInPlace(&obj); + + *pstr->object_p = obj; + + pstr++; + } + + return NXT_OK; +} + + +void +nxt_python_done_strings(nxt_python_string_t *pstr) +{ + PyObject *obj; + + while (pstr->string.start != NULL) { + obj = *pstr->object_p; + + Py_XDECREF(obj); + *pstr->object_p = NULL; + + pstr++; + } +} + + +static void +nxt_python_atexit(void) +{ + nxt_python_wsgi_done(); + nxt_python_asgi_done(); + + Py_XDECREF(nxt_py_stderr_flush); + Py_XDECREF(nxt_py_application); + + Py_Finalize(); + + if (nxt_py_home != NULL) { + nxt_free(nxt_py_home); + } +} + + +void +nxt_python_print_exception(void) +{ + PyErr_Print(); + +#if PY_MAJOR_VERSION == 3 + /* The backtrace may be buffered in sys.stderr file object. */ + { + PyObject *result; + + result = PyObject_CallFunction(nxt_py_stderr_flush, NULL); + if (nxt_slow_path(result == NULL)) { + PyErr_Clear(); + return; + } + + Py_DECREF(result); + } +#endif +} diff --git a/src/python/nxt_python.h b/src/python/nxt_python.h new file mode 100644 index 00000000..3211026b --- /dev/null +++ b/src/python/nxt_python.h @@ -0,0 +1,60 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PYTHON_H_INCLUDED_ +#define _NXT_PYTHON_H_INCLUDED_ + + +#include <Python.h> +#include <nxt_main.h> +#include <nxt_unit.h> + + +#if PY_MAJOR_VERSION == 3 +#define NXT_PYTHON_BYTES_TYPE "bytestring" + +#define PyString_FromStringAndSize(str, size) \ + PyUnicode_DecodeLatin1((str), (size), "strict") +#define PyString_AS_STRING PyUnicode_DATA + +#else +#define NXT_PYTHON_BYTES_TYPE "string" + +#define PyBytes_FromStringAndSize PyString_FromStringAndSize +#define PyBytes_Check PyString_Check +#define PyBytes_GET_SIZE PyString_GET_SIZE +#define PyBytes_AS_STRING PyString_AS_STRING +#define PyUnicode_InternInPlace PyString_InternInPlace +#define PyUnicode_AsUTF8 PyString_AS_STRING +#endif + +#if PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION >= 5 +#define NXT_HAVE_ASGI 1 +#endif + +extern PyObject *nxt_py_application; + +typedef struct { + nxt_str_t string; + PyObject **object_p; +} nxt_python_string_t; + + +nxt_int_t nxt_python_init_strings(nxt_python_string_t *pstr); +void nxt_python_done_strings(nxt_python_string_t *pstr); + +void nxt_python_print_exception(void); + +nxt_int_t nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init); +int nxt_python_wsgi_run(nxt_unit_ctx_t *ctx); +void nxt_python_wsgi_done(void); + +int nxt_python_asgi_check(PyObject *obj); +nxt_int_t nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init); +nxt_int_t nxt_python_asgi_run(nxt_unit_ctx_t *ctx); +void nxt_python_asgi_done(void); + + +#endif /* _NXT_PYTHON_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c new file mode 100644 index 00000000..72408ea1 --- /dev/null +++ b/src/python/nxt_python_asgi.c @@ -0,0 +1,1227 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <nxt_unit_response.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); + +static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); +static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, + uint16_t port); +static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); +static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); + +static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); +static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); +static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); +static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); + +static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); + + +PyObject *nxt_py_loop_run_until_complete; +PyObject *nxt_py_loop_create_future; +PyObject *nxt_py_loop_create_task; + +nxt_queue_t nxt_py_asgi_drain_queue; + +static PyObject *nxt_py_loop_call_soon; +static PyObject *nxt_py_quit_future; +static PyObject *nxt_py_quit_future_set_result; +static PyObject *nxt_py_loop_add_reader; +static PyObject *nxt_py_loop_remove_reader; +static PyObject *nxt_py_port_read; + +static PyMethodDef nxt_py_port_read_method = + {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; + +#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A + + +int +nxt_python_asgi_check(PyObject *obj) +{ + int res; + PyObject *call; + PyCodeObject *code; + + if (PyFunction_Check(obj)) { + code = (PyCodeObject *) PyFunction_GET_CODE(obj); + + return (code->co_flags & CO_COROUTINE) != 0; + } + + if (PyMethod_Check(obj)) { + obj = PyMethod_GET_FUNCTION(obj); + + code = (PyCodeObject *) PyFunction_GET_CODE(obj); + + return (code->co_flags & CO_COROUTINE) != 0; + } + + call = PyObject_GetAttrString(obj, "__call__"); + + if (call == NULL) { + return 0; + } + + if (PyFunction_Check(call)) { + code = (PyCodeObject *) PyFunction_GET_CODE(call); + + res = (code->co_flags & CO_COROUTINE) != 0; + + } else { + if (PyMethod_Check(call)) { + obj = PyMethod_GET_FUNCTION(call); + + code = (PyCodeObject *) PyFunction_GET_CODE(obj); + + res = (code->co_flags & CO_COROUTINE) != 0; + + } else { + res = 0; + } + } + + Py_DECREF(call); + + return res; +} + + +nxt_int_t +nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) +{ + PyObject *asyncio, *loop, *get_event_loop; + nxt_int_t rc; + + nxt_debug(task, "asgi_init"); + + if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) { + nxt_alert(task, "Python failed to init string objects"); + return NXT_ERROR; + } + + asyncio = PyImport_ImportModule("asyncio"); + if (nxt_slow_path(asyncio == NULL)) { + nxt_alert(task, "Python failed to import module 'asyncio'"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + loop = NULL; + get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), + "get_event_loop"); + if (nxt_slow_path(get_event_loop == NULL)) { + nxt_alert(task, + "Python failed to get 'get_event_loop' from module 'asyncio'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) { + nxt_alert(task, "'asyncio.get_event_loop' is not a callable object"); + goto fail; + } + + loop = PyObject_CallObject(get_event_loop, NULL); + if (nxt_slow_path(loop == NULL)) { + nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'"); + goto fail; + } + + nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task"); + if (nxt_slow_path(nxt_py_loop_create_task == NULL)) { + nxt_alert(task, "Python failed to get 'loop.create_task'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) { + nxt_alert(task, "'loop.create_task' is not a callable object"); + goto fail; + } + + nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader"); + if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) { + nxt_alert(task, "Python failed to get 'loop.add_reader'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) { + nxt_alert(task, "'loop.add_reader' is not a callable object"); + goto fail; + } + + nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader"); + if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) { + nxt_alert(task, "Python failed to get 'loop.remove_reader'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) { + nxt_alert(task, "'loop.remove_reader' is not a callable object"); + goto fail; + } + + nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon"); + if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) { + nxt_alert(task, "Python failed to get 'loop.call_soon'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) { + nxt_alert(task, "'loop.call_soon' is not a callable object"); + goto fail; + } + + nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop, + "run_until_complete"); + if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) { + nxt_alert(task, "Python failed to get 'loop.run_until_complete'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) { + nxt_alert(task, "'loop.run_until_complete' is not a callable object"); + goto fail; + } + + nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future"); + if (nxt_slow_path(nxt_py_loop_create_future == NULL)) { + nxt_alert(task, "Python failed to get 'loop.create_future'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) { + nxt_alert(task, "'loop.create_future' is not a callable object"); + goto fail; + } + + nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(nxt_py_quit_future == NULL)) { + nxt_alert(task, "Python failed to create Future "); + nxt_python_print_exception(); + goto fail; + } + + nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future, + "set_result"); + if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) { + nxt_alert(task, "Python failed to get 'future.set_result'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) { + nxt_alert(task, "'future.set_result' is not a callable object"); + goto fail; + } + + nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); + if (nxt_slow_path(nxt_py_port_read == NULL)) { + nxt_alert(task, "Python failed to initialize the 'port_read' function"); + goto fail; + } + + nxt_queue_init(&nxt_py_asgi_drain_queue); + + if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) { + goto fail; + } + + if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) { + goto fail; + } + + rc = nxt_py_asgi_lifespan_startup(task); + if (nxt_slow_path(rc == NXT_ERROR)) { + goto fail; + } + + init->callbacks.request_handler = nxt_py_asgi_request_handler; + init->callbacks.data_handler = nxt_py_asgi_http_data_handler; + init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; + init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; + init->callbacks.quit = nxt_py_asgi_quit; + init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; + init->callbacks.add_port = nxt_py_asgi_add_port; + init->callbacks.remove_port = nxt_py_asgi_remove_port; + + Py_DECREF(loop); + Py_DECREF(asyncio); + + return NXT_OK; + +fail: + + Py_XDECREF(loop); + Py_DECREF(asyncio); + + return NXT_ERROR; +} + + +nxt_int_t +nxt_python_asgi_run(nxt_unit_ctx_t *ctx) +{ + PyObject *res; + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + nxt_py_quit_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + + return NXT_ERROR; + } + + Py_DECREF(res); + + nxt_py_asgi_lifespan_shutdown(); + + return NXT_OK; +} + + +static void +nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) +{ + PyObject *scope, *res, *task, *receive, *send, *done, *asgi; + + if (req->request->websocket_handshake) { + asgi = nxt_py_asgi_websocket_create(req); + + } else { + asgi = nxt_py_asgi_http_create(req); + } + + if (nxt_slow_path(asgi == NULL)) { + nxt_unit_req_alert(req, "Python failed to create asgi object"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return; + } + + receive = PyObject_GetAttrString(asgi, "receive"); + if (nxt_slow_path(receive == NULL)) { + nxt_unit_req_alert(req, "Python failed to get 'receive' method"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_asgi; + } + + send = PyObject_GetAttrString(asgi, "send"); + if (nxt_slow_path(receive == NULL)) { + nxt_unit_req_alert(req, "Python failed to get 'send' method"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_receive; + } + + done = PyObject_GetAttrString(asgi, "_done"); + if (nxt_slow_path(receive == NULL)) { + nxt_unit_req_alert(req, "Python failed to get '_done' method"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_send; + } + + scope = nxt_py_asgi_create_http_scope(req); + if (nxt_slow_path(scope == NULL)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_done; + } + + req->data = asgi; + + res = PyObject_CallFunctionObjArgs(nxt_py_application, + scope, receive, send, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(req, "Python failed to call the application"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_scope; + } + + if (nxt_slow_path(!PyCoro_CheckExact(res))) { + nxt_unit_req_error(req, "Application result type is not a coroutine"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + Py_DECREF(res); + + goto release_scope; + } + + task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + if (nxt_slow_path(task == NULL)) { + nxt_unit_req_error(req, "Python failed to call the create_task"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + Py_DECREF(res); + + goto release_scope; + } + + Py_DECREF(res); + + res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(req, + "Python failed to call 'task.add_done_callback'"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_task; + } + + Py_DECREF(res); +release_task: + Py_DECREF(task); +release_scope: + Py_DECREF(scope); +release_done: + Py_DECREF(done); +release_send: + Py_DECREF(send); +release_receive: + Py_DECREF(receive); +release_asgi: + Py_DECREF(asgi); +} + + +static PyObject * +nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) +{ + char *p, *target, *query; + uint32_t target_length, i; + PyObject *scope, *v, *type, *scheme; + PyObject *headers, *header; + nxt_unit_field_t *f; + nxt_unit_request_t *r; + + static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); + +#define SET_ITEM(dict, key, value) \ + if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ + == -1)) \ + { \ + nxt_unit_req_alert(req, "Python failed to set '" \ + #dict "." #key "' item"); \ + goto fail; \ + } + + v = NULL; + headers = NULL; + + r = req->request; + + if (r->websocket_handshake) { + type = nxt_py_websocket_str; + scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; + + } else { + type = nxt_py_http_str; + scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; + } + + scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); + if (nxt_slow_path(scope == NULL)) { + return NULL; + } + + p = nxt_unit_sptr_get(&r->version); + SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str + : nxt_py_1_0_str) + SET_ITEM(scope, scheme, scheme) + + v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), + r->method_length); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'method' string"); + goto fail; + } + + SET_ITEM(scope, method, v) + Py_DECREF(v); + + v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, + "replace"); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'path' string"); + goto fail; + } + + SET_ITEM(scope, path, v) + Py_DECREF(v); + + target = nxt_unit_sptr_get(&r->target); + query = nxt_unit_sptr_get(&r->query); + + if (r->query.offset != 0) { + target_length = query - target - 1; + + } else { + target_length = r->target_length; + } + + v = PyBytes_FromStringAndSize(target, target_length); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); + goto fail; + } + + SET_ITEM(scope, raw_path, v) + Py_DECREF(v); + + v = PyBytes_FromStringAndSize(query, r->query_length); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'query' string"); + goto fail; + } + + SET_ITEM(scope, query_string, v) + Py_DECREF(v); + + v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'client' pair"); + goto fail; + } + + SET_ITEM(scope, client, v) + Py_DECREF(v); + + v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'server' pair"); + goto fail; + } + + SET_ITEM(scope, server, v) + Py_DECREF(v); + + v = NULL; + + headers = PyTuple_New(r->fields_count); + if (nxt_slow_path(headers == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'headers' object"); + goto fail; + } + + for (i = 0; i < r->fields_count; i++) { + f = r->fields + i; + + header = nxt_py_asgi_create_header(f); + if (nxt_slow_path(header == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'header' pair"); + goto fail; + } + + PyTuple_SET_ITEM(headers, i, header); + + if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL + && f->name_length == ws_protocol.length + && f->value_length > 0 + && r->websocket_handshake) + { + v = nxt_py_asgi_create_subprotocols(f); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Failed to create subprotocols"); + goto fail; + } + + SET_ITEM(scope, subprotocols, v); + Py_DECREF(v); + } + } + + SET_ITEM(scope, headers, headers) + Py_DECREF(headers); + + return scope; + +fail: + + Py_XDECREF(v); + Py_XDECREF(headers); + Py_DECREF(scope); + + return NULL; + +#undef SET_ITEM +} + + +static PyObject * +nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) +{ + char *p, *s; + PyObject *pair, *v; + + pair = PyTuple_New(2); + if (nxt_slow_path(pair == NULL)) { + return NULL; + } + + p = nxt_unit_sptr_get(sptr); + s = memchr(p, ':', len); + + v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); + if (nxt_slow_path(v == NULL)) { + Py_DECREF(pair); + + return NULL; + } + + PyTuple_SET_ITEM(pair, 0, v); + + if (s != NULL) { + p += len; + v = PyLong_FromString(s + 1, &p, 10); + + } else { + v = PyLong_FromLong(port); + } + + if (nxt_slow_path(v == NULL)) { + Py_DECREF(pair); + + return NULL; + } + + PyTuple_SET_ITEM(pair, 1, v); + + return pair; +} + + +static PyObject * +nxt_py_asgi_create_header(nxt_unit_field_t *f) +{ + char c, *name; + uint8_t pos; + PyObject *header, *v; + + header = PyTuple_New(2); + if (nxt_slow_path(header == NULL)) { + return NULL; + } + + name = nxt_unit_sptr_get(&f->name); + + for (pos = 0; pos < f->name_length; pos++) { + c = name[pos]; + if (c >= 'A' && c <= 'Z') { + name[pos] = (c | 0x20); + } + } + + v = PyBytes_FromStringAndSize(name, f->name_length); + if (nxt_slow_path(v == NULL)) { + Py_DECREF(header); + + return NULL; + } + + PyTuple_SET_ITEM(header, 0, v); + + v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), + f->value_length); + if (nxt_slow_path(v == NULL)) { + Py_DECREF(header); + + return NULL; + } + + PyTuple_SET_ITEM(header, 1, v); + + return header; +} + + +static PyObject * +nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) +{ + char *v; + uint32_t i, n, start; + PyObject *res, *proto; + + v = nxt_unit_sptr_get(&f->value); + n = 1; + + for (i = 0; i < f->value_length; i++) { + if (v[i] == ',') { + n++; + } + } + + res = PyTuple_New(n); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + n = 0; + start = 0; + + for (i = 0; i < f->value_length; ) { + if (v[i] != ',') { + i++; + + continue; + } + + if (i - start > 0) { + proto = PyString_FromStringAndSize(v + start, i - start); + if (nxt_slow_path(proto == NULL)) { + goto fail; + } + + PyTuple_SET_ITEM(res, n, proto); + + n++; + } + + do { + i++; + } while (i < f->value_length && v[i] == ' '); + + start = i; + } + + if (i - start > 0) { + proto = PyString_FromStringAndSize(v + start, i - start); + if (nxt_slow_path(proto == NULL)) { + goto fail; + } + + PyTuple_SET_ITEM(res, n, proto); + } + + return res; + +fail: + + Py_DECREF(res); + + return NULL; +} + + +static int +nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int nb; + PyObject *res; + + if (port->in_fd == -1) { + return NXT_UNIT_OK; + } + + nb = 1; + + if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", + port->in_fd, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader, + PyLong_FromLong(port->in_fd), + nxt_py_port_read, + PyLong_FromVoidPtr(ctx), + PyLong_FromVoidPtr(port), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to add_reader"); + + return NXT_UNIT_ERROR; + } + + Py_DECREF(res); + + return NXT_UNIT_OK; +} + + +static void +nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) +{ + PyObject *res; + + nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); + + if (port->in_fd == -1) { + return; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader, + PyLong_FromLong(port->in_fd), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Python failed to remove_reader"); + } + + Py_DECREF(res); +} + + +static void +nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) +{ + PyObject *res; + + nxt_unit_debug(ctx, "asgi_quit %p", ctx); + + res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result, + PyLong_FromLong(0), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to set_result"); + } + + Py_DECREF(res); +} + + +static void +nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_queue_link_t *lnk; + + while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) { + lnk = nxt_queue_first(&nxt_py_asgi_drain_queue); + + rc = nxt_py_asgi_http_drain(lnk); + if (rc == NXT_UNIT_AGAIN) { + break; + } + + nxt_queue_remove(lnk); + } +} + + +static PyObject * +nxt_py_asgi_port_read(PyObject *self, PyObject *args) +{ + int rc; + PyObject *arg; + Py_ssize_t n; + nxt_unit_ctx_t *ctx; + nxt_unit_port_t *port; + + n = PyTuple_GET_SIZE(args); + + if (n != 2) { + nxt_unit_alert(NULL, + "nxt_py_asgi_port_read: invalid number of arguments %d", + (int) n); + + return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); + } + + arg = PyTuple_GET_ITEM(args, 0); + if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + return PyErr_Format(PyExc_TypeError, + "the first argument is not a long"); + } + + ctx = PyLong_AsVoidPtr(arg); + + arg = PyTuple_GET_ITEM(args, 1); + if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + return PyErr_Format(PyExc_TypeError, + "the second argument is not a long"); + } + + port = PyLong_AsVoidPtr(arg); + + nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port); + + rc = nxt_unit_process_port_msg(ctx, port); + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return PyErr_Format(PyExc_RuntimeError, + "error processing port message"); + } + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, + void *data) +{ + int i; + PyObject *iter, *header, *h_iter, *name, *val, *res; + + iter = PyObject_GetIter(headers); + if (nxt_slow_path(iter == NULL)) { + return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); + } + + for (i = 0; /* void */; i++) { + header = PyIter_Next(iter); + if (header == NULL) { + break; + } + + h_iter = PyObject_GetIter(header); + if (nxt_slow_path(h_iter == NULL)) { + Py_DECREF(header); + Py_DECREF(iter); + + return PyErr_Format(PyExc_TypeError, + "'headers' item #%d is not an iterable", i); + } + + name = PyIter_Next(h_iter); + if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { + Py_XDECREF(name); + Py_DECREF(h_iter); + Py_DECREF(header); + Py_DECREF(iter); + + return PyErr_Format(PyExc_TypeError, + "'headers' item #%d 'name' is not a byte string", i); + } + + val = PyIter_Next(h_iter); + if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { + Py_XDECREF(val); + Py_DECREF(h_iter); + Py_DECREF(header); + Py_DECREF(iter); + + return PyErr_Format(PyExc_TypeError, + "'headers' item #%d 'value' is not a byte string", i); + } + + res = cb(data, i, name, val); + + Py_DECREF(name); + Py_DECREF(val); + Py_DECREF(h_iter); + Py_DECREF(header); + + if (nxt_slow_path(res == NULL)) { + Py_DECREF(iter); + + return NULL; + } + + Py_DECREF(res); + } + + Py_DECREF(iter); + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) +{ + nxt_py_asgi_calc_size_ctx_t *ctx; + + ctx = data; + + ctx->fields_count++; + ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) +{ + int rc; + char *name_str, *val_str; + uint32_t name_len, val_len; + nxt_off_t content_length; + nxt_unit_request_info_t *req; + nxt_py_asgi_add_field_ctx_t *ctx; + + name_str = PyBytes_AS_STRING(name); + name_len = PyBytes_GET_SIZE(name); + + val_str = PyBytes_AS_STRING(val); + val_len = PyBytes_GET_SIZE(val); + + ctx = data; + req = ctx->req; + + rc = nxt_unit_response_add_field(req, name_str, name_len, + val_str, val_len); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to add header #%d", i); + } + + if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { + content_length = nxt_off_t_parse((u_char *) val_str, val_len); + if (nxt_slow_path(content_length < 0)) { + nxt_unit_req_error(req, "failed to parse Content-Length " + "value %.*s", (int) val_len, val_str); + + return PyErr_Format(PyExc_ValueError, + "Failed to parse Content-Length: '%.*s'", + (int) val_len, val_str); + } + + ctx->content_length = content_length; + } + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, + PyObject *result) +{ + PyObject *set_result, *res; + + if (nxt_slow_path(result == NULL)) { + Py_DECREF(future); + + return NULL; + } + + set_result = PyObject_GetAttrString(future, "set_result"); + if (nxt_slow_path(set_result == NULL)) { + nxt_unit_req_alert(req, "failed to get 'set_result' for future"); + + Py_CLEAR(future); + + goto cleanup; + } + + if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { + nxt_unit_req_alert(req, "'future.set_result' is not a callable"); + + Py_CLEAR(future); + + goto cleanup; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result, + result, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); + nxt_python_print_exception(); + + Py_CLEAR(future); + } + + Py_XDECREF(res); + +cleanup: + + Py_DECREF(set_result); + Py_DECREF(result); + + return future; +} + + +PyObject * +nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) +{ + PyObject *msg; + + msg = PyDict_New(); + if (nxt_slow_path(msg == NULL)) { + nxt_unit_req_alert(req, "Python failed to create message dict"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create message dict"); + } + + if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { + nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'msg.type' item"); + } + + return msg; +} + + +PyObject * +nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, + PyObject *spec_version) +{ + PyObject *scope, *asgi; + + scope = PyDict_New(); + if (nxt_slow_path(scope == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create 'scope' dict"); + } + + if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { + nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); + + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'scope.type' item"); + } + + asgi = PyDict_New(); + if (nxt_slow_path(asgi == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); + nxt_python_print_exception(); + + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create 'asgi' dict"); + } + + if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { + nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); + + Py_DECREF(asgi); + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'scope.asgi' item"); + } + + if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, + nxt_py_3_0_str) == -1)) + { + nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); + + Py_DECREF(asgi); + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'asgi.version' item"); + } + + if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, + spec_version) == -1)) + { + nxt_unit_req_alert(req, + "Python failed to set 'asgi.spec_version' item"); + + Py_DECREF(asgi); + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'asgi.spec_version' item"); + } + + Py_DECREF(asgi); + + return scope; +} + + +void +nxt_py_asgi_dealloc(PyObject *self) +{ + PyObject_Del(self); +} + + +PyObject * +nxt_py_asgi_await(PyObject *self) +{ + Py_INCREF(self); + return self; +} + + +PyObject * +nxt_py_asgi_iter(PyObject *self) +{ + Py_INCREF(self); + return self; +} + + +PyObject * +nxt_py_asgi_next(PyObject *self) +{ + return NULL; +} + + +void +nxt_python_asgi_done(void) +{ + nxt_py_asgi_str_done(); + + Py_XDECREF(nxt_py_quit_future); + Py_XDECREF(nxt_py_quit_future_set_result); + Py_XDECREF(nxt_py_loop_run_until_complete); + Py_XDECREF(nxt_py_loop_create_future); + Py_XDECREF(nxt_py_loop_create_task); + Py_XDECREF(nxt_py_loop_call_soon); + Py_XDECREF(nxt_py_loop_add_reader); + Py_XDECREF(nxt_py_loop_remove_reader); + Py_XDECREF(nxt_py_port_read); +} + +#else /* !(NXT_HAVE_ASGI) */ + + +int +nxt_python_asgi_check(PyObject *obj) +{ + return 0; +} + + +nxt_int_t +nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) +{ + nxt_alert(task, "ASGI not implemented"); + return NXT_ERROR; +} + + +nxt_int_t +nxt_python_asgi_run(nxt_unit_ctx_t *ctx) +{ + nxt_unit_alert(ctx, "ASGI not implemented"); + return NXT_ERROR; +} + + +void +nxt_python_asgi_done(void) +{ +} + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi.h b/src/python/nxt_python_asgi.h new file mode 100644 index 00000000..24337c37 --- /dev/null +++ b/src/python/nxt_python_asgi.h @@ -0,0 +1,60 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PYTHON_ASGI_H_INCLUDED_ +#define _NXT_PYTHON_ASGI_H_INCLUDED_ + + +typedef PyObject * (*nxt_py_asgi_enum_header_cb)(void *ctx, int i, + PyObject *name, PyObject *val); + +typedef struct { + uint32_t fields_count; + uint32_t fields_size; +} nxt_py_asgi_calc_size_ctx_t; + +typedef struct { + nxt_unit_request_info_t *req; + uint64_t content_length; +} nxt_py_asgi_add_field_ctx_t; + +PyObject *nxt_py_asgi_enum_headers(PyObject *headers, + nxt_py_asgi_enum_header_cb cb, void *data); + +PyObject *nxt_py_asgi_calc_size(void *data, int i, PyObject *n, PyObject *v); +PyObject *nxt_py_asgi_add_field(void *data, int i, PyObject *n, PyObject *v); + +PyObject *nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, + PyObject *future, PyObject *result); +PyObject *nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type); +PyObject *nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, + PyObject *spec_version); + +void nxt_py_asgi_dealloc(PyObject *self); +PyObject *nxt_py_asgi_await(PyObject *self); +PyObject *nxt_py_asgi_iter(PyObject *self); +PyObject *nxt_py_asgi_next(PyObject *self); + +nxt_int_t nxt_py_asgi_http_init(nxt_task_t *task); +PyObject *nxt_py_asgi_http_create(nxt_unit_request_info_t *req); +void nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req); +int nxt_py_asgi_http_drain(nxt_queue_link_t *lnk); + +nxt_int_t nxt_py_asgi_websocket_init(nxt_task_t *task); +PyObject *nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req); +void nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *ws); +void nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req); + +nxt_int_t nxt_py_asgi_lifespan_startup(nxt_task_t *task); +nxt_int_t nxt_py_asgi_lifespan_shutdown(void); + +extern PyObject *nxt_py_loop_run_until_complete; +extern PyObject *nxt_py_loop_create_future; +extern PyObject *nxt_py_loop_create_task; + +extern nxt_queue_t nxt_py_asgi_drain_queue; + + +#endif /* _NXT_PYTHON_ASGI_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c new file mode 100644 index 00000000..b07d61d6 --- /dev/null +++ b/src/python/nxt_python_asgi_http.c @@ -0,0 +1,591 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +typedef struct { + PyObject_HEAD + nxt_unit_request_info_t *req; + nxt_queue_link_t link; + PyObject *receive_future; + PyObject *send_future; + uint64_t content_length; + uint64_t bytes_sent; + int complete; + PyObject *send_body; + Py_ssize_t send_body_off; +} nxt_py_asgi_http_t; + + +static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http); +static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, + PyObject *dict); +static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, + PyObject *dict); +static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future); + + +static PyMethodDef nxt_py_asgi_http_methods[] = { + { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_http_send, METH_O, 0 }, + { "_done", nxt_py_asgi_http_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_http_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_http", + .tp_basicsize = sizeof(nxt_py_asgi_http_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI HTTP request object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_http_methods, +}; + +static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; + + +nxt_int_t +nxt_py_asgi_http_init(nxt_task_t *task) +{ + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { + nxt_alert(task, "Python failed to initialize the 'http' type object"); + return NXT_ERROR; + } + + return NXT_OK; +} + + +PyObject * +nxt_py_asgi_http_create(nxt_unit_request_info_t *req) +{ + nxt_py_asgi_http_t *http; + + http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type); + + if (nxt_fast_path(http != NULL)) { + http->req = req; + http->receive_future = NULL; + http->send_future = NULL; + http->content_length = -1; + http->bytes_sent = 0; + http->complete = 0; + http->send_body = NULL; + http->send_body_off = 0; + } + + return (PyObject *) http; +} + + +static PyObject * +nxt_py_asgi_http_receive(PyObject *self, PyObject *none) +{ + PyObject *msg, *future; + nxt_py_asgi_http_t *http; + nxt_unit_request_info_t *req; + + http = (nxt_py_asgi_http_t *) self; + req = http->req; + + nxt_unit_req_debug(req, "asgi_http_receive"); + + msg = nxt_py_asgi_http_read_msg(http); + if (nxt_slow_path(msg == NULL)) { + return NULL; + } + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(req, "Python failed to create Future object"); + nxt_python_print_exception(); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (msg != Py_None) { + return nxt_py_asgi_set_result_soon(req, future, msg); + } + + http->receive_future = future; + Py_INCREF(http->receive_future); + + Py_DECREF(msg); + + return future; +} + + +static PyObject * +nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http) +{ + char *body_buf; + ssize_t read_res; + PyObject *msg, *body; + Py_ssize_t size; + nxt_unit_request_info_t *req; + + req = http->req; + + size = req->content_length; + + if (size > nxt_py_asgi_http_body_buf_size) { + size = nxt_py_asgi_http_body_buf_size; + } + + if (size > 0) { + body = PyBytes_FromStringAndSize(NULL, size); + if (nxt_slow_path(body == NULL)) { + nxt_unit_req_alert(req, "Python failed to create body byte string"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Bytes object"); + } + + body_buf = PyBytes_AS_STRING(body); + + read_res = nxt_unit_request_read(req, body_buf, size); + + } else { + body = NULL; + read_res = 0; + } + + if (read_res > 0 || read_res == size) { + msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str); + if (nxt_slow_path(msg == NULL)) { + Py_XDECREF(body); + + return NULL; + } + +#define SET_ITEM(dict, key, value) \ + if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ + == -1)) \ + { \ + nxt_unit_req_alert(req, \ + "Python failed to set '" #dict "." #key "' item"); \ + PyErr_SetString(PyExc_RuntimeError, \ + "Python failed to set '" #dict "." #key "' item"); \ + goto fail; \ + } + + if (body != NULL) { + SET_ITEM(msg, body, body) + } + + if (req->content_length > 0) { + SET_ITEM(msg, more_body, Py_True) + } + +#undef SET_ITEM + + Py_XDECREF(body); + + return msg; + } + + Py_XDECREF(body); + + Py_RETURN_NONE; + +fail: + + Py_DECREF(msg); + Py_XDECREF(body); + + return NULL; +} + + +static PyObject * +nxt_py_asgi_http_send(PyObject *self, PyObject *dict) +{ + PyObject *type; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_http_t *http; + + static const nxt_str_t response_start = nxt_string("http.response.start"); + static const nxt_str_t response_body = nxt_string("http.response.body"); + + http = (nxt_py_asgi_http_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_req_error(http->req, "asgi_http_send: " + "'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) response_start.length + && memcmp(type_str, response_start.start, type_len) == 0) + { + return nxt_py_asgi_http_response_start(http, dict); + } + + if (type_len == (Py_ssize_t) response_body.length + && memcmp(type_str, response_body.start, type_len) == 0) + { + return nxt_py_asgi_http_response_body(http, dict); + } + + nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'", + (int) type_len, type_str); + + return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); +} + + +static PyObject * +nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) +{ + int rc; + PyObject *status, *headers, *res; + nxt_py_asgi_calc_size_ctx_t calc_size_ctx; + nxt_py_asgi_add_field_ctx_t add_field_ctx; + + status = PyDict_GetItem(dict, nxt_py_status_str); + if (nxt_slow_path(status == NULL || !PyLong_Check(status))) { + nxt_unit_req_error(http->req, "asgi_http_response_start: " + "'status' is not an integer"); + return PyErr_Format(PyExc_TypeError, "'status' is not an integer"); + } + + calc_size_ctx.fields_size = 0; + calc_size_ctx.fields_count = 0; + + headers = PyDict_GetItem(dict, nxt_py_headers_str); + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, + &calc_size_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + Py_DECREF(res); + } + + rc = nxt_unit_response_init(http->req, PyLong_AsLong(status), + calc_size_ctx.fields_count, + calc_size_ctx.fields_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + add_field_ctx.req = http->req; + add_field_ctx.content_length = -1; + + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, + &add_field_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + Py_DECREF(res); + } + + http->content_length = add_field_ctx.content_length; + + Py_INCREF(http); + return (PyObject *) http; +} + + +static PyObject * +nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) +{ + int rc; + char *body_str; + ssize_t sent; + PyObject *body, *more_body, *future; + Py_ssize_t body_len, body_off; + + body = PyDict_GetItem(dict, nxt_py_body_str); + if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { + return PyErr_Format(PyExc_TypeError, "'body' is not a byte string"); + } + + more_body = PyDict_GetItem(dict, nxt_py_more_body_str); + if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) { + return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool"); + } + + if (nxt_slow_path(http->complete)) { + return PyErr_Format(PyExc_RuntimeError, + "Unexpected ASGI message 'http.response.body' " + "sent, after response already completed"); + } + + if (nxt_slow_path(http->send_future != NULL)) { + return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); + } + + if (body != NULL) { + body_str = PyBytes_AS_STRING(body); + body_len = PyBytes_GET_SIZE(body); + + nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d", + (int) body_len, (more_body == Py_True) ); + + if (nxt_slow_path(http->bytes_sent + body_len + > http->content_length)) + { + return PyErr_Format(PyExc_RuntimeError, + "Response content longer than Content-Length"); + } + + body_off = 0; + + while (body_len > 0) { + sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); + if (nxt_slow_path(sent < 0)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send body"); + } + + if (nxt_slow_path(sent == 0)) { + nxt_unit_req_debug(http->req, "asgi_http_response_body: " + "out of shared memory, %d", + (int) body_len); + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(http->req, + "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + http->send_body = body; + Py_INCREF(http->send_body); + http->send_body_off = body_off; + + nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link); + + http->send_future = future; + Py_INCREF(http->send_future); + + return future; + } + + body_str += sent; + body_len -= sent; + body_off += sent; + http->bytes_sent += sent; + } + + } else { + nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d", + (more_body == Py_True) ); + + if (!nxt_unit_response_is_sent(http->req)) { + rc = nxt_unit_response_send(http->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send response"); + } + } + } + + if (more_body == NULL || more_body == Py_False) { + http->complete = 1; + } + + Py_INCREF(http); + return (PyObject *) http; +} + + +void +nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req) +{ + PyObject *msg, *future, *res; + nxt_py_asgi_http_t *http; + + http = req->data; + + nxt_unit_req_debug(req, "asgi_http_data_handler"); + + if (http->receive_future == NULL) { + return; + } + + msg = nxt_py_asgi_http_read_msg(http); + if (nxt_slow_path(msg == NULL)) { + return; + } + + if (msg == Py_None) { + Py_DECREF(msg); + return; + } + + future = http->receive_future; + http->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(msg); +} + + +int +nxt_py_asgi_http_drain(nxt_queue_link_t *lnk) +{ + char *body_str; + ssize_t sent; + PyObject *future, *exc, *res; + Py_ssize_t body_len; + nxt_py_asgi_http_t *http; + + http = nxt_container_of(lnk, nxt_py_asgi_http_t, link); + + body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off; + body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off; + + nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len); + + while (body_len > 0) { + sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); + if (nxt_slow_path(sent < 0)) { + goto fail; + } + + if (nxt_slow_path(sent == 0)) { + return NXT_UNIT_AGAIN; + } + + body_str += sent; + body_len -= sent; + + http->send_body_off += sent; + http->bytes_sent += sent; + } + + Py_CLEAR(http->send_body); + + future = http->send_future; + http->send_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(http->req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + return NXT_UNIT_OK; + +fail: + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_failed_to_send_body_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(http->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + + future = http->send_future; + http->send_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(http->req, "'set_exception' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + Py_DECREF(exc); + + return NXT_UNIT_ERROR; +} + + +static PyObject * +nxt_py_asgi_http_done(PyObject *self, PyObject *future) +{ + int rc; + PyObject *res; + nxt_py_asgi_http_t *http; + + http = (nxt_py_asgi_http_t *) self; + + nxt_unit_req_debug(http->req, "asgi_http_done"); + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(http->req, + "Python failed to call 'future.result()'"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + + } else { + Py_DECREF(res); + + rc = NXT_UNIT_OK; + } + + nxt_unit_request_done(http->req, rc); + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi_lifespan.c b/src/python/nxt_python_asgi_lifespan.c new file mode 100644 index 00000000..14d0ee97 --- /dev/null +++ b/src/python/nxt_python_asgi_lifespan.c @@ -0,0 +1,505 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +typedef struct { + PyObject_HEAD + int disabled; + int startup_received; + int startup_sent; + int shutdown_received; + int shutdown_sent; + int shutdown_called; + PyObject *startup_future; + PyObject *shutdown_future; + PyObject *receive_future; +} nxt_py_asgi_lifespan_t; + + +static PyObject *nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_send_startup( + nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, + int v, int *sent, PyObject **future); +static PyObject *nxt_py_asgi_lifespan_send_shutdown( + nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan); +static PyObject *nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future); + + +static nxt_py_asgi_lifespan_t *nxt_py_lifespan; + +static PyMethodDef nxt_py_asgi_lifespan_methods[] = { + { "receive", nxt_py_asgi_lifespan_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_lifespan_send, METH_O, 0 }, + { "_done", nxt_py_asgi_lifespan_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_lifespan_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_lifespan", + .tp_basicsize = sizeof(nxt_py_asgi_lifespan_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI Lifespan object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_lifespan_methods, +}; + + +nxt_int_t +nxt_py_asgi_lifespan_startup(nxt_task_t *task) +{ + PyObject *scope, *res, *py_task, *receive, *send, *done; + nxt_int_t rc; + nxt_py_asgi_lifespan_t *lifespan; + + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_lifespan_type) != 0)) { + nxt_alert(task, + "Python failed to initialize the 'asgi_lifespan' type object"); + return NXT_ERROR; + } + + lifespan = PyObject_New(nxt_py_asgi_lifespan_t, &nxt_py_asgi_lifespan_type); + if (nxt_slow_path(lifespan == NULL)) { + nxt_alert(task, "Python failed to create lifespan object"); + return NXT_ERROR; + } + + rc = NXT_ERROR; + + receive = PyObject_GetAttrString((PyObject *) lifespan, "receive"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get 'receive' method"); + goto release_lifespan; + } + + send = PyObject_GetAttrString((PyObject *) lifespan, "send"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get 'send' method"); + goto release_receive; + } + + done = PyObject_GetAttrString((PyObject *) lifespan, "_done"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get '_done' method"); + goto release_send; + } + + lifespan->startup_future = PyObject_CallObject(nxt_py_loop_create_future, + NULL); + if (nxt_slow_path(lifespan->startup_future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + + goto release_done; + } + + lifespan->disabled = 0; + lifespan->startup_received = 0; + lifespan->startup_sent = 0; + lifespan->shutdown_received = 0; + lifespan->shutdown_sent = 0; + lifespan->shutdown_called = 0; + lifespan->shutdown_future = NULL; + lifespan->receive_future = NULL; + + scope = nxt_py_asgi_new_scope(NULL, nxt_py_lifespan_str, nxt_py_2_0_str); + if (nxt_slow_path(scope == NULL)) { + goto release_future; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_application, + scope, receive, send, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_log(task, NXT_LOG_ERR, "Python failed to call the application"); + nxt_python_print_exception(); + goto release_scope; + } + + if (nxt_slow_path(!PyCoro_CheckExact(res))) { + nxt_log(task, NXT_LOG_ERR, + "Application result type is not a coroutine"); + Py_DECREF(res); + goto release_scope; + } + + py_task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + if (nxt_slow_path(py_task == NULL)) { + nxt_log(task, NXT_LOG_ERR, "Python failed to call the create_task"); + nxt_python_print_exception(); + Py_DECREF(res); + goto release_scope; + } + + Py_DECREF(res); + + res = PyObject_CallMethodObjArgs(py_task, nxt_py_add_done_callback_str, + done, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_log(task, NXT_LOG_ERR, + "Python failed to call 'task.add_done_callback'"); + nxt_python_print_exception(); + goto release_task; + } + + Py_DECREF(res); + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + lifespan->startup_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_alert(task, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + goto release_task; + } + + Py_DECREF(res); + + if (lifespan->startup_sent == 1 || lifespan->disabled) { + nxt_py_lifespan = lifespan; + Py_INCREF(nxt_py_lifespan); + + rc = NXT_OK; + } + +release_task: + Py_DECREF(py_task); +release_scope: + Py_DECREF(scope); +release_future: + Py_CLEAR(lifespan->startup_future); +release_done: + Py_DECREF(done); +release_send: + Py_DECREF(send); +release_receive: + Py_DECREF(receive); +release_lifespan: + Py_DECREF(lifespan); + + return rc; +} + + +nxt_int_t +nxt_py_asgi_lifespan_shutdown(void) +{ + PyObject *msg, *future, *res; + nxt_py_asgi_lifespan_t *lifespan; + + if (nxt_slow_path(nxt_py_lifespan == NULL || nxt_py_lifespan->disabled)) { + return NXT_OK; + } + + lifespan = nxt_py_lifespan; + lifespan->shutdown_called = 1; + + if (lifespan->receive_future != NULL) { + future = lifespan->receive_future; + lifespan->receive_future = NULL; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); + + if (nxt_fast_path(msg != NULL)) { + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + msg, NULL); + Py_XDECREF(res); + Py_DECREF(msg); + } + + Py_DECREF(future); + } + + if (lifespan->shutdown_sent) { + return NXT_OK; + } + + lifespan->shutdown_future = PyObject_CallObject(nxt_py_loop_create_future, + NULL); + if (nxt_slow_path(lifespan->shutdown_future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + lifespan->shutdown_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + Py_DECREF(res); + Py_CLEAR(lifespan->shutdown_future); + + return NXT_OK; +} + + +static PyObject * +nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none) +{ + PyObject *msg, *future; + nxt_py_asgi_lifespan_t *lifespan; + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + nxt_unit_debug(NULL, "asgi_lifespan_receive"); + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (!lifespan->startup_received) { + lifespan->startup_received = 1; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_startup_str); + + return nxt_py_asgi_set_result_soon(NULL, future, msg); + } + + if (lifespan->shutdown_called && !lifespan->shutdown_received) { + lifespan->shutdown_received = 1; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); + + return nxt_py_asgi_set_result_soon(NULL, future, msg); + } + + Py_INCREF(future); + lifespan->receive_future = future; + + return future; +} + + +static PyObject * +nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict) +{ + PyObject *type, *msg; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_lifespan_t *lifespan; + + static const nxt_str_t startup_complete + = nxt_string("lifespan.startup.complete"); + static const nxt_str_t startup_failed + = nxt_string("lifespan.startup.failed"); + static const nxt_str_t shutdown_complete + = nxt_string("lifespan.shutdown.complete"); + static const nxt_str_t shutdown_failed + = nxt_string("lifespan.shutdown.failed"); + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_error(NULL, + "asgi_lifespan_send: 'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, + "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_debug(NULL, "asgi_lifespan_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) startup_complete.length + && memcmp(type_str, startup_complete.start, type_len) == 0) + { + return nxt_py_asgi_lifespan_send_startup(lifespan, 0, NULL); + } + + if (type_len == (Py_ssize_t) startup_failed.length + && memcmp(type_str, startup_failed.start, type_len) == 0) + { + msg = PyDict_GetItem(dict, nxt_py_message_str); + return nxt_py_asgi_lifespan_send_startup(lifespan, 1, msg); + } + + if (type_len == (Py_ssize_t) shutdown_complete.length + && memcmp(type_str, shutdown_complete.start, type_len) == 0) + { + return nxt_py_asgi_lifespan_send_shutdown(lifespan, 0, NULL); + } + + if (type_len == (Py_ssize_t) shutdown_failed.length + && memcmp(type_str, shutdown_failed.start, type_len) == 0) + { + msg = PyDict_GetItem(dict, nxt_py_message_str); + return nxt_py_asgi_lifespan_send_shutdown(lifespan, 1, msg); + } + + return nxt_py_asgi_lifespan_disable(lifespan); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_startup(nxt_py_asgi_lifespan_t *lifespan, int v, + PyObject *message) +{ + const char *message_str; + Py_ssize_t message_len; + + if (nxt_slow_path(v != 0)) { + nxt_unit_error(NULL, "Application startup failed"); + + if (nxt_fast_path(message != NULL && PyUnicode_Check(message))) { + message_str = PyUnicode_AsUTF8AndSize(message, &message_len); + + nxt_unit_error(NULL, "%.*s", (int) message_len, message_str); + } + } + + return nxt_py_asgi_lifespan_send_(lifespan, v, + &lifespan->startup_sent, + &lifespan->startup_future); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, int v, int *sent, + PyObject **pfuture) +{ + PyObject *future, *res; + + if (*sent) { + return nxt_py_asgi_lifespan_disable(lifespan); + } + + *sent = 1 + v; + + if (*pfuture != NULL) { + future = *pfuture; + *pfuture = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + + return nxt_py_asgi_lifespan_disable(lifespan); + } + + Py_DECREF(res); + Py_DECREF(future); + } + + Py_INCREF(lifespan); + + return (PyObject *) lifespan; +} + + +static PyObject * +nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan) +{ + nxt_unit_warn(NULL, "Got invalid state transition on lifespan protocol"); + + lifespan->disabled = 1; + + return PyErr_Format(PyExc_AssertionError, + "Got invalid state transition on lifespan protocol"); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_shutdown(nxt_py_asgi_lifespan_t *lifespan, int v, + PyObject *message) +{ + return nxt_py_asgi_lifespan_send_(lifespan, v, + &lifespan->shutdown_sent, + &lifespan->shutdown_future); +} + + +static PyObject * +nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future) +{ + PyObject *res; + nxt_py_asgi_lifespan_t *lifespan; + + nxt_unit_debug(NULL, "asgi_lifespan_done"); + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + if (lifespan->startup_sent == 0) { + lifespan->disabled = 1; + } + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_log(NULL, NXT_UNIT_LOG_INFO, + "ASGI Lifespan processing exception"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + + if (lifespan->startup_future != NULL) { + future = lifespan->startup_future; + lifespan->startup_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + } + + if (lifespan->shutdown_future != NULL) { + future = lifespan->shutdown_future; + lifespan->shutdown_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + } + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi_str.c b/src/python/nxt_python_asgi_str.c new file mode 100644 index 00000000..37fa7f04 --- /dev/null +++ b/src/python/nxt_python_asgi_str.c @@ -0,0 +1,141 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <python/nxt_python_asgi_str.h> + + +PyObject *nxt_py_1_0_str; +PyObject *nxt_py_1_1_str; +PyObject *nxt_py_2_0_str; +PyObject *nxt_py_2_1_str; +PyObject *nxt_py_3_0_str; +PyObject *nxt_py_add_done_callback_str; +PyObject *nxt_py_asgi_str; +PyObject *nxt_py_bad_state_str; +PyObject *nxt_py_body_str; +PyObject *nxt_py_bytes_str; +PyObject *nxt_py_client_str; +PyObject *nxt_py_code_str; +PyObject *nxt_py_done_str; +PyObject *nxt_py_exception_str; +PyObject *nxt_py_failed_to_send_body_str; +PyObject *nxt_py_headers_str; +PyObject *nxt_py_http_str; +PyObject *nxt_py_http_disconnect_str; +PyObject *nxt_py_http_request_str; +PyObject *nxt_py_http_version_str; +PyObject *nxt_py_https_str; +PyObject *nxt_py_lifespan_str; +PyObject *nxt_py_lifespan_shutdown_str; +PyObject *nxt_py_lifespan_startup_str; +PyObject *nxt_py_method_str; +PyObject *nxt_py_message_str; +PyObject *nxt_py_message_too_big_str; +PyObject *nxt_py_more_body_str; +PyObject *nxt_py_path_str; +PyObject *nxt_py_query_string_str; +PyObject *nxt_py_raw_path_str; +PyObject *nxt_py_result_str; +PyObject *nxt_py_root_path_str; +PyObject *nxt_py_scheme_str; +PyObject *nxt_py_server_str; +PyObject *nxt_py_set_exception_str; +PyObject *nxt_py_set_result_str; +PyObject *nxt_py_spec_version_str; +PyObject *nxt_py_status_str; +PyObject *nxt_py_subprotocol_str; +PyObject *nxt_py_subprotocols_str; +PyObject *nxt_py_text_str; +PyObject *nxt_py_type_str; +PyObject *nxt_py_version_str; +PyObject *nxt_py_websocket_str; +PyObject *nxt_py_websocket_accept_str; +PyObject *nxt_py_websocket_close_str; +PyObject *nxt_py_websocket_connect_str; +PyObject *nxt_py_websocket_disconnect_str; +PyObject *nxt_py_websocket_receive_str; +PyObject *nxt_py_websocket_send_str; +PyObject *nxt_py_ws_str; +PyObject *nxt_py_wss_str; + +static nxt_python_string_t nxt_py_asgi_strings[] = { + { nxt_string("1.0"), &nxt_py_1_0_str }, + { nxt_string("1.1"), &nxt_py_1_1_str }, + { nxt_string("2.0"), &nxt_py_2_0_str }, + { nxt_string("2.1"), &nxt_py_2_1_str }, + { nxt_string("3.0"), &nxt_py_3_0_str }, + { nxt_string("add_done_callback"), &nxt_py_add_done_callback_str }, + { nxt_string("asgi"), &nxt_py_asgi_str }, + { nxt_string("bad state"), &nxt_py_bad_state_str }, + { nxt_string("body"), &nxt_py_body_str }, + { nxt_string("bytes"), &nxt_py_bytes_str }, + { nxt_string("client"), &nxt_py_client_str }, + { nxt_string("code"), &nxt_py_code_str }, + { nxt_string("done"), &nxt_py_done_str }, + { nxt_string("exception"), &nxt_py_exception_str }, + { nxt_string("failed to send body"), &nxt_py_failed_to_send_body_str }, + { nxt_string("headers"), &nxt_py_headers_str }, + { nxt_string("http"), &nxt_py_http_str }, + { nxt_string("http.disconnect"), &nxt_py_http_disconnect_str }, + { nxt_string("http.request"), &nxt_py_http_request_str }, + { nxt_string("http_version"), &nxt_py_http_version_str }, + { nxt_string("https"), &nxt_py_https_str }, + { nxt_string("lifespan"), &nxt_py_lifespan_str }, + { nxt_string("lifespan.shutdown"), &nxt_py_lifespan_shutdown_str }, + { nxt_string("lifespan.startup"), &nxt_py_lifespan_startup_str }, + { nxt_string("message"), &nxt_py_message_str }, + { nxt_string("message too big"), &nxt_py_message_too_big_str }, + { nxt_string("method"), &nxt_py_method_str }, + { nxt_string("more_body"), &nxt_py_more_body_str }, + { nxt_string("path"), &nxt_py_path_str }, + { nxt_string("query_string"), &nxt_py_query_string_str }, + { nxt_string("raw_path"), &nxt_py_raw_path_str }, + { nxt_string("result"), &nxt_py_result_str }, + { nxt_string("root_path"), &nxt_py_root_path_str }, // not used + { nxt_string("scheme"), &nxt_py_scheme_str }, + { nxt_string("server"), &nxt_py_server_str }, + { nxt_string("set_exception"), &nxt_py_set_exception_str }, + { nxt_string("set_result"), &nxt_py_set_result_str }, + { nxt_string("spec_version"), &nxt_py_spec_version_str }, + { nxt_string("status"), &nxt_py_status_str }, + { nxt_string("subprotocol"), &nxt_py_subprotocol_str }, + { nxt_string("subprotocols"), &nxt_py_subprotocols_str }, + { nxt_string("text"), &nxt_py_text_str }, + { nxt_string("type"), &nxt_py_type_str }, + { nxt_string("version"), &nxt_py_version_str }, + { nxt_string("websocket"), &nxt_py_websocket_str }, + { nxt_string("websocket.accept"), &nxt_py_websocket_accept_str }, + { nxt_string("websocket.close"), &nxt_py_websocket_close_str }, + { nxt_string("websocket.connect"), &nxt_py_websocket_connect_str }, + { nxt_string("websocket.disconnect"), &nxt_py_websocket_disconnect_str }, + { nxt_string("websocket.receive"), &nxt_py_websocket_receive_str }, + { nxt_string("websocket.send"), &nxt_py_websocket_send_str }, + { nxt_string("ws"), &nxt_py_ws_str }, + { nxt_string("wss"), &nxt_py_wss_str }, + { nxt_null_string, NULL }, +}; + + +nxt_int_t +nxt_py_asgi_str_init(void) +{ + return nxt_python_init_strings(nxt_py_asgi_strings); +} + + +void +nxt_py_asgi_str_done(void) +{ + nxt_python_done_strings(nxt_py_asgi_strings); +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi_str.h b/src/python/nxt_python_asgi_str.h new file mode 100644 index 00000000..3f389c62 --- /dev/null +++ b/src/python/nxt_python_asgi_str.h @@ -0,0 +1,69 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PYTHON_ASGI_STR_H_INCLUDED_ +#define _NXT_PYTHON_ASGI_STR_H_INCLUDED_ + + +extern PyObject *nxt_py_1_0_str; +extern PyObject *nxt_py_1_1_str; +extern PyObject *nxt_py_2_0_str; +extern PyObject *nxt_py_2_1_str; +extern PyObject *nxt_py_3_0_str; +extern PyObject *nxt_py_add_done_callback_str; +extern PyObject *nxt_py_asgi_str; +extern PyObject *nxt_py_bad_state_str; +extern PyObject *nxt_py_body_str; +extern PyObject *nxt_py_bytes_str; +extern PyObject *nxt_py_client_str; +extern PyObject *nxt_py_code_str; +extern PyObject *nxt_py_done_str; +extern PyObject *nxt_py_exception_str; +extern PyObject *nxt_py_failed_to_send_body_str; +extern PyObject *nxt_py_headers_str; +extern PyObject *nxt_py_http_str; +extern PyObject *nxt_py_http_disconnect_str; +extern PyObject *nxt_py_http_request_str; +extern PyObject *nxt_py_http_version_str; +extern PyObject *nxt_py_https_str; +extern PyObject *nxt_py_lifespan_str; +extern PyObject *nxt_py_lifespan_shutdown_str; +extern PyObject *nxt_py_lifespan_startup_str; +extern PyObject *nxt_py_method_str; +extern PyObject *nxt_py_message_str; +extern PyObject *nxt_py_message_too_big_str; +extern PyObject *nxt_py_more_body_str; +extern PyObject *nxt_py_path_str; +extern PyObject *nxt_py_query_string_str; +extern PyObject *nxt_py_result_str; +extern PyObject *nxt_py_raw_path_str; +extern PyObject *nxt_py_root_path_str; +extern PyObject *nxt_py_scheme_str; +extern PyObject *nxt_py_server_str; +extern PyObject *nxt_py_set_exception_str; +extern PyObject *nxt_py_set_result_str; +extern PyObject *nxt_py_spec_version_str; +extern PyObject *nxt_py_status_str; +extern PyObject *nxt_py_subprotocol_str; +extern PyObject *nxt_py_subprotocols_str; +extern PyObject *nxt_py_text_str; +extern PyObject *nxt_py_type_str; +extern PyObject *nxt_py_version_str; +extern PyObject *nxt_py_websocket_str; +extern PyObject *nxt_py_websocket_accept_str; +extern PyObject *nxt_py_websocket_close_str; +extern PyObject *nxt_py_websocket_connect_str; +extern PyObject *nxt_py_websocket_disconnect_str; +extern PyObject *nxt_py_websocket_receive_str; +extern PyObject *nxt_py_websocket_send_str; +extern PyObject *nxt_py_ws_str; +extern PyObject *nxt_py_wss_str; + + +nxt_int_t nxt_py_asgi_str_init(void); +void nxt_py_asgi_str_done(void); + + +#endif /* _NXT_PYTHON_ASGI_STR_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c new file mode 100644 index 00000000..5a27b588 --- /dev/null +++ b/src/python/nxt_python_asgi_websocket.c @@ -0,0 +1,1084 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <nxt_unit_websocket.h> +#include <nxt_websocket_header.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +enum { + NXT_WS_INIT, + NXT_WS_CONNECT, + NXT_WS_ACCEPTED, + NXT_WS_DISCONNECTED, + NXT_WS_CLOSED, +}; + + +typedef struct { + nxt_queue_link_t link; + nxt_unit_websocket_frame_t *frame; +} nxt_py_asgi_penging_frame_t; + + +typedef struct { + PyObject_HEAD + nxt_unit_request_info_t *req; + PyObject *receive_future; + PyObject *receive_exc_str; + int state; + nxt_queue_t pending_frames; + uint64_t pending_payload_len; + uint64_t pending_frame_len; + int pending_fins; +} nxt_py_asgi_websocket_t; + + +static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, + PyObject *dict); +static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, + PyObject *dict); +static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, + PyObject *dict); +static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, + PyObject *msg); +static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, + PyObject *exc); +static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f); +static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws, + nxt_unit_websocket_frame_t *frame); +static uint64_t nxt_py_asgi_websocket_pending_len( + nxt_py_asgi_websocket_t *ws); +static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame( + nxt_py_asgi_websocket_t *ws); +static PyObject *nxt_py_asgi_websocket_disconnect_msg( + nxt_py_asgi_websocket_t *ws); +static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future); + + +static PyMethodDef nxt_py_asgi_websocket_methods[] = { + { "receive", nxt_py_asgi_websocket_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_websocket_send, METH_O, 0 }, + { "_done", nxt_py_asgi_websocket_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_websocket_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_websocket", + .tp_basicsize = sizeof(nxt_py_asgi_websocket_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI WebSocket connection object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_websocket_methods, +}; + +static uint64_t nxt_py_asgi_ws_max_frame_size = 1024 * 1024; +static uint64_t nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024; + + +nxt_int_t +nxt_py_asgi_websocket_init(nxt_task_t *task) +{ + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) { + nxt_alert(task, + "Python failed to initialize the \"asgi_websocket\" type object"); + return NXT_ERROR; + } + + return NXT_OK; +} + + +PyObject * +nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req) +{ + nxt_py_asgi_websocket_t *ws; + + ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type); + + if (nxt_fast_path(ws != NULL)) { + ws->req = req; + ws->receive_future = NULL; + ws->receive_exc_str = NULL; + ws->state = NXT_WS_INIT; + nxt_queue_init(&ws->pending_frames); + ws->pending_payload_len = 0; + ws->pending_frame_len = 0; + ws->pending_fins = 0; + } + + return (PyObject *) ws; +} + + +static PyObject * +nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none) +{ + PyObject *future, *msg; + nxt_py_asgi_websocket_t *ws; + + ws = (nxt_py_asgi_websocket_t *) self; + + nxt_unit_req_debug(ws->req, "asgi_websocket_receive"); + + /* If exception happened out of receive() call, raise it now. */ + if (nxt_slow_path(ws->receive_exc_str != NULL)) { + PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str); + + ws->receive_exc_str = NULL; + + return NULL; + } + + if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { + nxt_unit_req_error(ws->req, + "receive() called for closed WebSocket"); + + return PyErr_Format(PyExc_RuntimeError, + "WebSocket already closed"); + } + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(ws->req, "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (nxt_slow_path(ws->state == NXT_WS_INIT)) { + ws->state = NXT_WS_CONNECT; + + msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str); + + return nxt_py_asgi_set_result_soon(ws->req, future, msg); + } + + if (ws->pending_fins > 0) { + msg = nxt_py_asgi_websocket_pop_msg(ws, NULL); + + return nxt_py_asgi_set_result_soon(ws->req, future, msg); + } + + if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { + msg = nxt_py_asgi_websocket_disconnect_msg(ws); + + return nxt_py_asgi_set_result_soon(ws->req, future, msg); + } + + ws->receive_future = future; + Py_INCREF(ws->receive_future); + + return future; +} + + +static PyObject * +nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict) +{ + PyObject *type; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_websocket_t *ws; + + static const nxt_str_t websocket_accept = nxt_string("websocket.accept"); + static const nxt_str_t websocket_close = nxt_string("websocket.close"); + static const nxt_str_t websocket_send = nxt_string("websocket.send"); + + ws = (nxt_py_asgi_websocket_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_req_error(ws->req, "asgi_websocket_send: " + "'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, + "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) websocket_accept.length + && memcmp(type_str, websocket_accept.start, type_len) == 0) + { + return nxt_py_asgi_websocket_accept(ws, dict); + } + + if (type_len == (Py_ssize_t) websocket_close.length + && memcmp(type_str, websocket_close.start, type_len) == 0) + { + return nxt_py_asgi_websocket_close(ws, dict); + } + + if (type_len == (Py_ssize_t) websocket_send.length + && memcmp(type_str, websocket_send.start, type_len) == 0) + { + return nxt_py_asgi_websocket_send_frame(ws, dict); + } + + nxt_unit_req_error(ws->req, "asgi_websocket_send: " + "unexpected 'type': '%.*s'", (int) type_len, type_str); + return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); +} + + +static PyObject * +nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict) +{ + int rc; + char *subprotocol_str; + PyObject *res, *headers, *subprotocol; + Py_ssize_t subprotocol_len; + nxt_py_asgi_calc_size_ctx_t calc_size_ctx; + nxt_py_asgi_add_field_ctx_t add_field_ctx; + + static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); + + switch(ws->state) { + case NXT_WS_INIT: + return PyErr_Format(PyExc_RuntimeError, + "WebSocket connect not received"); + case NXT_WS_CONNECT: + break; + + case NXT_WS_ACCEPTED: + return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted"); + + case NXT_WS_DISCONNECTED: + return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + + case NXT_WS_CLOSED: + return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + } + + if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted"); + } + + if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) { + return PyErr_Format(PyExc_RuntimeError, "response already sent"); + } + + calc_size_ctx.fields_size = 0; + calc_size_ctx.fields_count = 0; + + headers = PyDict_GetItem(dict, nxt_py_headers_str); + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, + &calc_size_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + } + + subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str); + if (subprotocol != NULL && PyUnicode_Check(subprotocol)) { + subprotocol_str = PyUnicode_DATA(subprotocol); + subprotocol_len = PyUnicode_GET_LENGTH(subprotocol); + + calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len; + calc_size_ctx.fields_count++; + + } else { + subprotocol_str = NULL; + subprotocol_len = 0; + } + + rc = nxt_unit_response_init(ws->req, 101, + calc_size_ctx.fields_count, + calc_size_ctx.fields_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + add_field_ctx.req = ws->req; + add_field_ctx.content_length = -1; + + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, + &add_field_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + } + + if (subprotocol_len > 0) { + rc = nxt_unit_response_add_field(ws->req, + (const char *) ws_protocol.start, + ws_protocol.length, + subprotocol_str, subprotocol_len); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to add header"); + } + } + + rc = nxt_unit_response_send(ws->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send response"); + } + + ws->state = NXT_WS_ACCEPTED; + + Py_INCREF(ws); + + return (PyObject *) ws; +} + + +static PyObject * +nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict) +{ + int rc; + uint16_t status_code; + PyObject *code; + + if (nxt_slow_path(ws->state == NXT_WS_INIT)) { + return PyErr_Format(PyExc_RuntimeError, + "WebSocket connect not received"); + } + + if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + } + + if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + } + + if (nxt_unit_response_is_websocket(ws->req)) { + code = PyDict_GetItem(dict, nxt_py_code_str); + if (nxt_slow_path(code != NULL && !PyLong_Check(code))) { + return PyErr_Format(PyExc_TypeError, "'code' is not integer"); + } + + status_code = (code != NULL) ? htons(PyLong_AsLong(code)) + : htons(NXT_WEBSOCKET_CR_NORMAL); + + rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE, + 1, &status_code, 2); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send close frame"); + } + + } else { + rc = nxt_unit_response_init(ws->req, 403, 0, 0); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + rc = nxt_unit_response_send(ws->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send response"); + } + } + + ws->state = NXT_WS_CLOSED; + + Py_INCREF(ws); + + return (PyObject *) ws; +} + + +static PyObject * +nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict) +{ + int rc; + uint8_t opcode; + PyObject *bytes, *text; + const void *buf; + Py_ssize_t buf_size; + + if (nxt_slow_path(ws->state == NXT_WS_INIT)) { + return PyErr_Format(PyExc_RuntimeError, + "WebSocket connect not received"); + } + + if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) { + return PyErr_Format(PyExc_RuntimeError, + "WebSocket not accepted yet"); + } + + if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + } + + if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + } + + bytes = PyDict_GetItem(dict, nxt_py_bytes_str); + if (bytes == Py_None) { + bytes = NULL; + } + + if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) { + return PyErr_Format(PyExc_TypeError, + "'bytes' is not a byte string"); + } + + text = PyDict_GetItem(dict, nxt_py_text_str); + if (text == Py_None) { + text = NULL; + } + + if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) { + return PyErr_Format(PyExc_TypeError, + "'text' is not a unicode string"); + } + + if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) { + return PyErr_Format(PyExc_ValueError, + "Exactly one of 'bytes' or 'text' must be non-None"); + } + + if (bytes != NULL) { + buf = PyBytes_AS_STRING(bytes); + buf_size = PyBytes_GET_SIZE(bytes); + opcode = NXT_WEBSOCKET_OP_BINARY; + + } else { + buf = PyUnicode_AsUTF8AndSize(text, &buf_size); + opcode = NXT_WEBSOCKET_OP_TEXT; + } + + rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send close frame"); + } + + Py_INCREF(ws); + return (PyObject *) ws; +} + + +void +nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame) +{ + uint8_t opcode; + uint16_t status_code; + uint64_t rest; + PyObject *msg, *exc; + nxt_py_asgi_websocket_t *ws; + + ws = frame->req->data; + + nxt_unit_req_debug(ws->req, "asgi_websocket_handler"); + + opcode = frame->header->opcode; + if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT + && opcode != NXT_WEBSOCKET_OP_TEXT + && opcode != NXT_WEBSOCKET_OP_BINARY + && opcode != NXT_WEBSOCKET_OP_CLOSE)) + { + nxt_unit_websocket_done(frame); + + nxt_unit_req_debug(ws->req, + "asgi_websocket_handler: ignore frame with opcode %d", + opcode); + + return; + } + + if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) { + nxt_unit_websocket_done(frame); + + goto bad_state; + } + + rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len; + + if (nxt_slow_path(frame->payload_len > rest)) { + nxt_unit_websocket_done(frame); + + goto too_big; + } + + rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len; + + if (nxt_slow_path(frame->payload_len > rest)) { + nxt_unit_websocket_done(frame); + + goto too_big; + } + + if (ws->receive_future == NULL || frame->header->fin == 0) { + nxt_py_asgi_websocket_suspend_frame(frame); + + return; + } + + if (!nxt_queue_is_empty(&ws->pending_frames)) { + if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT + || opcode == NXT_WEBSOCKET_OP_BINARY)) + { + nxt_unit_req_alert(ws->req, + "Invalid state: pending frames with active receiver. " + "CONT frame expected. (%d)", opcode); + + PyErr_SetString(PyExc_AssertionError, + "Invalid state: pending frames with active receiver. " + "CONT frame expected."); + + nxt_unit_websocket_done(frame); + + return; + } + } + + msg = nxt_py_asgi_websocket_pop_msg(ws, frame); + if (nxt_slow_path(msg == NULL)) { + exc = PyErr_Occurred(); + Py_INCREF(exc); + + goto raise; + } + + nxt_py_asgi_websocket_receive_done(ws, msg); + + return; + +bad_state: + + if (ws->receive_future == NULL) { + ws->receive_exc_str = nxt_py_bad_state_str; + + return; + } + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_bad_state_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(ws->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + + goto raise; + +too_big: + + status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG); + + (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE, + 1, &status_code, 2); + + ws->state = NXT_WS_CLOSED; + + if (ws->receive_future == NULL) { + ws->receive_exc_str = nxt_py_message_too_big_str; + + return; + } + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_message_too_big_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(ws->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + +raise: + + nxt_py_asgi_websocket_receive_fail(ws, exc); +} + + +static void +nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg) +{ + PyObject *future, *res; + + future = ws->receive_future; + ws->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(ws->req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(msg); +} + + +static void +nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc) +{ + PyObject *future, *res; + + future = ws->receive_future; + ws->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(ws->req, "'set_exception' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(exc); +} + + +static void +nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame) +{ + int rc; + nxt_py_asgi_websocket_t *ws; + nxt_py_asgi_penging_frame_t *p; + + nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: " + "%d, %"PRIu64", %d", + frame->header->opcode, frame->payload_len, + frame->header->fin); + + ws = frame->req->data; + + rc = nxt_unit_websocket_retain(frame); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension."); + + nxt_unit_websocket_done(frame); + + PyErr_SetString(PyExc_RuntimeError, + "Failed to retain frame for suspension."); + + return; + } + + p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t)); + if (nxt_slow_path(p == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to allocate buffer to suspend frame."); + + nxt_unit_websocket_done(frame); + + PyErr_SetString(PyExc_RuntimeError, + "Failed to allocate buffer to suspend frame."); + + return; + } + + p->frame = frame; + nxt_queue_insert_tail(&ws->pending_frames, &p->link); + + ws->pending_payload_len += frame->payload_len; + ws->pending_fins += frame->header->fin; + + if (frame->header->fin) { + ws->pending_frame_len = 0; + + } else { + if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) { + ws->pending_frame_len += frame->payload_len; + + } else { + ws->pending_frame_len = frame->payload_len; + } + } +} + + +static PyObject * +nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws, + nxt_unit_websocket_frame_t *frame) +{ + int fin; + char *buf; + uint8_t code_buf[2], opcode; + uint16_t code; + PyObject *msg, *data, *type, *data_key; + uint64_t payload_len; + nxt_unit_websocket_frame_t *fin_frame; + + nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg"); + + fin_frame = NULL; + + if (nxt_queue_is_empty(&ws->pending_frames) + || (frame != NULL + && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE)) + { + payload_len = frame->payload_len; + + } else { + if (frame != NULL) { + payload_len = ws->pending_payload_len + frame->payload_len; + fin_frame = frame; + + } else { + payload_len = nxt_py_asgi_websocket_pending_len(ws); + } + + frame = nxt_py_asgi_websocket_pop_frame(ws); + } + + opcode = frame->header->opcode; + + if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) { + nxt_unit_req_alert(ws->req, + "Invalid state: attempt to process CONT frame."); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_AssertionError, + "Invalid state: attempt to process CONT frame."); + } + + type = nxt_py_websocket_receive_str; + + switch (opcode) { + case NXT_WEBSOCKET_OP_TEXT: + buf = nxt_unit_malloc(frame->req->ctx, payload_len); + if (nxt_slow_path(buf == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to allocate buffer for payload (%d).", + (int) payload_len); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to allocate buffer for payload (%d).", + (int) payload_len); + } + + data = NULL; + data_key = nxt_py_text_str; + + break; + + case NXT_WEBSOCKET_OP_BINARY: + data = PyBytes_FromStringAndSize(NULL, payload_len); + if (nxt_slow_path(data == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to create Bytes for payload (%d).", + (int) payload_len); + nxt_python_print_exception(); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to create Bytes for payload."); + } + + buf = (char *) PyBytes_AS_STRING(data); + data_key = nxt_py_bytes_str; + + break; + + case NXT_WEBSOCKET_OP_CLOSE: + if (frame->payload_len >= 2) { + nxt_unit_websocket_read(frame, code_buf, 2); + code = ((uint16_t) code_buf[0]) << 8 | code_buf[1]; + + } else { + code = NXT_WEBSOCKET_CR_NORMAL; + } + + nxt_unit_websocket_done(frame); + + data = PyLong_FromLong(code); + if (nxt_slow_path(data == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to create Long from code %d.", + (int) code); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to create Long from code %d.", + (int) code); + } + + buf = NULL; + type = nxt_py_websocket_disconnect_str; + data_key = nxt_py_code_str; + + break; + + default: + nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d", + opcode); + } + + if (buf != NULL) { + fin = frame->header->fin; + buf += nxt_unit_websocket_read(frame, buf, frame->payload_len); + + nxt_unit_websocket_done(frame); + + if (!fin) { + while (!nxt_queue_is_empty(&ws->pending_frames)) { + frame = nxt_py_asgi_websocket_pop_frame(ws); + fin = frame->header->fin; + + buf += nxt_unit_websocket_read(frame, buf, frame->payload_len); + + nxt_unit_websocket_done(frame); + + if (fin) { + break; + } + } + + if (fin_frame != NULL) { + buf += nxt_unit_websocket_read(fin_frame, buf, + fin_frame->payload_len); + nxt_unit_websocket_done(fin_frame); + } + } + + if (opcode == NXT_WEBSOCKET_OP_TEXT) { + buf -= payload_len; + + data = PyUnicode_DecodeUTF8(buf, payload_len, NULL); + + nxt_unit_free(ws->req->ctx, buf); + + if (nxt_slow_path(data == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to create Unicode for payload (%d).", + (int) payload_len); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to create Unicode."); + } + } + } + + msg = nxt_py_asgi_new_msg(ws->req, type); + if (nxt_slow_path(msg == NULL)) { + Py_DECREF(data); + return NULL; + } + + if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) { + nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item"); + + Py_DECREF(msg); + Py_DECREF(data); + + return PyErr_Format(PyExc_RuntimeError, + "Python failed to set 'msg.data' item"); + } + + Py_DECREF(data); + + return msg; +} + + +static uint64_t +nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws) +{ + uint64_t res; + nxt_py_asgi_penging_frame_t *p; + + res = 0; + + nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) { + res += p->frame->payload_len; + + if (p->frame->header->fin) { + nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d", + (int) res); + return res; + } + } nxt_queue_loop; + + nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)", + (int) res); + return res; +} + + +static nxt_unit_websocket_frame_t * +nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws) +{ + nxt_queue_link_t *lnk; + nxt_unit_websocket_frame_t *frame; + nxt_py_asgi_penging_frame_t *p; + + lnk = nxt_queue_first(&ws->pending_frames); + nxt_queue_remove(lnk); + + p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link); + + frame = p->frame; + ws->pending_payload_len -= frame->payload_len; + ws->pending_fins -= frame->header->fin; + + nxt_unit_free(frame->req->ctx, p); + + nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: " + "%d, %"PRIu64", %d", + frame->header->opcode, frame->payload_len, + frame->header->fin); + + return frame; +} + + +void +nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req) +{ + PyObject *msg, *exc; + nxt_py_asgi_websocket_t *ws; + + ws = req->data; + + nxt_unit_req_debug(req, "asgi_websocket_close_handler"); + + if (ws->receive_future == NULL) { + ws->state = NXT_WS_DISCONNECTED; + + return; + } + + msg = nxt_py_asgi_websocket_disconnect_msg(ws); + if (nxt_slow_path(msg == NULL)) { + exc = PyErr_Occurred(); + Py_INCREF(exc); + + nxt_py_asgi_websocket_receive_fail(ws, exc); + + } else { + nxt_py_asgi_websocket_receive_done(ws, msg); + } +} + + +static PyObject * +nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws) +{ + PyObject *msg, *code; + + msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str); + if (nxt_slow_path(msg == NULL)) { + return NULL; + } + + code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY); + if (nxt_slow_path(code == NULL)) { + nxt_unit_req_alert(ws->req, "Python failed to create long"); + nxt_python_print_exception(); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, "failed to create long"); + } + + if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) { + nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item"); + + Py_DECREF(msg); + Py_DECREF(code); + + return PyErr_Format(PyExc_RuntimeError, + "Python failed to set 'msg.code' item"); + } + + Py_DECREF(code); + + return msg; +} + + +static PyObject * +nxt_py_asgi_websocket_done(PyObject *self, PyObject *future) +{ + int rc; + uint16_t status_code; + PyObject *res; + nxt_py_asgi_websocket_t *ws; + + ws = (nxt_py_asgi_websocket_t *) self; + + nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self); + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(ws->req, + "Python failed to call 'future.result()'"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + + } else { + Py_DECREF(res); + + rc = NXT_UNIT_OK; + } + + if (ws->state == NXT_WS_ACCEPTED) { + status_code = (rc == NXT_UNIT_OK) + ? htons(NXT_WEBSOCKET_CR_NORMAL) + : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR); + + rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE, + 1, &status_code, 2); + } + + while (!nxt_queue_is_empty(&ws->pending_frames)) { + nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws)); + } + + nxt_unit_request_done(ws->req, rc); + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/nxt_python_wsgi.c b/src/python/nxt_python_wsgi.c index c4b7702e..97030cd3 100644 --- a/src/nxt_python_wsgi.c +++ b/src/python/nxt_python_wsgi.c @@ -8,17 +8,15 @@ #include <Python.h> -#include <compile.h> -#include <node.h> - #include <nxt_main.h> -#include <nxt_runtime.h> #include <nxt_router.h> #include <nxt_unit.h> #include <nxt_unit_field.h> #include <nxt_unit_request.h> #include <nxt_unit_response.h> +#include <python/nxt_python.h> + #include NXT_PYTHON_MOUNTS_H /* @@ -40,23 +38,6 @@ */ -#if PY_MAJOR_VERSION == 3 -#define NXT_PYTHON_BYTES_TYPE "bytestring" - -#define PyString_FromStringAndSize(str, size) \ - PyUnicode_DecodeLatin1((str), (size), "strict") - -#else -#define NXT_PYTHON_BYTES_TYPE "string" - -#define PyBytes_FromStringAndSize PyString_FromStringAndSize -#define PyBytes_Check PyString_Check -#define PyBytes_GET_SIZE PyString_GET_SIZE -#define PyBytes_AS_STRING PyString_AS_STRING -#define PyUnicode_InternInPlace PyString_InternInPlace -#define PyUnicode_AsUTF8 PyString_AS_STRING -#endif - typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t; typedef struct { @@ -68,18 +49,17 @@ typedef struct { PyObject_HEAD } nxt_py_error_t; -static nxt_int_t nxt_python_start(nxt_task_t *task, - nxt_process_data_t *data); -static nxt_int_t nxt_python_init_strings(void); static void nxt_python_request_handler(nxt_unit_request_info_t *req); -static void nxt_python_atexit(void); static PyObject *nxt_python_create_environ(nxt_task_t *task); static PyObject *nxt_python_get_environ(nxt_python_run_ctx_t *ctx); static int nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, nxt_unit_sptr_t *sptr, uint32_t size); static int nxt_python_add_field(nxt_python_run_ctx_t *ctx, - nxt_unit_field_t *field); + nxt_unit_field_t *field, int n, uint32_t vl); +static PyObject *nxt_python_field_name(const char *name, uint8_t len); +static PyObject *nxt_python_field_value(nxt_unit_field_t *f, int n, + uint32_t vl); static int nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, PyObject *value); @@ -99,7 +79,6 @@ static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_iter(PyObject *self); static PyObject *nxt_py_input_next(PyObject *self); -static void nxt_python_print_exception(void); static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes); struct nxt_python_run_ctx_s { @@ -109,22 +88,6 @@ struct nxt_python_run_ctx_s { nxt_unit_request_info_t *req; }; -static uint32_t compat[] = { - NXT_VERNUM, NXT_DEBUG, -}; - - -NXT_EXPORT nxt_app_module_t nxt_app_module = { - sizeof(compat), - compat, - nxt_string("python"), - PY_VERSION, - nxt_python_mounts, - nxt_nitems(nxt_python_mounts), - NULL, - nxt_python_start, -}; - static PyMethodDef nxt_py_start_resp_method[] = { {"unit_start_response", nxt_py_start_resp, METH_VARARGS, ""} @@ -158,22 +121,13 @@ static PyTypeObject nxt_py_input_type = { }; -static PyObject *nxt_py_stderr_flush; -static PyObject *nxt_py_application; static PyObject *nxt_py_start_resp_obj; static PyObject *nxt_py_write_obj; static PyObject *nxt_py_environ_ptyp; -#if PY_MAJOR_VERSION == 3 -static wchar_t *nxt_py_home; -#else -static char *nxt_py_home; -#endif - static PyThreadState *nxt_python_thread_state; static nxt_python_run_ctx_t *nxt_python_run_ctx; - static PyObject *nxt_py_80_str; static PyObject *nxt_py_close_str; static PyObject *nxt_py_content_length_str; @@ -191,11 +145,6 @@ static PyObject *nxt_py_server_port_str; static PyObject *nxt_py_server_protocol_str; static PyObject *nxt_py_wsgi_uri_scheme_str; -typedef struct { - nxt_str_t string; - PyObject **object_p; -} nxt_python_string_t; - static nxt_python_string_t nxt_python_strings[] = { { nxt_string("80"), &nxt_py_80_str }, { nxt_string("close"), &nxt_py_close_str }, @@ -213,136 +162,22 @@ static nxt_python_string_t nxt_python_strings[] = { { nxt_string("SERVER_PORT"), &nxt_py_server_port_str }, { nxt_string("SERVER_PROTOCOL"), &nxt_py_server_protocol_str }, { nxt_string("wsgi.url_scheme"), &nxt_py_wsgi_uri_scheme_str }, + { nxt_null_string, NULL }, }; -static nxt_int_t -nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) +nxt_int_t +nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init) { - int rc; - char *nxt_py_module; - size_t len; - PyObject *obj, *pypath, *module; - nxt_unit_ctx_t *unit_ctx; - nxt_unit_init_t python_init; - nxt_common_app_conf_t *app_conf; - nxt_python_app_conf_t *c; -#if PY_MAJOR_VERSION == 3 - char *path; - size_t size; - nxt_int_t pep405; - - static const char pyvenv[] = "/pyvenv.cfg"; - static const char bin_python[] = "/bin/python"; -#endif - - app_conf = data->app; - c = &app_conf->u.python; - - if (c->home != NULL) { - len = nxt_strlen(c->home); + PyObject *obj; -#if PY_MAJOR_VERSION == 3 - - path = nxt_malloc(len + sizeof(pyvenv)); - if (nxt_slow_path(path == NULL)) { - nxt_alert(task, "Failed to allocate memory"); - return NXT_ERROR; - } - - nxt_memcpy(path, c->home, len); - nxt_memcpy(path + len, pyvenv, sizeof(pyvenv)); - - pep405 = (access(path, R_OK) == 0); - - nxt_free(path); - - if (pep405) { - size = (len + sizeof(bin_python)) * sizeof(wchar_t); - - } else { - size = (len + 1) * sizeof(wchar_t); - } - - nxt_py_home = nxt_malloc(size); - if (nxt_slow_path(nxt_py_home == NULL)) { - nxt_alert(task, "Failed to allocate memory"); - return NXT_ERROR; - } - - if (pep405) { - mbstowcs(nxt_py_home, c->home, len); - mbstowcs(nxt_py_home + len, bin_python, sizeof(bin_python)); - Py_SetProgramName(nxt_py_home); - - } else { - mbstowcs(nxt_py_home, c->home, len + 1); - Py_SetPythonHome(nxt_py_home); - } - -#else - nxt_py_home = nxt_malloc(len + 1); - if (nxt_slow_path(nxt_py_home == NULL)) { - nxt_alert(task, "Failed to allocate memory"); - return NXT_ERROR; - } - - nxt_memcpy(nxt_py_home, c->home, len + 1); - Py_SetPythonHome(nxt_py_home); -#endif - } - - Py_InitializeEx(0); - - module = NULL; obj = NULL; - if (nxt_slow_path(nxt_python_init_strings() != NXT_OK)) { + if (nxt_slow_path(nxt_python_init_strings(nxt_python_strings) != NXT_OK)) { nxt_alert(task, "Python failed to init string objects"); goto fail; } - obj = PySys_GetObject((char *) "stderr"); - if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, "Python failed to get \"sys.stderr\" object"); - goto fail; - } - - nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush"); - if (nxt_slow_path(nxt_py_stderr_flush == NULL)) { - nxt_alert(task, "Python failed to get \"flush\" attribute of " - "\"sys.stderr\" object"); - goto fail; - } - - Py_DECREF(obj); - - if (c->path.length > 0) { - obj = PyString_FromStringAndSize((char *) c->path.start, - c->path.length); - - if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, "Python failed to create string object \"%V\"", - &c->path); - goto fail; - } - - pypath = PySys_GetObject((char *) "path"); - - if (nxt_slow_path(pypath == NULL)) { - nxt_alert(task, "Python failed to get \"sys.path\" list"); - goto fail; - } - - if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) { - nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"", - &c->path); - goto fail; - } - - Py_DECREF(obj); - } - obj = PyCFunction_New(nxt_py_start_resp_method, NULL); if (nxt_slow_path(obj == NULL)) { nxt_alert(task, @@ -366,107 +201,43 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) } nxt_py_environ_ptyp = obj; - - obj = Py_BuildValue("[s]", "unit"); - if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, "Python failed to create the \"sys.argv\" list"); - goto fail; - } - - if (nxt_slow_path(PySys_SetObject((char *) "argv", obj) != 0)) { - nxt_alert(task, "Python failed to set the \"sys.argv\" list"); - goto fail; - } - - Py_CLEAR(obj); - - nxt_py_module = nxt_alloca(c->module.length + 1); - nxt_memcpy(nxt_py_module, c->module.start, c->module.length); - nxt_py_module[c->module.length] = '\0'; - - module = PyImport_ImportModule(nxt_py_module); - if (nxt_slow_path(module == NULL)) { - nxt_alert(task, "Python failed to import module \"%s\"", nxt_py_module); - nxt_python_print_exception(); - goto fail; - } - - obj = PyDict_GetItemString(PyModule_GetDict(module), "application"); - if (nxt_slow_path(obj == NULL)) { - nxt_alert(task, "Python failed to get \"application\" " - "from module \"%s\"", nxt_py_module); - goto fail; - } - - if (nxt_slow_path(PyCallable_Check(obj) == 0)) { - nxt_alert(task, "\"application\" in module \"%s\" " - "is not a callable object", nxt_py_module); - goto fail; - } - - Py_INCREF(obj); - Py_CLEAR(module); - - nxt_py_application = obj; obj = NULL; - nxt_unit_default_init(task, &python_init); - - python_init.callbacks.request_handler = nxt_python_request_handler; - python_init.shm_limit = data->app->shm_limit; - - unit_ctx = nxt_unit_init(&python_init); - if (nxt_slow_path(unit_ctx == NULL)) { - goto fail; - } - - nxt_python_thread_state = PyEval_SaveThread(); - - rc = nxt_unit_run(unit_ctx); - - nxt_unit_done(unit_ctx); - - PyEval_RestoreThread(nxt_python_thread_state); - - nxt_python_atexit(); - - exit(rc); + init->callbacks.request_handler = nxt_python_request_handler; return NXT_OK; fail: Py_XDECREF(obj); - Py_XDECREF(module); - - nxt_python_atexit(); return NXT_ERROR; } -static nxt_int_t -nxt_python_init_strings(void) +int +nxt_python_wsgi_run(nxt_unit_ctx_t *ctx) { - PyObject *obj; - nxt_uint_t i; - nxt_python_string_t *pstr; + int rc; - for (i = 0; i < nxt_nitems(nxt_python_strings); i++) { - pstr = &nxt_python_strings[i]; + nxt_python_thread_state = PyEval_SaveThread(); - obj = PyString_FromStringAndSize((char *) pstr->string.start, - pstr->string.length); - if (nxt_slow_path(obj == NULL)) { - return NXT_ERROR; - } + rc = nxt_unit_run(ctx); - PyUnicode_InternInPlace(&obj); + PyEval_RestoreThread(nxt_python_thread_state); - *pstr->object_p = obj; - } + return rc; +} - return NXT_OK; + +void +nxt_python_wsgi_done(void) +{ + nxt_python_done_strings(nxt_python_strings); + + Py_XDECREF(nxt_py_start_resp_obj); + Py_XDECREF(nxt_py_write_obj); + Py_XDECREF(nxt_py_environ_ptyp); } @@ -597,29 +368,6 @@ done: } -static void -nxt_python_atexit(void) -{ - nxt_uint_t i; - - for (i = 0; i < nxt_nitems(nxt_python_strings); i++) { - Py_XDECREF(*nxt_python_strings[i].object_p); - } - - Py_XDECREF(nxt_py_stderr_flush); - Py_XDECREF(nxt_py_application); - Py_XDECREF(nxt_py_start_resp_obj); - Py_XDECREF(nxt_py_write_obj); - Py_XDECREF(nxt_py_environ_ptyp); - - Py_Finalize(); - - if (nxt_py_home != NULL) { - nxt_free(nxt_py_home); - } -} - - static PyObject * nxt_python_create_environ(nxt_task_t *task) { @@ -749,9 +497,9 @@ static PyObject * nxt_python_get_environ(nxt_python_run_ctx_t *ctx) { int rc; - uint32_t i; + uint32_t i, j, vl; PyObject *environ; - nxt_unit_field_t *f; + nxt_unit_field_t *f, *f2; nxt_unit_request_t *r; environ = PyDict_Copy(nxt_py_environ_ptyp); @@ -803,10 +551,27 @@ nxt_python_get_environ(nxt_python_run_ctx_t *ctx) r->server_name_length)); RC(nxt_python_add_obj(ctx, nxt_py_server_port_str, nxt_py_80_str)); - for (i = 0; i < r->fields_count; i++) { + nxt_unit_request_group_dup_fields(ctx->req); + + for (i = 0; i < r->fields_count;) { f = r->fields + i; + vl = f->value_length; + + for (j = i + 1; j < r->fields_count; j++) { + f2 = r->fields + j; + + if (f2->hash != f->hash + || nxt_unit_sptr_get(&f2->name) != nxt_unit_sptr_get(&f->name)) + { + break; + } - RC(nxt_python_add_field(ctx, f)); + vl += 2 + f2->value_length; + } + + RC(nxt_python_add_field(ctx, f, j - i, vl)); + + i = j; } if (r->content_length_field != NXT_UNIT_NONE_FIELD) { @@ -870,14 +635,15 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, static int -nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field) +nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field, int n, + uint32_t vl) { char *src; PyObject *name, *value; src = nxt_unit_sptr_get(&field->name); - name = PyString_FromStringAndSize(src, field->name_length); + name = nxt_python_field_name(src, field->name_length); if (nxt_slow_path(name == NULL)) { nxt_unit_req_error(ctx->req, "Python failed to create name string \"%.*s\"", @@ -887,13 +653,13 @@ nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field) return NXT_UNIT_ERROR; } - src = nxt_unit_sptr_get(&field->value); + value = nxt_python_field_value(field, n, vl); - value = PyString_FromStringAndSize(src, field->value_length); if (nxt_slow_path(value == NULL)) { nxt_unit_req_error(ctx->req, "Python failed to create value string \"%.*s\"", - (int) field->value_length, src); + (int) field->value_length, + (char *) nxt_unit_sptr_get(&field->value)); nxt_python_print_exception(); goto fail; @@ -920,6 +686,80 @@ fail: } +static PyObject * +nxt_python_field_name(const char *name, uint8_t len) +{ + char *p, c; + uint8_t i; + PyObject *res; + +#if PY_MAJOR_VERSION == 3 + res = PyUnicode_New(len + 5, 255); +#else + res = PyString_FromStringAndSize(NULL, len + 5); +#endif + + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + p = PyString_AS_STRING(res); + + p = nxt_cpymem(p, "HTTP_", 5); + + for (i = 0; i < len; i++) { + c = name[i]; + + if (c >= 'a' && c <= 'z') { + *p++ = (c & ~0x20); + continue; + } + + if (c == '-') { + *p++ = '_'; + continue; + } + + *p++ = c; + } + + return res; +} + + +static PyObject * +nxt_python_field_value(nxt_unit_field_t *f, int n, uint32_t vl) +{ + int i; + char *p, *src; + PyObject *res; + +#if PY_MAJOR_VERSION == 3 + res = PyUnicode_New(vl, 255); +#else + res = PyString_FromStringAndSize(NULL, vl); +#endif + + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + p = PyString_AS_STRING(res); + + src = nxt_unit_sptr_get(&f->value); + p = nxt_cpymem(p, src, f->value_length); + + for (i = 1; i < n; i++) { + p = nxt_cpymem(p, ", ", 2); + + src = nxt_unit_sptr_get(&f[i].value); + p = nxt_cpymem(p, src, f[i].value_length); + } + + return res; +} + + static int nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, PyObject *value) { @@ -1386,28 +1226,6 @@ nxt_py_input_next(PyObject *self) } -static void -nxt_python_print_exception(void) -{ - PyErr_Print(); - -#if PY_MAJOR_VERSION == 3 - /* The backtrace may be buffered in sys.stderr file object. */ - { - PyObject *result; - - result = PyObject_CallFunction(nxt_py_stderr_flush, NULL); - if (nxt_slow_path(result == NULL)) { - PyErr_Clear(); - return; - } - - Py_DECREF(result); - } -#endif -} - - static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes) { |