diff options
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/nxt_python.c | 340 | ||||
-rw-r--r-- | src/python/nxt_python.h | 60 | ||||
-rw-r--r-- | src/python/nxt_python_asgi.c | 1227 | ||||
-rw-r--r-- | src/python/nxt_python_asgi.h | 60 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_http.c | 591 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_lifespan.c | 505 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_str.c | 141 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_str.h | 69 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_websocket.c | 1084 | ||||
-rw-r--r-- | src/python/nxt_python_wsgi.c | 1264 |
10 files changed, 5341 insertions, 0 deletions
diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c new file mode 100644 index 00000000..01534a47 --- /dev/null +++ b/src/python/nxt_python.c @@ -0,0 +1,340 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <Python.h> + +#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_unit.h> + +#include <python/nxt_python.h> + +#include NXT_PYTHON_MOUNTS_H + + +static nxt_int_t nxt_python_start(nxt_task_t *task, + nxt_process_data_t *data); +static void nxt_python_atexit(void); + +static uint32_t compat[] = { + NXT_VERNUM, NXT_DEBUG, +}; + + +NXT_EXPORT nxt_app_module_t nxt_app_module = { + sizeof(compat), + compat, + nxt_string("python"), + PY_VERSION, + nxt_python_mounts, + nxt_nitems(nxt_python_mounts), + NULL, + nxt_python_start, +}; + +static PyObject *nxt_py_stderr_flush; +PyObject *nxt_py_application; + +#if PY_MAJOR_VERSION == 3 +static wchar_t *nxt_py_home; +#else +static char *nxt_py_home; +#endif + + +static nxt_int_t +nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) +{ + int rc, asgi; + char *nxt_py_module; + size_t len; + PyObject *obj, *pypath, *module; + const char *callable; + nxt_unit_ctx_t *unit_ctx; + nxt_unit_init_t python_init; + nxt_common_app_conf_t *app_conf; + nxt_python_app_conf_t *c; +#if PY_MAJOR_VERSION == 3 + char *path; + size_t size; + nxt_int_t pep405; + + static const char pyvenv[] = "/pyvenv.cfg"; + static const char bin_python[] = "/bin/python"; +#endif + + app_conf = data->app; + c = &app_conf->u.python; + + if (c->home != NULL) { + len = nxt_strlen(c->home); + +#if PY_MAJOR_VERSION == 3 + + path = nxt_malloc(len + sizeof(pyvenv)); + if (nxt_slow_path(path == NULL)) { + nxt_alert(task, "Failed to allocate memory"); + return NXT_ERROR; + } + + nxt_memcpy(path, c->home, len); + nxt_memcpy(path + len, pyvenv, sizeof(pyvenv)); + + pep405 = (access(path, R_OK) == 0); + + nxt_free(path); + + if (pep405) { + size = (len + sizeof(bin_python)) * sizeof(wchar_t); + + } else { + size = (len + 1) * sizeof(wchar_t); + } + + nxt_py_home = nxt_malloc(size); + if (nxt_slow_path(nxt_py_home == NULL)) { + nxt_alert(task, "Failed to allocate memory"); + return NXT_ERROR; + } + + if (pep405) { + mbstowcs(nxt_py_home, c->home, len); + mbstowcs(nxt_py_home + len, bin_python, sizeof(bin_python)); + Py_SetProgramName(nxt_py_home); + + } else { + mbstowcs(nxt_py_home, c->home, len + 1); + Py_SetPythonHome(nxt_py_home); + } + +#else + nxt_py_home = nxt_malloc(len + 1); + if (nxt_slow_path(nxt_py_home == NULL)) { + nxt_alert(task, "Failed to allocate memory"); + return NXT_ERROR; + } + + nxt_memcpy(nxt_py_home, c->home, len + 1); + Py_SetPythonHome(nxt_py_home); +#endif + } + + Py_InitializeEx(0); + + module = NULL; + obj = NULL; + + obj = PySys_GetObject((char *) "stderr"); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to get \"sys.stderr\" object"); + goto fail; + } + + nxt_py_stderr_flush = PyObject_GetAttrString(obj, "flush"); + if (nxt_slow_path(nxt_py_stderr_flush == NULL)) { + nxt_alert(task, "Python failed to get \"flush\" attribute of " + "\"sys.stderr\" object"); + goto fail; + } + + /* obj is a Borrowed reference. */ + + if (c->path.length > 0) { + obj = PyString_FromStringAndSize((char *) c->path.start, + c->path.length); + + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to create string object \"%V\"", + &c->path); + goto fail; + } + + pypath = PySys_GetObject((char *) "path"); + + if (nxt_slow_path(pypath == NULL)) { + nxt_alert(task, "Python failed to get \"sys.path\" list"); + goto fail; + } + + if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) { + nxt_alert(task, "Python failed to insert \"%V\" into \"sys.path\"", + &c->path); + goto fail; + } + + Py_DECREF(obj); + } + + obj = Py_BuildValue("[s]", "unit"); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to create the \"sys.argv\" list"); + goto fail; + } + + if (nxt_slow_path(PySys_SetObject((char *) "argv", obj) != 0)) { + nxt_alert(task, "Python failed to set the \"sys.argv\" list"); + goto fail; + } + + Py_CLEAR(obj); + + nxt_py_module = nxt_alloca(c->module.length + 1); + nxt_memcpy(nxt_py_module, c->module.start, c->module.length); + nxt_py_module[c->module.length] = '\0'; + + module = PyImport_ImportModule(nxt_py_module); + if (nxt_slow_path(module == NULL)) { + nxt_alert(task, "Python failed to import module \"%s\"", nxt_py_module); + nxt_python_print_exception(); + goto fail; + } + + callable = (c->callable != NULL) ? c->callable : "application"; + + obj = PyDict_GetItemString(PyModule_GetDict(module), callable); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to get \"%s\" " + "from module \"%s\"", callable, nxt_py_module); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(obj) == 0)) { + nxt_alert(task, "\"%s\" in module \"%s\" " + "is not a callable object", callable, nxt_py_module); + goto fail; + } + + nxt_py_application = obj; + obj = NULL; + + Py_INCREF(nxt_py_application); + + Py_CLEAR(module); + + nxt_unit_default_init(task, &python_init); + + python_init.shm_limit = data->app->shm_limit; + + asgi = nxt_python_asgi_check(nxt_py_application); + + if (asgi) { + rc = nxt_python_asgi_init(task, &python_init); + + } else { + rc = nxt_python_wsgi_init(task, &python_init); + } + + if (nxt_slow_path(rc == NXT_ERROR)) { + goto fail; + } + + unit_ctx = nxt_unit_init(&python_init); + if (nxt_slow_path(unit_ctx == NULL)) { + goto fail; + } + + if (asgi) { + rc = nxt_python_asgi_run(unit_ctx); + + } else { + rc = nxt_python_wsgi_run(unit_ctx); + } + + nxt_unit_done(unit_ctx); + + nxt_python_atexit(); + + exit(rc); + + return NXT_OK; + +fail: + + Py_XDECREF(obj); + Py_XDECREF(module); + + nxt_python_atexit(); + + return NXT_ERROR; +} + + +nxt_int_t +nxt_python_init_strings(nxt_python_string_t *pstr) +{ + PyObject *obj; + + while (pstr->string.start != NULL) { + obj = PyString_FromStringAndSize((char *) pstr->string.start, + pstr->string.length); + if (nxt_slow_path(obj == NULL)) { + return NXT_ERROR; + } + + PyUnicode_InternInPlace(&obj); + + *pstr->object_p = obj; + + pstr++; + } + + return NXT_OK; +} + + +void +nxt_python_done_strings(nxt_python_string_t *pstr) +{ + PyObject *obj; + + while (pstr->string.start != NULL) { + obj = *pstr->object_p; + + Py_XDECREF(obj); + *pstr->object_p = NULL; + + pstr++; + } +} + + +static void +nxt_python_atexit(void) +{ + nxt_python_wsgi_done(); + nxt_python_asgi_done(); + + Py_XDECREF(nxt_py_stderr_flush); + Py_XDECREF(nxt_py_application); + + Py_Finalize(); + + if (nxt_py_home != NULL) { + nxt_free(nxt_py_home); + } +} + + +void +nxt_python_print_exception(void) +{ + PyErr_Print(); + +#if PY_MAJOR_VERSION == 3 + /* The backtrace may be buffered in sys.stderr file object. */ + { + PyObject *result; + + result = PyObject_CallFunction(nxt_py_stderr_flush, NULL); + if (nxt_slow_path(result == NULL)) { + PyErr_Clear(); + return; + } + + Py_DECREF(result); + } +#endif +} diff --git a/src/python/nxt_python.h b/src/python/nxt_python.h new file mode 100644 index 00000000..3211026b --- /dev/null +++ b/src/python/nxt_python.h @@ -0,0 +1,60 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PYTHON_H_INCLUDED_ +#define _NXT_PYTHON_H_INCLUDED_ + + +#include <Python.h> +#include <nxt_main.h> +#include <nxt_unit.h> + + +#if PY_MAJOR_VERSION == 3 +#define NXT_PYTHON_BYTES_TYPE "bytestring" + +#define PyString_FromStringAndSize(str, size) \ + PyUnicode_DecodeLatin1((str), (size), "strict") +#define PyString_AS_STRING PyUnicode_DATA + +#else +#define NXT_PYTHON_BYTES_TYPE "string" + +#define PyBytes_FromStringAndSize PyString_FromStringAndSize +#define PyBytes_Check PyString_Check +#define PyBytes_GET_SIZE PyString_GET_SIZE +#define PyBytes_AS_STRING PyString_AS_STRING +#define PyUnicode_InternInPlace PyString_InternInPlace +#define PyUnicode_AsUTF8 PyString_AS_STRING +#endif + +#if PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION >= 5 +#define NXT_HAVE_ASGI 1 +#endif + +extern PyObject *nxt_py_application; + +typedef struct { + nxt_str_t string; + PyObject **object_p; +} nxt_python_string_t; + + +nxt_int_t nxt_python_init_strings(nxt_python_string_t *pstr); +void nxt_python_done_strings(nxt_python_string_t *pstr); + +void nxt_python_print_exception(void); + +nxt_int_t nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init); +int nxt_python_wsgi_run(nxt_unit_ctx_t *ctx); +void nxt_python_wsgi_done(void); + +int nxt_python_asgi_check(PyObject *obj); +nxt_int_t nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init); +nxt_int_t nxt_python_asgi_run(nxt_unit_ctx_t *ctx); +void nxt_python_asgi_done(void); + + +#endif /* _NXT_PYTHON_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c new file mode 100644 index 00000000..72408ea1 --- /dev/null +++ b/src/python/nxt_python_asgi.c @@ -0,0 +1,1227 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <nxt_unit_response.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); + +static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); +static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, + uint16_t port); +static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); +static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); + +static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); +static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); +static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); +static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); + +static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); + + +PyObject *nxt_py_loop_run_until_complete; +PyObject *nxt_py_loop_create_future; +PyObject *nxt_py_loop_create_task; + +nxt_queue_t nxt_py_asgi_drain_queue; + +static PyObject *nxt_py_loop_call_soon; +static PyObject *nxt_py_quit_future; +static PyObject *nxt_py_quit_future_set_result; +static PyObject *nxt_py_loop_add_reader; +static PyObject *nxt_py_loop_remove_reader; +static PyObject *nxt_py_port_read; + +static PyMethodDef nxt_py_port_read_method = + {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; + +#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A + + +int +nxt_python_asgi_check(PyObject *obj) +{ + int res; + PyObject *call; + PyCodeObject *code; + + if (PyFunction_Check(obj)) { + code = (PyCodeObject *) PyFunction_GET_CODE(obj); + + return (code->co_flags & CO_COROUTINE) != 0; + } + + if (PyMethod_Check(obj)) { + obj = PyMethod_GET_FUNCTION(obj); + + code = (PyCodeObject *) PyFunction_GET_CODE(obj); + + return (code->co_flags & CO_COROUTINE) != 0; + } + + call = PyObject_GetAttrString(obj, "__call__"); + + if (call == NULL) { + return 0; + } + + if (PyFunction_Check(call)) { + code = (PyCodeObject *) PyFunction_GET_CODE(call); + + res = (code->co_flags & CO_COROUTINE) != 0; + + } else { + if (PyMethod_Check(call)) { + obj = PyMethod_GET_FUNCTION(call); + + code = (PyCodeObject *) PyFunction_GET_CODE(obj); + + res = (code->co_flags & CO_COROUTINE) != 0; + + } else { + res = 0; + } + } + + Py_DECREF(call); + + return res; +} + + +nxt_int_t +nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) +{ + PyObject *asyncio, *loop, *get_event_loop; + nxt_int_t rc; + + nxt_debug(task, "asgi_init"); + + if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) { + nxt_alert(task, "Python failed to init string objects"); + return NXT_ERROR; + } + + asyncio = PyImport_ImportModule("asyncio"); + if (nxt_slow_path(asyncio == NULL)) { + nxt_alert(task, "Python failed to import module 'asyncio'"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + loop = NULL; + get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), + "get_event_loop"); + if (nxt_slow_path(get_event_loop == NULL)) { + nxt_alert(task, + "Python failed to get 'get_event_loop' from module 'asyncio'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) { + nxt_alert(task, "'asyncio.get_event_loop' is not a callable object"); + goto fail; + } + + loop = PyObject_CallObject(get_event_loop, NULL); + if (nxt_slow_path(loop == NULL)) { + nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'"); + goto fail; + } + + nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task"); + if (nxt_slow_path(nxt_py_loop_create_task == NULL)) { + nxt_alert(task, "Python failed to get 'loop.create_task'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) { + nxt_alert(task, "'loop.create_task' is not a callable object"); + goto fail; + } + + nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader"); + if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) { + nxt_alert(task, "Python failed to get 'loop.add_reader'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) { + nxt_alert(task, "'loop.add_reader' is not a callable object"); + goto fail; + } + + nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader"); + if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) { + nxt_alert(task, "Python failed to get 'loop.remove_reader'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) { + nxt_alert(task, "'loop.remove_reader' is not a callable object"); + goto fail; + } + + nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon"); + if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) { + nxt_alert(task, "Python failed to get 'loop.call_soon'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) { + nxt_alert(task, "'loop.call_soon' is not a callable object"); + goto fail; + } + + nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop, + "run_until_complete"); + if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) { + nxt_alert(task, "Python failed to get 'loop.run_until_complete'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) { + nxt_alert(task, "'loop.run_until_complete' is not a callable object"); + goto fail; + } + + nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future"); + if (nxt_slow_path(nxt_py_loop_create_future == NULL)) { + nxt_alert(task, "Python failed to get 'loop.create_future'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) { + nxt_alert(task, "'loop.create_future' is not a callable object"); + goto fail; + } + + nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(nxt_py_quit_future == NULL)) { + nxt_alert(task, "Python failed to create Future "); + nxt_python_print_exception(); + goto fail; + } + + nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future, + "set_result"); + if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) { + nxt_alert(task, "Python failed to get 'future.set_result'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) { + nxt_alert(task, "'future.set_result' is not a callable object"); + goto fail; + } + + nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); + if (nxt_slow_path(nxt_py_port_read == NULL)) { + nxt_alert(task, "Python failed to initialize the 'port_read' function"); + goto fail; + } + + nxt_queue_init(&nxt_py_asgi_drain_queue); + + if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) { + goto fail; + } + + if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) { + goto fail; + } + + rc = nxt_py_asgi_lifespan_startup(task); + if (nxt_slow_path(rc == NXT_ERROR)) { + goto fail; + } + + init->callbacks.request_handler = nxt_py_asgi_request_handler; + init->callbacks.data_handler = nxt_py_asgi_http_data_handler; + init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; + init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; + init->callbacks.quit = nxt_py_asgi_quit; + init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; + init->callbacks.add_port = nxt_py_asgi_add_port; + init->callbacks.remove_port = nxt_py_asgi_remove_port; + + Py_DECREF(loop); + Py_DECREF(asyncio); + + return NXT_OK; + +fail: + + Py_XDECREF(loop); + Py_DECREF(asyncio); + + return NXT_ERROR; +} + + +nxt_int_t +nxt_python_asgi_run(nxt_unit_ctx_t *ctx) +{ + PyObject *res; + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + nxt_py_quit_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + + return NXT_ERROR; + } + + Py_DECREF(res); + + nxt_py_asgi_lifespan_shutdown(); + + return NXT_OK; +} + + +static void +nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) +{ + PyObject *scope, *res, *task, *receive, *send, *done, *asgi; + + if (req->request->websocket_handshake) { + asgi = nxt_py_asgi_websocket_create(req); + + } else { + asgi = nxt_py_asgi_http_create(req); + } + + if (nxt_slow_path(asgi == NULL)) { + nxt_unit_req_alert(req, "Python failed to create asgi object"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return; + } + + receive = PyObject_GetAttrString(asgi, "receive"); + if (nxt_slow_path(receive == NULL)) { + nxt_unit_req_alert(req, "Python failed to get 'receive' method"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_asgi; + } + + send = PyObject_GetAttrString(asgi, "send"); + if (nxt_slow_path(receive == NULL)) { + nxt_unit_req_alert(req, "Python failed to get 'send' method"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_receive; + } + + done = PyObject_GetAttrString(asgi, "_done"); + if (nxt_slow_path(receive == NULL)) { + nxt_unit_req_alert(req, "Python failed to get '_done' method"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_send; + } + + scope = nxt_py_asgi_create_http_scope(req); + if (nxt_slow_path(scope == NULL)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_done; + } + + req->data = asgi; + + res = PyObject_CallFunctionObjArgs(nxt_py_application, + scope, receive, send, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(req, "Python failed to call the application"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_scope; + } + + if (nxt_slow_path(!PyCoro_CheckExact(res))) { + nxt_unit_req_error(req, "Application result type is not a coroutine"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + Py_DECREF(res); + + goto release_scope; + } + + task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + if (nxt_slow_path(task == NULL)) { + nxt_unit_req_error(req, "Python failed to call the create_task"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + Py_DECREF(res); + + goto release_scope; + } + + Py_DECREF(res); + + res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(req, + "Python failed to call 'task.add_done_callback'"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_task; + } + + Py_DECREF(res); +release_task: + Py_DECREF(task); +release_scope: + Py_DECREF(scope); +release_done: + Py_DECREF(done); +release_send: + Py_DECREF(send); +release_receive: + Py_DECREF(receive); +release_asgi: + Py_DECREF(asgi); +} + + +static PyObject * +nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) +{ + char *p, *target, *query; + uint32_t target_length, i; + PyObject *scope, *v, *type, *scheme; + PyObject *headers, *header; + nxt_unit_field_t *f; + nxt_unit_request_t *r; + + static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); + +#define SET_ITEM(dict, key, value) \ + if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ + == -1)) \ + { \ + nxt_unit_req_alert(req, "Python failed to set '" \ + #dict "." #key "' item"); \ + goto fail; \ + } + + v = NULL; + headers = NULL; + + r = req->request; + + if (r->websocket_handshake) { + type = nxt_py_websocket_str; + scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; + + } else { + type = nxt_py_http_str; + scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; + } + + scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); + if (nxt_slow_path(scope == NULL)) { + return NULL; + } + + p = nxt_unit_sptr_get(&r->version); + SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str + : nxt_py_1_0_str) + SET_ITEM(scope, scheme, scheme) + + v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), + r->method_length); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'method' string"); + goto fail; + } + + SET_ITEM(scope, method, v) + Py_DECREF(v); + + v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, + "replace"); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'path' string"); + goto fail; + } + + SET_ITEM(scope, path, v) + Py_DECREF(v); + + target = nxt_unit_sptr_get(&r->target); + query = nxt_unit_sptr_get(&r->query); + + if (r->query.offset != 0) { + target_length = query - target - 1; + + } else { + target_length = r->target_length; + } + + v = PyBytes_FromStringAndSize(target, target_length); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); + goto fail; + } + + SET_ITEM(scope, raw_path, v) + Py_DECREF(v); + + v = PyBytes_FromStringAndSize(query, r->query_length); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'query' string"); + goto fail; + } + + SET_ITEM(scope, query_string, v) + Py_DECREF(v); + + v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'client' pair"); + goto fail; + } + + SET_ITEM(scope, client, v) + Py_DECREF(v); + + v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'server' pair"); + goto fail; + } + + SET_ITEM(scope, server, v) + Py_DECREF(v); + + v = NULL; + + headers = PyTuple_New(r->fields_count); + if (nxt_slow_path(headers == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'headers' object"); + goto fail; + } + + for (i = 0; i < r->fields_count; i++) { + f = r->fields + i; + + header = nxt_py_asgi_create_header(f); + if (nxt_slow_path(header == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'header' pair"); + goto fail; + } + + PyTuple_SET_ITEM(headers, i, header); + + if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL + && f->name_length == ws_protocol.length + && f->value_length > 0 + && r->websocket_handshake) + { + v = nxt_py_asgi_create_subprotocols(f); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Failed to create subprotocols"); + goto fail; + } + + SET_ITEM(scope, subprotocols, v); + Py_DECREF(v); + } + } + + SET_ITEM(scope, headers, headers) + Py_DECREF(headers); + + return scope; + +fail: + + Py_XDECREF(v); + Py_XDECREF(headers); + Py_DECREF(scope); + + return NULL; + +#undef SET_ITEM +} + + +static PyObject * +nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) +{ + char *p, *s; + PyObject *pair, *v; + + pair = PyTuple_New(2); + if (nxt_slow_path(pair == NULL)) { + return NULL; + } + + p = nxt_unit_sptr_get(sptr); + s = memchr(p, ':', len); + + v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); + if (nxt_slow_path(v == NULL)) { + Py_DECREF(pair); + + return NULL; + } + + PyTuple_SET_ITEM(pair, 0, v); + + if (s != NULL) { + p += len; + v = PyLong_FromString(s + 1, &p, 10); + + } else { + v = PyLong_FromLong(port); + } + + if (nxt_slow_path(v == NULL)) { + Py_DECREF(pair); + + return NULL; + } + + PyTuple_SET_ITEM(pair, 1, v); + + return pair; +} + + +static PyObject * +nxt_py_asgi_create_header(nxt_unit_field_t *f) +{ + char c, *name; + uint8_t pos; + PyObject *header, *v; + + header = PyTuple_New(2); + if (nxt_slow_path(header == NULL)) { + return NULL; + } + + name = nxt_unit_sptr_get(&f->name); + + for (pos = 0; pos < f->name_length; pos++) { + c = name[pos]; + if (c >= 'A' && c <= 'Z') { + name[pos] = (c | 0x20); + } + } + + v = PyBytes_FromStringAndSize(name, f->name_length); + if (nxt_slow_path(v == NULL)) { + Py_DECREF(header); + + return NULL; + } + + PyTuple_SET_ITEM(header, 0, v); + + v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), + f->value_length); + if (nxt_slow_path(v == NULL)) { + Py_DECREF(header); + + return NULL; + } + + PyTuple_SET_ITEM(header, 1, v); + + return header; +} + + +static PyObject * +nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) +{ + char *v; + uint32_t i, n, start; + PyObject *res, *proto; + + v = nxt_unit_sptr_get(&f->value); + n = 1; + + for (i = 0; i < f->value_length; i++) { + if (v[i] == ',') { + n++; + } + } + + res = PyTuple_New(n); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + n = 0; + start = 0; + + for (i = 0; i < f->value_length; ) { + if (v[i] != ',') { + i++; + + continue; + } + + if (i - start > 0) { + proto = PyString_FromStringAndSize(v + start, i - start); + if (nxt_slow_path(proto == NULL)) { + goto fail; + } + + PyTuple_SET_ITEM(res, n, proto); + + n++; + } + + do { + i++; + } while (i < f->value_length && v[i] == ' '); + + start = i; + } + + if (i - start > 0) { + proto = PyString_FromStringAndSize(v + start, i - start); + if (nxt_slow_path(proto == NULL)) { + goto fail; + } + + PyTuple_SET_ITEM(res, n, proto); + } + + return res; + +fail: + + Py_DECREF(res); + + return NULL; +} + + +static int +nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int nb; + PyObject *res; + + if (port->in_fd == -1) { + return NXT_UNIT_OK; + } + + nb = 1; + + if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", + port->in_fd, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader, + PyLong_FromLong(port->in_fd), + nxt_py_port_read, + PyLong_FromVoidPtr(ctx), + PyLong_FromVoidPtr(port), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to add_reader"); + + return NXT_UNIT_ERROR; + } + + Py_DECREF(res); + + return NXT_UNIT_OK; +} + + +static void +nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) +{ + PyObject *res; + + nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); + + if (port->in_fd == -1) { + return; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader, + PyLong_FromLong(port->in_fd), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Python failed to remove_reader"); + } + + Py_DECREF(res); +} + + +static void +nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) +{ + PyObject *res; + + nxt_unit_debug(ctx, "asgi_quit %p", ctx); + + res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result, + PyLong_FromLong(0), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to set_result"); + } + + Py_DECREF(res); +} + + +static void +nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_queue_link_t *lnk; + + while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) { + lnk = nxt_queue_first(&nxt_py_asgi_drain_queue); + + rc = nxt_py_asgi_http_drain(lnk); + if (rc == NXT_UNIT_AGAIN) { + break; + } + + nxt_queue_remove(lnk); + } +} + + +static PyObject * +nxt_py_asgi_port_read(PyObject *self, PyObject *args) +{ + int rc; + PyObject *arg; + Py_ssize_t n; + nxt_unit_ctx_t *ctx; + nxt_unit_port_t *port; + + n = PyTuple_GET_SIZE(args); + + if (n != 2) { + nxt_unit_alert(NULL, + "nxt_py_asgi_port_read: invalid number of arguments %d", + (int) n); + + return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); + } + + arg = PyTuple_GET_ITEM(args, 0); + if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + return PyErr_Format(PyExc_TypeError, + "the first argument is not a long"); + } + + ctx = PyLong_AsVoidPtr(arg); + + arg = PyTuple_GET_ITEM(args, 1); + if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + return PyErr_Format(PyExc_TypeError, + "the second argument is not a long"); + } + + port = PyLong_AsVoidPtr(arg); + + nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port); + + rc = nxt_unit_process_port_msg(ctx, port); + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return PyErr_Format(PyExc_RuntimeError, + "error processing port message"); + } + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, + void *data) +{ + int i; + PyObject *iter, *header, *h_iter, *name, *val, *res; + + iter = PyObject_GetIter(headers); + if (nxt_slow_path(iter == NULL)) { + return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); + } + + for (i = 0; /* void */; i++) { + header = PyIter_Next(iter); + if (header == NULL) { + break; + } + + h_iter = PyObject_GetIter(header); + if (nxt_slow_path(h_iter == NULL)) { + Py_DECREF(header); + Py_DECREF(iter); + + return PyErr_Format(PyExc_TypeError, + "'headers' item #%d is not an iterable", i); + } + + name = PyIter_Next(h_iter); + if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { + Py_XDECREF(name); + Py_DECREF(h_iter); + Py_DECREF(header); + Py_DECREF(iter); + + return PyErr_Format(PyExc_TypeError, + "'headers' item #%d 'name' is not a byte string", i); + } + + val = PyIter_Next(h_iter); + if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { + Py_XDECREF(val); + Py_DECREF(h_iter); + Py_DECREF(header); + Py_DECREF(iter); + + return PyErr_Format(PyExc_TypeError, + "'headers' item #%d 'value' is not a byte string", i); + } + + res = cb(data, i, name, val); + + Py_DECREF(name); + Py_DECREF(val); + Py_DECREF(h_iter); + Py_DECREF(header); + + if (nxt_slow_path(res == NULL)) { + Py_DECREF(iter); + + return NULL; + } + + Py_DECREF(res); + } + + Py_DECREF(iter); + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) +{ + nxt_py_asgi_calc_size_ctx_t *ctx; + + ctx = data; + + ctx->fields_count++; + ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) +{ + int rc; + char *name_str, *val_str; + uint32_t name_len, val_len; + nxt_off_t content_length; + nxt_unit_request_info_t *req; + nxt_py_asgi_add_field_ctx_t *ctx; + + name_str = PyBytes_AS_STRING(name); + name_len = PyBytes_GET_SIZE(name); + + val_str = PyBytes_AS_STRING(val); + val_len = PyBytes_GET_SIZE(val); + + ctx = data; + req = ctx->req; + + rc = nxt_unit_response_add_field(req, name_str, name_len, + val_str, val_len); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to add header #%d", i); + } + + if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { + content_length = nxt_off_t_parse((u_char *) val_str, val_len); + if (nxt_slow_path(content_length < 0)) { + nxt_unit_req_error(req, "failed to parse Content-Length " + "value %.*s", (int) val_len, val_str); + + return PyErr_Format(PyExc_ValueError, + "Failed to parse Content-Length: '%.*s'", + (int) val_len, val_str); + } + + ctx->content_length = content_length; + } + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, + PyObject *result) +{ + PyObject *set_result, *res; + + if (nxt_slow_path(result == NULL)) { + Py_DECREF(future); + + return NULL; + } + + set_result = PyObject_GetAttrString(future, "set_result"); + if (nxt_slow_path(set_result == NULL)) { + nxt_unit_req_alert(req, "failed to get 'set_result' for future"); + + Py_CLEAR(future); + + goto cleanup; + } + + if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { + nxt_unit_req_alert(req, "'future.set_result' is not a callable"); + + Py_CLEAR(future); + + goto cleanup; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result, + result, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); + nxt_python_print_exception(); + + Py_CLEAR(future); + } + + Py_XDECREF(res); + +cleanup: + + Py_DECREF(set_result); + Py_DECREF(result); + + return future; +} + + +PyObject * +nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) +{ + PyObject *msg; + + msg = PyDict_New(); + if (nxt_slow_path(msg == NULL)) { + nxt_unit_req_alert(req, "Python failed to create message dict"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create message dict"); + } + + if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { + nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'msg.type' item"); + } + + return msg; +} + + +PyObject * +nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, + PyObject *spec_version) +{ + PyObject *scope, *asgi; + + scope = PyDict_New(); + if (nxt_slow_path(scope == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create 'scope' dict"); + } + + if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { + nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); + + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'scope.type' item"); + } + + asgi = PyDict_New(); + if (nxt_slow_path(asgi == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); + nxt_python_print_exception(); + + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create 'asgi' dict"); + } + + if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { + nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); + + Py_DECREF(asgi); + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'scope.asgi' item"); + } + + if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, + nxt_py_3_0_str) == -1)) + { + nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); + + Py_DECREF(asgi); + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'asgi.version' item"); + } + + if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, + spec_version) == -1)) + { + nxt_unit_req_alert(req, + "Python failed to set 'asgi.spec_version' item"); + + Py_DECREF(asgi); + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'asgi.spec_version' item"); + } + + Py_DECREF(asgi); + + return scope; +} + + +void +nxt_py_asgi_dealloc(PyObject *self) +{ + PyObject_Del(self); +} + + +PyObject * +nxt_py_asgi_await(PyObject *self) +{ + Py_INCREF(self); + return self; +} + + +PyObject * +nxt_py_asgi_iter(PyObject *self) +{ + Py_INCREF(self); + return self; +} + + +PyObject * +nxt_py_asgi_next(PyObject *self) +{ + return NULL; +} + + +void +nxt_python_asgi_done(void) +{ + nxt_py_asgi_str_done(); + + Py_XDECREF(nxt_py_quit_future); + Py_XDECREF(nxt_py_quit_future_set_result); + Py_XDECREF(nxt_py_loop_run_until_complete); + Py_XDECREF(nxt_py_loop_create_future); + Py_XDECREF(nxt_py_loop_create_task); + Py_XDECREF(nxt_py_loop_call_soon); + Py_XDECREF(nxt_py_loop_add_reader); + Py_XDECREF(nxt_py_loop_remove_reader); + Py_XDECREF(nxt_py_port_read); +} + +#else /* !(NXT_HAVE_ASGI) */ + + +int +nxt_python_asgi_check(PyObject *obj) +{ + return 0; +} + + +nxt_int_t +nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) +{ + nxt_alert(task, "ASGI not implemented"); + return NXT_ERROR; +} + + +nxt_int_t +nxt_python_asgi_run(nxt_unit_ctx_t *ctx) +{ + nxt_unit_alert(ctx, "ASGI not implemented"); + return NXT_ERROR; +} + + +void +nxt_python_asgi_done(void) +{ +} + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi.h b/src/python/nxt_python_asgi.h new file mode 100644 index 00000000..24337c37 --- /dev/null +++ b/src/python/nxt_python_asgi.h @@ -0,0 +1,60 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PYTHON_ASGI_H_INCLUDED_ +#define _NXT_PYTHON_ASGI_H_INCLUDED_ + + +typedef PyObject * (*nxt_py_asgi_enum_header_cb)(void *ctx, int i, + PyObject *name, PyObject *val); + +typedef struct { + uint32_t fields_count; + uint32_t fields_size; +} nxt_py_asgi_calc_size_ctx_t; + +typedef struct { + nxt_unit_request_info_t *req; + uint64_t content_length; +} nxt_py_asgi_add_field_ctx_t; + +PyObject *nxt_py_asgi_enum_headers(PyObject *headers, + nxt_py_asgi_enum_header_cb cb, void *data); + +PyObject *nxt_py_asgi_calc_size(void *data, int i, PyObject *n, PyObject *v); +PyObject *nxt_py_asgi_add_field(void *data, int i, PyObject *n, PyObject *v); + +PyObject *nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, + PyObject *future, PyObject *result); +PyObject *nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type); +PyObject *nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, + PyObject *spec_version); + +void nxt_py_asgi_dealloc(PyObject *self); +PyObject *nxt_py_asgi_await(PyObject *self); +PyObject *nxt_py_asgi_iter(PyObject *self); +PyObject *nxt_py_asgi_next(PyObject *self); + +nxt_int_t nxt_py_asgi_http_init(nxt_task_t *task); +PyObject *nxt_py_asgi_http_create(nxt_unit_request_info_t *req); +void nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req); +int nxt_py_asgi_http_drain(nxt_queue_link_t *lnk); + +nxt_int_t nxt_py_asgi_websocket_init(nxt_task_t *task); +PyObject *nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req); +void nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *ws); +void nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req); + +nxt_int_t nxt_py_asgi_lifespan_startup(nxt_task_t *task); +nxt_int_t nxt_py_asgi_lifespan_shutdown(void); + +extern PyObject *nxt_py_loop_run_until_complete; +extern PyObject *nxt_py_loop_create_future; +extern PyObject *nxt_py_loop_create_task; + +extern nxt_queue_t nxt_py_asgi_drain_queue; + + +#endif /* _NXT_PYTHON_ASGI_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c new file mode 100644 index 00000000..b07d61d6 --- /dev/null +++ b/src/python/nxt_python_asgi_http.c @@ -0,0 +1,591 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +typedef struct { + PyObject_HEAD + nxt_unit_request_info_t *req; + nxt_queue_link_t link; + PyObject *receive_future; + PyObject *send_future; + uint64_t content_length; + uint64_t bytes_sent; + int complete; + PyObject *send_body; + Py_ssize_t send_body_off; +} nxt_py_asgi_http_t; + + +static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http); +static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, + PyObject *dict); +static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, + PyObject *dict); +static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future); + + +static PyMethodDef nxt_py_asgi_http_methods[] = { + { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_http_send, METH_O, 0 }, + { "_done", nxt_py_asgi_http_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_http_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_http", + .tp_basicsize = sizeof(nxt_py_asgi_http_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI HTTP request object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_http_methods, +}; + +static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; + + +nxt_int_t +nxt_py_asgi_http_init(nxt_task_t *task) +{ + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { + nxt_alert(task, "Python failed to initialize the 'http' type object"); + return NXT_ERROR; + } + + return NXT_OK; +} + + +PyObject * +nxt_py_asgi_http_create(nxt_unit_request_info_t *req) +{ + nxt_py_asgi_http_t *http; + + http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type); + + if (nxt_fast_path(http != NULL)) { + http->req = req; + http->receive_future = NULL; + http->send_future = NULL; + http->content_length = -1; + http->bytes_sent = 0; + http->complete = 0; + http->send_body = NULL; + http->send_body_off = 0; + } + + return (PyObject *) http; +} + + +static PyObject * +nxt_py_asgi_http_receive(PyObject *self, PyObject *none) +{ + PyObject *msg, *future; + nxt_py_asgi_http_t *http; + nxt_unit_request_info_t *req; + + http = (nxt_py_asgi_http_t *) self; + req = http->req; + + nxt_unit_req_debug(req, "asgi_http_receive"); + + msg = nxt_py_asgi_http_read_msg(http); + if (nxt_slow_path(msg == NULL)) { + return NULL; + } + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(req, "Python failed to create Future object"); + nxt_python_print_exception(); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (msg != Py_None) { + return nxt_py_asgi_set_result_soon(req, future, msg); + } + + http->receive_future = future; + Py_INCREF(http->receive_future); + + Py_DECREF(msg); + + return future; +} + + +static PyObject * +nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http) +{ + char *body_buf; + ssize_t read_res; + PyObject *msg, *body; + Py_ssize_t size; + nxt_unit_request_info_t *req; + + req = http->req; + + size = req->content_length; + + if (size > nxt_py_asgi_http_body_buf_size) { + size = nxt_py_asgi_http_body_buf_size; + } + + if (size > 0) { + body = PyBytes_FromStringAndSize(NULL, size); + if (nxt_slow_path(body == NULL)) { + nxt_unit_req_alert(req, "Python failed to create body byte string"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Bytes object"); + } + + body_buf = PyBytes_AS_STRING(body); + + read_res = nxt_unit_request_read(req, body_buf, size); + + } else { + body = NULL; + read_res = 0; + } + + if (read_res > 0 || read_res == size) { + msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str); + if (nxt_slow_path(msg == NULL)) { + Py_XDECREF(body); + + return NULL; + } + +#define SET_ITEM(dict, key, value) \ + if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ + == -1)) \ + { \ + nxt_unit_req_alert(req, \ + "Python failed to set '" #dict "." #key "' item"); \ + PyErr_SetString(PyExc_RuntimeError, \ + "Python failed to set '" #dict "." #key "' item"); \ + goto fail; \ + } + + if (body != NULL) { + SET_ITEM(msg, body, body) + } + + if (req->content_length > 0) { + SET_ITEM(msg, more_body, Py_True) + } + +#undef SET_ITEM + + Py_XDECREF(body); + + return msg; + } + + Py_XDECREF(body); + + Py_RETURN_NONE; + +fail: + + Py_DECREF(msg); + Py_XDECREF(body); + + return NULL; +} + + +static PyObject * +nxt_py_asgi_http_send(PyObject *self, PyObject *dict) +{ + PyObject *type; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_http_t *http; + + static const nxt_str_t response_start = nxt_string("http.response.start"); + static const nxt_str_t response_body = nxt_string("http.response.body"); + + http = (nxt_py_asgi_http_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_req_error(http->req, "asgi_http_send: " + "'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) response_start.length + && memcmp(type_str, response_start.start, type_len) == 0) + { + return nxt_py_asgi_http_response_start(http, dict); + } + + if (type_len == (Py_ssize_t) response_body.length + && memcmp(type_str, response_body.start, type_len) == 0) + { + return nxt_py_asgi_http_response_body(http, dict); + } + + nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'", + (int) type_len, type_str); + + return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); +} + + +static PyObject * +nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) +{ + int rc; + PyObject *status, *headers, *res; + nxt_py_asgi_calc_size_ctx_t calc_size_ctx; + nxt_py_asgi_add_field_ctx_t add_field_ctx; + + status = PyDict_GetItem(dict, nxt_py_status_str); + if (nxt_slow_path(status == NULL || !PyLong_Check(status))) { + nxt_unit_req_error(http->req, "asgi_http_response_start: " + "'status' is not an integer"); + return PyErr_Format(PyExc_TypeError, "'status' is not an integer"); + } + + calc_size_ctx.fields_size = 0; + calc_size_ctx.fields_count = 0; + + headers = PyDict_GetItem(dict, nxt_py_headers_str); + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, + &calc_size_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + Py_DECREF(res); + } + + rc = nxt_unit_response_init(http->req, PyLong_AsLong(status), + calc_size_ctx.fields_count, + calc_size_ctx.fields_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + add_field_ctx.req = http->req; + add_field_ctx.content_length = -1; + + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, + &add_field_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + Py_DECREF(res); + } + + http->content_length = add_field_ctx.content_length; + + Py_INCREF(http); + return (PyObject *) http; +} + + +static PyObject * +nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) +{ + int rc; + char *body_str; + ssize_t sent; + PyObject *body, *more_body, *future; + Py_ssize_t body_len, body_off; + + body = PyDict_GetItem(dict, nxt_py_body_str); + if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { + return PyErr_Format(PyExc_TypeError, "'body' is not a byte string"); + } + + more_body = PyDict_GetItem(dict, nxt_py_more_body_str); + if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) { + return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool"); + } + + if (nxt_slow_path(http->complete)) { + return PyErr_Format(PyExc_RuntimeError, + "Unexpected ASGI message 'http.response.body' " + "sent, after response already completed"); + } + + if (nxt_slow_path(http->send_future != NULL)) { + return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); + } + + if (body != NULL) { + body_str = PyBytes_AS_STRING(body); + body_len = PyBytes_GET_SIZE(body); + + nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d", + (int) body_len, (more_body == Py_True) ); + + if (nxt_slow_path(http->bytes_sent + body_len + > http->content_length)) + { + return PyErr_Format(PyExc_RuntimeError, + "Response content longer than Content-Length"); + } + + body_off = 0; + + while (body_len > 0) { + sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); + if (nxt_slow_path(sent < 0)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send body"); + } + + if (nxt_slow_path(sent == 0)) { + nxt_unit_req_debug(http->req, "asgi_http_response_body: " + "out of shared memory, %d", + (int) body_len); + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(http->req, + "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + http->send_body = body; + Py_INCREF(http->send_body); + http->send_body_off = body_off; + + nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link); + + http->send_future = future; + Py_INCREF(http->send_future); + + return future; + } + + body_str += sent; + body_len -= sent; + body_off += sent; + http->bytes_sent += sent; + } + + } else { + nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d", + (more_body == Py_True) ); + + if (!nxt_unit_response_is_sent(http->req)) { + rc = nxt_unit_response_send(http->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send response"); + } + } + } + + if (more_body == NULL || more_body == Py_False) { + http->complete = 1; + } + + Py_INCREF(http); + return (PyObject *) http; +} + + +void +nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req) +{ + PyObject *msg, *future, *res; + nxt_py_asgi_http_t *http; + + http = req->data; + + nxt_unit_req_debug(req, "asgi_http_data_handler"); + + if (http->receive_future == NULL) { + return; + } + + msg = nxt_py_asgi_http_read_msg(http); + if (nxt_slow_path(msg == NULL)) { + return; + } + + if (msg == Py_None) { + Py_DECREF(msg); + return; + } + + future = http->receive_future; + http->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(msg); +} + + +int +nxt_py_asgi_http_drain(nxt_queue_link_t *lnk) +{ + char *body_str; + ssize_t sent; + PyObject *future, *exc, *res; + Py_ssize_t body_len; + nxt_py_asgi_http_t *http; + + http = nxt_container_of(lnk, nxt_py_asgi_http_t, link); + + body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off; + body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off; + + nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len); + + while (body_len > 0) { + sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); + if (nxt_slow_path(sent < 0)) { + goto fail; + } + + if (nxt_slow_path(sent == 0)) { + return NXT_UNIT_AGAIN; + } + + body_str += sent; + body_len -= sent; + + http->send_body_off += sent; + http->bytes_sent += sent; + } + + Py_CLEAR(http->send_body); + + future = http->send_future; + http->send_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(http->req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + return NXT_UNIT_OK; + +fail: + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_failed_to_send_body_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(http->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + + future = http->send_future; + http->send_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(http->req, "'set_exception' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + Py_DECREF(exc); + + return NXT_UNIT_ERROR; +} + + +static PyObject * +nxt_py_asgi_http_done(PyObject *self, PyObject *future) +{ + int rc; + PyObject *res; + nxt_py_asgi_http_t *http; + + http = (nxt_py_asgi_http_t *) self; + + nxt_unit_req_debug(http->req, "asgi_http_done"); + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(http->req, + "Python failed to call 'future.result()'"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + + } else { + Py_DECREF(res); + + rc = NXT_UNIT_OK; + } + + nxt_unit_request_done(http->req, rc); + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi_lifespan.c b/src/python/nxt_python_asgi_lifespan.c new file mode 100644 index 00000000..14d0ee97 --- /dev/null +++ b/src/python/nxt_python_asgi_lifespan.c @@ -0,0 +1,505 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +typedef struct { + PyObject_HEAD + int disabled; + int startup_received; + int startup_sent; + int shutdown_received; + int shutdown_sent; + int shutdown_called; + PyObject *startup_future; + PyObject *shutdown_future; + PyObject *receive_future; +} nxt_py_asgi_lifespan_t; + + +static PyObject *nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_send_startup( + nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, + int v, int *sent, PyObject **future); +static PyObject *nxt_py_asgi_lifespan_send_shutdown( + nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan); +static PyObject *nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future); + + +static nxt_py_asgi_lifespan_t *nxt_py_lifespan; + +static PyMethodDef nxt_py_asgi_lifespan_methods[] = { + { "receive", nxt_py_asgi_lifespan_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_lifespan_send, METH_O, 0 }, + { "_done", nxt_py_asgi_lifespan_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_lifespan_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_lifespan", + .tp_basicsize = sizeof(nxt_py_asgi_lifespan_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI Lifespan object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_lifespan_methods, +}; + + +nxt_int_t +nxt_py_asgi_lifespan_startup(nxt_task_t *task) +{ + PyObject *scope, *res, *py_task, *receive, *send, *done; + nxt_int_t rc; + nxt_py_asgi_lifespan_t *lifespan; + + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_lifespan_type) != 0)) { + nxt_alert(task, + "Python failed to initialize the 'asgi_lifespan' type object"); + return NXT_ERROR; + } + + lifespan = PyObject_New(nxt_py_asgi_lifespan_t, &nxt_py_asgi_lifespan_type); + if (nxt_slow_path(lifespan == NULL)) { + nxt_alert(task, "Python failed to create lifespan object"); + return NXT_ERROR; + } + + rc = NXT_ERROR; + + receive = PyObject_GetAttrString((PyObject *) lifespan, "receive"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get 'receive' method"); + goto release_lifespan; + } + + send = PyObject_GetAttrString((PyObject *) lifespan, "send"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get 'send' method"); + goto release_receive; + } + + done = PyObject_GetAttrString((PyObject *) lifespan, "_done"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get '_done' method"); + goto release_send; + } + + lifespan->startup_future = PyObject_CallObject(nxt_py_loop_create_future, + NULL); + if (nxt_slow_path(lifespan->startup_future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + + goto release_done; + } + + lifespan->disabled = 0; + lifespan->startup_received = 0; + lifespan->startup_sent = 0; + lifespan->shutdown_received = 0; + lifespan->shutdown_sent = 0; + lifespan->shutdown_called = 0; + lifespan->shutdown_future = NULL; + lifespan->receive_future = NULL; + + scope = nxt_py_asgi_new_scope(NULL, nxt_py_lifespan_str, nxt_py_2_0_str); + if (nxt_slow_path(scope == NULL)) { + goto release_future; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_application, + scope, receive, send, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_log(task, NXT_LOG_ERR, "Python failed to call the application"); + nxt_python_print_exception(); + goto release_scope; + } + + if (nxt_slow_path(!PyCoro_CheckExact(res))) { + nxt_log(task, NXT_LOG_ERR, + "Application result type is not a coroutine"); + Py_DECREF(res); + goto release_scope; + } + + py_task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + if (nxt_slow_path(py_task == NULL)) { + nxt_log(task, NXT_LOG_ERR, "Python failed to call the create_task"); + nxt_python_print_exception(); + Py_DECREF(res); + goto release_scope; + } + + Py_DECREF(res); + + res = PyObject_CallMethodObjArgs(py_task, nxt_py_add_done_callback_str, + done, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_log(task, NXT_LOG_ERR, + "Python failed to call 'task.add_done_callback'"); + nxt_python_print_exception(); + goto release_task; + } + + Py_DECREF(res); + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + lifespan->startup_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_alert(task, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + goto release_task; + } + + Py_DECREF(res); + + if (lifespan->startup_sent == 1 || lifespan->disabled) { + nxt_py_lifespan = lifespan; + Py_INCREF(nxt_py_lifespan); + + rc = NXT_OK; + } + +release_task: + Py_DECREF(py_task); +release_scope: + Py_DECREF(scope); +release_future: + Py_CLEAR(lifespan->startup_future); +release_done: + Py_DECREF(done); +release_send: + Py_DECREF(send); +release_receive: + Py_DECREF(receive); +release_lifespan: + Py_DECREF(lifespan); + + return rc; +} + + +nxt_int_t +nxt_py_asgi_lifespan_shutdown(void) +{ + PyObject *msg, *future, *res; + nxt_py_asgi_lifespan_t *lifespan; + + if (nxt_slow_path(nxt_py_lifespan == NULL || nxt_py_lifespan->disabled)) { + return NXT_OK; + } + + lifespan = nxt_py_lifespan; + lifespan->shutdown_called = 1; + + if (lifespan->receive_future != NULL) { + future = lifespan->receive_future; + lifespan->receive_future = NULL; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); + + if (nxt_fast_path(msg != NULL)) { + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + msg, NULL); + Py_XDECREF(res); + Py_DECREF(msg); + } + + Py_DECREF(future); + } + + if (lifespan->shutdown_sent) { + return NXT_OK; + } + + lifespan->shutdown_future = PyObject_CallObject(nxt_py_loop_create_future, + NULL); + if (nxt_slow_path(lifespan->shutdown_future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + lifespan->shutdown_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + Py_DECREF(res); + Py_CLEAR(lifespan->shutdown_future); + + return NXT_OK; +} + + +static PyObject * +nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none) +{ + PyObject *msg, *future; + nxt_py_asgi_lifespan_t *lifespan; + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + nxt_unit_debug(NULL, "asgi_lifespan_receive"); + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (!lifespan->startup_received) { + lifespan->startup_received = 1; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_startup_str); + + return nxt_py_asgi_set_result_soon(NULL, future, msg); + } + + if (lifespan->shutdown_called && !lifespan->shutdown_received) { + lifespan->shutdown_received = 1; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); + + return nxt_py_asgi_set_result_soon(NULL, future, msg); + } + + Py_INCREF(future); + lifespan->receive_future = future; + + return future; +} + + +static PyObject * +nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict) +{ + PyObject *type, *msg; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_lifespan_t *lifespan; + + static const nxt_str_t startup_complete + = nxt_string("lifespan.startup.complete"); + static const nxt_str_t startup_failed + = nxt_string("lifespan.startup.failed"); + static const nxt_str_t shutdown_complete + = nxt_string("lifespan.shutdown.complete"); + static const nxt_str_t shutdown_failed + = nxt_string("lifespan.shutdown.failed"); + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_error(NULL, + "asgi_lifespan_send: 'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, + "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_debug(NULL, "asgi_lifespan_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) startup_complete.length + && memcmp(type_str, startup_complete.start, type_len) == 0) + { + return nxt_py_asgi_lifespan_send_startup(lifespan, 0, NULL); + } + + if (type_len == (Py_ssize_t) startup_failed.length + && memcmp(type_str, startup_failed.start, type_len) == 0) + { + msg = PyDict_GetItem(dict, nxt_py_message_str); + return nxt_py_asgi_lifespan_send_startup(lifespan, 1, msg); + } + + if (type_len == (Py_ssize_t) shutdown_complete.length + && memcmp(type_str, shutdown_complete.start, type_len) == 0) + { + return nxt_py_asgi_lifespan_send_shutdown(lifespan, 0, NULL); + } + + if (type_len == (Py_ssize_t) shutdown_failed.length + && memcmp(type_str, shutdown_failed.start, type_len) == 0) + { + msg = PyDict_GetItem(dict, nxt_py_message_str); + return nxt_py_asgi_lifespan_send_shutdown(lifespan, 1, msg); + } + + return nxt_py_asgi_lifespan_disable(lifespan); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_startup(nxt_py_asgi_lifespan_t *lifespan, int v, + PyObject *message) +{ + const char *message_str; + Py_ssize_t message_len; + + if (nxt_slow_path(v != 0)) { + nxt_unit_error(NULL, "Application startup failed"); + + if (nxt_fast_path(message != NULL && PyUnicode_Check(message))) { + message_str = PyUnicode_AsUTF8AndSize(message, &message_len); + + nxt_unit_error(NULL, "%.*s", (int) message_len, message_str); + } + } + + return nxt_py_asgi_lifespan_send_(lifespan, v, + &lifespan->startup_sent, + &lifespan->startup_future); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, int v, int *sent, + PyObject **pfuture) +{ + PyObject *future, *res; + + if (*sent) { + return nxt_py_asgi_lifespan_disable(lifespan); + } + + *sent = 1 + v; + + if (*pfuture != NULL) { + future = *pfuture; + *pfuture = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + + return nxt_py_asgi_lifespan_disable(lifespan); + } + + Py_DECREF(res); + Py_DECREF(future); + } + + Py_INCREF(lifespan); + + return (PyObject *) lifespan; +} + + +static PyObject * +nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan) +{ + nxt_unit_warn(NULL, "Got invalid state transition on lifespan protocol"); + + lifespan->disabled = 1; + + return PyErr_Format(PyExc_AssertionError, + "Got invalid state transition on lifespan protocol"); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_shutdown(nxt_py_asgi_lifespan_t *lifespan, int v, + PyObject *message) +{ + return nxt_py_asgi_lifespan_send_(lifespan, v, + &lifespan->shutdown_sent, + &lifespan->shutdown_future); +} + + +static PyObject * +nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future) +{ + PyObject *res; + nxt_py_asgi_lifespan_t *lifespan; + + nxt_unit_debug(NULL, "asgi_lifespan_done"); + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + if (lifespan->startup_sent == 0) { + lifespan->disabled = 1; + } + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_log(NULL, NXT_UNIT_LOG_INFO, + "ASGI Lifespan processing exception"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + + if (lifespan->startup_future != NULL) { + future = lifespan->startup_future; + lifespan->startup_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + } + + if (lifespan->shutdown_future != NULL) { + future = lifespan->shutdown_future; + lifespan->shutdown_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + } + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi_str.c b/src/python/nxt_python_asgi_str.c new file mode 100644 index 00000000..37fa7f04 --- /dev/null +++ b/src/python/nxt_python_asgi_str.c @@ -0,0 +1,141 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <python/nxt_python_asgi_str.h> + + +PyObject *nxt_py_1_0_str; +PyObject *nxt_py_1_1_str; +PyObject *nxt_py_2_0_str; +PyObject *nxt_py_2_1_str; +PyObject *nxt_py_3_0_str; +PyObject *nxt_py_add_done_callback_str; +PyObject *nxt_py_asgi_str; +PyObject *nxt_py_bad_state_str; +PyObject *nxt_py_body_str; +PyObject *nxt_py_bytes_str; +PyObject *nxt_py_client_str; +PyObject *nxt_py_code_str; +PyObject *nxt_py_done_str; +PyObject *nxt_py_exception_str; +PyObject *nxt_py_failed_to_send_body_str; +PyObject *nxt_py_headers_str; +PyObject *nxt_py_http_str; +PyObject *nxt_py_http_disconnect_str; +PyObject *nxt_py_http_request_str; +PyObject *nxt_py_http_version_str; +PyObject *nxt_py_https_str; +PyObject *nxt_py_lifespan_str; +PyObject *nxt_py_lifespan_shutdown_str; +PyObject *nxt_py_lifespan_startup_str; +PyObject *nxt_py_method_str; +PyObject *nxt_py_message_str; +PyObject *nxt_py_message_too_big_str; +PyObject *nxt_py_more_body_str; +PyObject *nxt_py_path_str; +PyObject *nxt_py_query_string_str; +PyObject *nxt_py_raw_path_str; +PyObject *nxt_py_result_str; +PyObject *nxt_py_root_path_str; +PyObject *nxt_py_scheme_str; +PyObject *nxt_py_server_str; +PyObject *nxt_py_set_exception_str; +PyObject *nxt_py_set_result_str; +PyObject *nxt_py_spec_version_str; +PyObject *nxt_py_status_str; +PyObject *nxt_py_subprotocol_str; +PyObject *nxt_py_subprotocols_str; +PyObject *nxt_py_text_str; +PyObject *nxt_py_type_str; +PyObject *nxt_py_version_str; +PyObject *nxt_py_websocket_str; +PyObject *nxt_py_websocket_accept_str; +PyObject *nxt_py_websocket_close_str; +PyObject *nxt_py_websocket_connect_str; +PyObject *nxt_py_websocket_disconnect_str; +PyObject *nxt_py_websocket_receive_str; +PyObject *nxt_py_websocket_send_str; +PyObject *nxt_py_ws_str; +PyObject *nxt_py_wss_str; + +static nxt_python_string_t nxt_py_asgi_strings[] = { + { nxt_string("1.0"), &nxt_py_1_0_str }, + { nxt_string("1.1"), &nxt_py_1_1_str }, + { nxt_string("2.0"), &nxt_py_2_0_str }, + { nxt_string("2.1"), &nxt_py_2_1_str }, + { nxt_string("3.0"), &nxt_py_3_0_str }, + { nxt_string("add_done_callback"), &nxt_py_add_done_callback_str }, + { nxt_string("asgi"), &nxt_py_asgi_str }, + { nxt_string("bad state"), &nxt_py_bad_state_str }, + { nxt_string("body"), &nxt_py_body_str }, + { nxt_string("bytes"), &nxt_py_bytes_str }, + { nxt_string("client"), &nxt_py_client_str }, + { nxt_string("code"), &nxt_py_code_str }, + { nxt_string("done"), &nxt_py_done_str }, + { nxt_string("exception"), &nxt_py_exception_str }, + { nxt_string("failed to send body"), &nxt_py_failed_to_send_body_str }, + { nxt_string("headers"), &nxt_py_headers_str }, + { nxt_string("http"), &nxt_py_http_str }, + { nxt_string("http.disconnect"), &nxt_py_http_disconnect_str }, + { nxt_string("http.request"), &nxt_py_http_request_str }, + { nxt_string("http_version"), &nxt_py_http_version_str }, + { nxt_string("https"), &nxt_py_https_str }, + { nxt_string("lifespan"), &nxt_py_lifespan_str }, + { nxt_string("lifespan.shutdown"), &nxt_py_lifespan_shutdown_str }, + { nxt_string("lifespan.startup"), &nxt_py_lifespan_startup_str }, + { nxt_string("message"), &nxt_py_message_str }, + { nxt_string("message too big"), &nxt_py_message_too_big_str }, + { nxt_string("method"), &nxt_py_method_str }, + { nxt_string("more_body"), &nxt_py_more_body_str }, + { nxt_string("path"), &nxt_py_path_str }, + { nxt_string("query_string"), &nxt_py_query_string_str }, + { nxt_string("raw_path"), &nxt_py_raw_path_str }, + { nxt_string("result"), &nxt_py_result_str }, + { nxt_string("root_path"), &nxt_py_root_path_str }, // not used + { nxt_string("scheme"), &nxt_py_scheme_str }, + { nxt_string("server"), &nxt_py_server_str }, + { nxt_string("set_exception"), &nxt_py_set_exception_str }, + { nxt_string("set_result"), &nxt_py_set_result_str }, + { nxt_string("spec_version"), &nxt_py_spec_version_str }, + { nxt_string("status"), &nxt_py_status_str }, + { nxt_string("subprotocol"), &nxt_py_subprotocol_str }, + { nxt_string("subprotocols"), &nxt_py_subprotocols_str }, + { nxt_string("text"), &nxt_py_text_str }, + { nxt_string("type"), &nxt_py_type_str }, + { nxt_string("version"), &nxt_py_version_str }, + { nxt_string("websocket"), &nxt_py_websocket_str }, + { nxt_string("websocket.accept"), &nxt_py_websocket_accept_str }, + { nxt_string("websocket.close"), &nxt_py_websocket_close_str }, + { nxt_string("websocket.connect"), &nxt_py_websocket_connect_str }, + { nxt_string("websocket.disconnect"), &nxt_py_websocket_disconnect_str }, + { nxt_string("websocket.receive"), &nxt_py_websocket_receive_str }, + { nxt_string("websocket.send"), &nxt_py_websocket_send_str }, + { nxt_string("ws"), &nxt_py_ws_str }, + { nxt_string("wss"), &nxt_py_wss_str }, + { nxt_null_string, NULL }, +}; + + +nxt_int_t +nxt_py_asgi_str_init(void) +{ + return nxt_python_init_strings(nxt_py_asgi_strings); +} + + +void +nxt_py_asgi_str_done(void) +{ + nxt_python_done_strings(nxt_py_asgi_strings); +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi_str.h b/src/python/nxt_python_asgi_str.h new file mode 100644 index 00000000..3f389c62 --- /dev/null +++ b/src/python/nxt_python_asgi_str.h @@ -0,0 +1,69 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PYTHON_ASGI_STR_H_INCLUDED_ +#define _NXT_PYTHON_ASGI_STR_H_INCLUDED_ + + +extern PyObject *nxt_py_1_0_str; +extern PyObject *nxt_py_1_1_str; +extern PyObject *nxt_py_2_0_str; +extern PyObject *nxt_py_2_1_str; +extern PyObject *nxt_py_3_0_str; +extern PyObject *nxt_py_add_done_callback_str; +extern PyObject *nxt_py_asgi_str; +extern PyObject *nxt_py_bad_state_str; +extern PyObject *nxt_py_body_str; +extern PyObject *nxt_py_bytes_str; +extern PyObject *nxt_py_client_str; +extern PyObject *nxt_py_code_str; +extern PyObject *nxt_py_done_str; +extern PyObject *nxt_py_exception_str; +extern PyObject *nxt_py_failed_to_send_body_str; +extern PyObject *nxt_py_headers_str; +extern PyObject *nxt_py_http_str; +extern PyObject *nxt_py_http_disconnect_str; +extern PyObject *nxt_py_http_request_str; +extern PyObject *nxt_py_http_version_str; +extern PyObject *nxt_py_https_str; +extern PyObject *nxt_py_lifespan_str; +extern PyObject *nxt_py_lifespan_shutdown_str; +extern PyObject *nxt_py_lifespan_startup_str; +extern PyObject *nxt_py_method_str; +extern PyObject *nxt_py_message_str; +extern PyObject *nxt_py_message_too_big_str; +extern PyObject *nxt_py_more_body_str; +extern PyObject *nxt_py_path_str; +extern PyObject *nxt_py_query_string_str; +extern PyObject *nxt_py_result_str; +extern PyObject *nxt_py_raw_path_str; +extern PyObject *nxt_py_root_path_str; +extern PyObject *nxt_py_scheme_str; +extern PyObject *nxt_py_server_str; +extern PyObject *nxt_py_set_exception_str; +extern PyObject *nxt_py_set_result_str; +extern PyObject *nxt_py_spec_version_str; +extern PyObject *nxt_py_status_str; +extern PyObject *nxt_py_subprotocol_str; +extern PyObject *nxt_py_subprotocols_str; +extern PyObject *nxt_py_text_str; +extern PyObject *nxt_py_type_str; +extern PyObject *nxt_py_version_str; +extern PyObject *nxt_py_websocket_str; +extern PyObject *nxt_py_websocket_accept_str; +extern PyObject *nxt_py_websocket_close_str; +extern PyObject *nxt_py_websocket_connect_str; +extern PyObject *nxt_py_websocket_disconnect_str; +extern PyObject *nxt_py_websocket_receive_str; +extern PyObject *nxt_py_websocket_send_str; +extern PyObject *nxt_py_ws_str; +extern PyObject *nxt_py_wss_str; + + +nxt_int_t nxt_py_asgi_str_init(void); +void nxt_py_asgi_str_done(void); + + +#endif /* _NXT_PYTHON_ASGI_STR_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c new file mode 100644 index 00000000..5a27b588 --- /dev/null +++ b/src/python/nxt_python_asgi_websocket.c @@ -0,0 +1,1084 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <nxt_unit_websocket.h> +#include <nxt_websocket_header.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +enum { + NXT_WS_INIT, + NXT_WS_CONNECT, + NXT_WS_ACCEPTED, + NXT_WS_DISCONNECTED, + NXT_WS_CLOSED, +}; + + +typedef struct { + nxt_queue_link_t link; + nxt_unit_websocket_frame_t *frame; +} nxt_py_asgi_penging_frame_t; + + +typedef struct { + PyObject_HEAD + nxt_unit_request_info_t *req; + PyObject *receive_future; + PyObject *receive_exc_str; + int state; + nxt_queue_t pending_frames; + uint64_t pending_payload_len; + uint64_t pending_frame_len; + int pending_fins; +} nxt_py_asgi_websocket_t; + + +static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, + PyObject *dict); +static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, + PyObject *dict); +static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, + PyObject *dict); +static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, + PyObject *msg); +static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, + PyObject *exc); +static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f); +static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws, + nxt_unit_websocket_frame_t *frame); +static uint64_t nxt_py_asgi_websocket_pending_len( + nxt_py_asgi_websocket_t *ws); +static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame( + nxt_py_asgi_websocket_t *ws); +static PyObject *nxt_py_asgi_websocket_disconnect_msg( + nxt_py_asgi_websocket_t *ws); +static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future); + + +static PyMethodDef nxt_py_asgi_websocket_methods[] = { + { "receive", nxt_py_asgi_websocket_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_websocket_send, METH_O, 0 }, + { "_done", nxt_py_asgi_websocket_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_websocket_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_websocket", + .tp_basicsize = sizeof(nxt_py_asgi_websocket_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI WebSocket connection object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_websocket_methods, +}; + +static uint64_t nxt_py_asgi_ws_max_frame_size = 1024 * 1024; +static uint64_t nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024; + + +nxt_int_t +nxt_py_asgi_websocket_init(nxt_task_t *task) +{ + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) { + nxt_alert(task, + "Python failed to initialize the \"asgi_websocket\" type object"); + return NXT_ERROR; + } + + return NXT_OK; +} + + +PyObject * +nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req) +{ + nxt_py_asgi_websocket_t *ws; + + ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type); + + if (nxt_fast_path(ws != NULL)) { + ws->req = req; + ws->receive_future = NULL; + ws->receive_exc_str = NULL; + ws->state = NXT_WS_INIT; + nxt_queue_init(&ws->pending_frames); + ws->pending_payload_len = 0; + ws->pending_frame_len = 0; + ws->pending_fins = 0; + } + + return (PyObject *) ws; +} + + +static PyObject * +nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none) +{ + PyObject *future, *msg; + nxt_py_asgi_websocket_t *ws; + + ws = (nxt_py_asgi_websocket_t *) self; + + nxt_unit_req_debug(ws->req, "asgi_websocket_receive"); + + /* If exception happened out of receive() call, raise it now. */ + if (nxt_slow_path(ws->receive_exc_str != NULL)) { + PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str); + + ws->receive_exc_str = NULL; + + return NULL; + } + + if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { + nxt_unit_req_error(ws->req, + "receive() called for closed WebSocket"); + + return PyErr_Format(PyExc_RuntimeError, + "WebSocket already closed"); + } + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(ws->req, "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (nxt_slow_path(ws->state == NXT_WS_INIT)) { + ws->state = NXT_WS_CONNECT; + + msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str); + + return nxt_py_asgi_set_result_soon(ws->req, future, msg); + } + + if (ws->pending_fins > 0) { + msg = nxt_py_asgi_websocket_pop_msg(ws, NULL); + + return nxt_py_asgi_set_result_soon(ws->req, future, msg); + } + + if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { + msg = nxt_py_asgi_websocket_disconnect_msg(ws); + + return nxt_py_asgi_set_result_soon(ws->req, future, msg); + } + + ws->receive_future = future; + Py_INCREF(ws->receive_future); + + return future; +} + + +static PyObject * +nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict) +{ + PyObject *type; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_websocket_t *ws; + + static const nxt_str_t websocket_accept = nxt_string("websocket.accept"); + static const nxt_str_t websocket_close = nxt_string("websocket.close"); + static const nxt_str_t websocket_send = nxt_string("websocket.send"); + + ws = (nxt_py_asgi_websocket_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_req_error(ws->req, "asgi_websocket_send: " + "'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, + "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) websocket_accept.length + && memcmp(type_str, websocket_accept.start, type_len) == 0) + { + return nxt_py_asgi_websocket_accept(ws, dict); + } + + if (type_len == (Py_ssize_t) websocket_close.length + && memcmp(type_str, websocket_close.start, type_len) == 0) + { + return nxt_py_asgi_websocket_close(ws, dict); + } + + if (type_len == (Py_ssize_t) websocket_send.length + && memcmp(type_str, websocket_send.start, type_len) == 0) + { + return nxt_py_asgi_websocket_send_frame(ws, dict); + } + + nxt_unit_req_error(ws->req, "asgi_websocket_send: " + "unexpected 'type': '%.*s'", (int) type_len, type_str); + return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); +} + + +static PyObject * +nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict) +{ + int rc; + char *subprotocol_str; + PyObject *res, *headers, *subprotocol; + Py_ssize_t subprotocol_len; + nxt_py_asgi_calc_size_ctx_t calc_size_ctx; + nxt_py_asgi_add_field_ctx_t add_field_ctx; + + static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); + + switch(ws->state) { + case NXT_WS_INIT: + return PyErr_Format(PyExc_RuntimeError, + "WebSocket connect not received"); + case NXT_WS_CONNECT: + break; + + case NXT_WS_ACCEPTED: + return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted"); + + case NXT_WS_DISCONNECTED: + return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + + case NXT_WS_CLOSED: + return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + } + + if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted"); + } + + if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) { + return PyErr_Format(PyExc_RuntimeError, "response already sent"); + } + + calc_size_ctx.fields_size = 0; + calc_size_ctx.fields_count = 0; + + headers = PyDict_GetItem(dict, nxt_py_headers_str); + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, + &calc_size_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + } + + subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str); + if (subprotocol != NULL && PyUnicode_Check(subprotocol)) { + subprotocol_str = PyUnicode_DATA(subprotocol); + subprotocol_len = PyUnicode_GET_LENGTH(subprotocol); + + calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len; + calc_size_ctx.fields_count++; + + } else { + subprotocol_str = NULL; + subprotocol_len = 0; + } + + rc = nxt_unit_response_init(ws->req, 101, + calc_size_ctx.fields_count, + calc_size_ctx.fields_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + add_field_ctx.req = ws->req; + add_field_ctx.content_length = -1; + + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, + &add_field_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + } + + if (subprotocol_len > 0) { + rc = nxt_unit_response_add_field(ws->req, + (const char *) ws_protocol.start, + ws_protocol.length, + subprotocol_str, subprotocol_len); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to add header"); + } + } + + rc = nxt_unit_response_send(ws->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send response"); + } + + ws->state = NXT_WS_ACCEPTED; + + Py_INCREF(ws); + + return (PyObject *) ws; +} + + +static PyObject * +nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict) +{ + int rc; + uint16_t status_code; + PyObject *code; + + if (nxt_slow_path(ws->state == NXT_WS_INIT)) { + return PyErr_Format(PyExc_RuntimeError, + "WebSocket connect not received"); + } + + if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + } + + if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + } + + if (nxt_unit_response_is_websocket(ws->req)) { + code = PyDict_GetItem(dict, nxt_py_code_str); + if (nxt_slow_path(code != NULL && !PyLong_Check(code))) { + return PyErr_Format(PyExc_TypeError, "'code' is not integer"); + } + + status_code = (code != NULL) ? htons(PyLong_AsLong(code)) + : htons(NXT_WEBSOCKET_CR_NORMAL); + + rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE, + 1, &status_code, 2); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send close frame"); + } + + } else { + rc = nxt_unit_response_init(ws->req, 403, 0, 0); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + rc = nxt_unit_response_send(ws->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send response"); + } + } + + ws->state = NXT_WS_CLOSED; + + Py_INCREF(ws); + + return (PyObject *) ws; +} + + +static PyObject * +nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict) +{ + int rc; + uint8_t opcode; + PyObject *bytes, *text; + const void *buf; + Py_ssize_t buf_size; + + if (nxt_slow_path(ws->state == NXT_WS_INIT)) { + return PyErr_Format(PyExc_RuntimeError, + "WebSocket connect not received"); + } + + if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) { + return PyErr_Format(PyExc_RuntimeError, + "WebSocket not accepted yet"); + } + + if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + } + + if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + } + + bytes = PyDict_GetItem(dict, nxt_py_bytes_str); + if (bytes == Py_None) { + bytes = NULL; + } + + if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) { + return PyErr_Format(PyExc_TypeError, + "'bytes' is not a byte string"); + } + + text = PyDict_GetItem(dict, nxt_py_text_str); + if (text == Py_None) { + text = NULL; + } + + if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) { + return PyErr_Format(PyExc_TypeError, + "'text' is not a unicode string"); + } + + if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) { + return PyErr_Format(PyExc_ValueError, + "Exactly one of 'bytes' or 'text' must be non-None"); + } + + if (bytes != NULL) { + buf = PyBytes_AS_STRING(bytes); + buf_size = PyBytes_GET_SIZE(bytes); + opcode = NXT_WEBSOCKET_OP_BINARY; + + } else { + buf = PyUnicode_AsUTF8AndSize(text, &buf_size); + opcode = NXT_WEBSOCKET_OP_TEXT; + } + + rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send close frame"); + } + + Py_INCREF(ws); + return (PyObject *) ws; +} + + +void +nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame) +{ + uint8_t opcode; + uint16_t status_code; + uint64_t rest; + PyObject *msg, *exc; + nxt_py_asgi_websocket_t *ws; + + ws = frame->req->data; + + nxt_unit_req_debug(ws->req, "asgi_websocket_handler"); + + opcode = frame->header->opcode; + if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT + && opcode != NXT_WEBSOCKET_OP_TEXT + && opcode != NXT_WEBSOCKET_OP_BINARY + && opcode != NXT_WEBSOCKET_OP_CLOSE)) + { + nxt_unit_websocket_done(frame); + + nxt_unit_req_debug(ws->req, + "asgi_websocket_handler: ignore frame with opcode %d", + opcode); + + return; + } + + if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) { + nxt_unit_websocket_done(frame); + + goto bad_state; + } + + rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len; + + if (nxt_slow_path(frame->payload_len > rest)) { + nxt_unit_websocket_done(frame); + + goto too_big; + } + + rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len; + + if (nxt_slow_path(frame->payload_len > rest)) { + nxt_unit_websocket_done(frame); + + goto too_big; + } + + if (ws->receive_future == NULL || frame->header->fin == 0) { + nxt_py_asgi_websocket_suspend_frame(frame); + + return; + } + + if (!nxt_queue_is_empty(&ws->pending_frames)) { + if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT + || opcode == NXT_WEBSOCKET_OP_BINARY)) + { + nxt_unit_req_alert(ws->req, + "Invalid state: pending frames with active receiver. " + "CONT frame expected. (%d)", opcode); + + PyErr_SetString(PyExc_AssertionError, + "Invalid state: pending frames with active receiver. " + "CONT frame expected."); + + nxt_unit_websocket_done(frame); + + return; + } + } + + msg = nxt_py_asgi_websocket_pop_msg(ws, frame); + if (nxt_slow_path(msg == NULL)) { + exc = PyErr_Occurred(); + Py_INCREF(exc); + + goto raise; + } + + nxt_py_asgi_websocket_receive_done(ws, msg); + + return; + +bad_state: + + if (ws->receive_future == NULL) { + ws->receive_exc_str = nxt_py_bad_state_str; + + return; + } + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_bad_state_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(ws->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + + goto raise; + +too_big: + + status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG); + + (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE, + 1, &status_code, 2); + + ws->state = NXT_WS_CLOSED; + + if (ws->receive_future == NULL) { + ws->receive_exc_str = nxt_py_message_too_big_str; + + return; + } + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_message_too_big_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(ws->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + +raise: + + nxt_py_asgi_websocket_receive_fail(ws, exc); +} + + +static void +nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg) +{ + PyObject *future, *res; + + future = ws->receive_future; + ws->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(ws->req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(msg); +} + + +static void +nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc) +{ + PyObject *future, *res; + + future = ws->receive_future; + ws->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(ws->req, "'set_exception' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(exc); +} + + +static void +nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame) +{ + int rc; + nxt_py_asgi_websocket_t *ws; + nxt_py_asgi_penging_frame_t *p; + + nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: " + "%d, %"PRIu64", %d", + frame->header->opcode, frame->payload_len, + frame->header->fin); + + ws = frame->req->data; + + rc = nxt_unit_websocket_retain(frame); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension."); + + nxt_unit_websocket_done(frame); + + PyErr_SetString(PyExc_RuntimeError, + "Failed to retain frame for suspension."); + + return; + } + + p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t)); + if (nxt_slow_path(p == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to allocate buffer to suspend frame."); + + nxt_unit_websocket_done(frame); + + PyErr_SetString(PyExc_RuntimeError, + "Failed to allocate buffer to suspend frame."); + + return; + } + + p->frame = frame; + nxt_queue_insert_tail(&ws->pending_frames, &p->link); + + ws->pending_payload_len += frame->payload_len; + ws->pending_fins += frame->header->fin; + + if (frame->header->fin) { + ws->pending_frame_len = 0; + + } else { + if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) { + ws->pending_frame_len += frame->payload_len; + + } else { + ws->pending_frame_len = frame->payload_len; + } + } +} + + +static PyObject * +nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws, + nxt_unit_websocket_frame_t *frame) +{ + int fin; + char *buf; + uint8_t code_buf[2], opcode; + uint16_t code; + PyObject *msg, *data, *type, *data_key; + uint64_t payload_len; + nxt_unit_websocket_frame_t *fin_frame; + + nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg"); + + fin_frame = NULL; + + if (nxt_queue_is_empty(&ws->pending_frames) + || (frame != NULL + && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE)) + { + payload_len = frame->payload_len; + + } else { + if (frame != NULL) { + payload_len = ws->pending_payload_len + frame->payload_len; + fin_frame = frame; + + } else { + payload_len = nxt_py_asgi_websocket_pending_len(ws); + } + + frame = nxt_py_asgi_websocket_pop_frame(ws); + } + + opcode = frame->header->opcode; + + if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) { + nxt_unit_req_alert(ws->req, + "Invalid state: attempt to process CONT frame."); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_AssertionError, + "Invalid state: attempt to process CONT frame."); + } + + type = nxt_py_websocket_receive_str; + + switch (opcode) { + case NXT_WEBSOCKET_OP_TEXT: + buf = nxt_unit_malloc(frame->req->ctx, payload_len); + if (nxt_slow_path(buf == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to allocate buffer for payload (%d).", + (int) payload_len); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to allocate buffer for payload (%d).", + (int) payload_len); + } + + data = NULL; + data_key = nxt_py_text_str; + + break; + + case NXT_WEBSOCKET_OP_BINARY: + data = PyBytes_FromStringAndSize(NULL, payload_len); + if (nxt_slow_path(data == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to create Bytes for payload (%d).", + (int) payload_len); + nxt_python_print_exception(); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to create Bytes for payload."); + } + + buf = (char *) PyBytes_AS_STRING(data); + data_key = nxt_py_bytes_str; + + break; + + case NXT_WEBSOCKET_OP_CLOSE: + if (frame->payload_len >= 2) { + nxt_unit_websocket_read(frame, code_buf, 2); + code = ((uint16_t) code_buf[0]) << 8 | code_buf[1]; + + } else { + code = NXT_WEBSOCKET_CR_NORMAL; + } + + nxt_unit_websocket_done(frame); + + data = PyLong_FromLong(code); + if (nxt_slow_path(data == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to create Long from code %d.", + (int) code); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to create Long from code %d.", + (int) code); + } + + buf = NULL; + type = nxt_py_websocket_disconnect_str; + data_key = nxt_py_code_str; + + break; + + default: + nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d", + opcode); + } + + if (buf != NULL) { + fin = frame->header->fin; + buf += nxt_unit_websocket_read(frame, buf, frame->payload_len); + + nxt_unit_websocket_done(frame); + + if (!fin) { + while (!nxt_queue_is_empty(&ws->pending_frames)) { + frame = nxt_py_asgi_websocket_pop_frame(ws); + fin = frame->header->fin; + + buf += nxt_unit_websocket_read(frame, buf, frame->payload_len); + + nxt_unit_websocket_done(frame); + + if (fin) { + break; + } + } + + if (fin_frame != NULL) { + buf += nxt_unit_websocket_read(fin_frame, buf, + fin_frame->payload_len); + nxt_unit_websocket_done(fin_frame); + } + } + + if (opcode == NXT_WEBSOCKET_OP_TEXT) { + buf -= payload_len; + + data = PyUnicode_DecodeUTF8(buf, payload_len, NULL); + + nxt_unit_free(ws->req->ctx, buf); + + if (nxt_slow_path(data == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to create Unicode for payload (%d).", + (int) payload_len); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to create Unicode."); + } + } + } + + msg = nxt_py_asgi_new_msg(ws->req, type); + if (nxt_slow_path(msg == NULL)) { + Py_DECREF(data); + return NULL; + } + + if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) { + nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item"); + + Py_DECREF(msg); + Py_DECREF(data); + + return PyErr_Format(PyExc_RuntimeError, + "Python failed to set 'msg.data' item"); + } + + Py_DECREF(data); + + return msg; +} + + +static uint64_t +nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws) +{ + uint64_t res; + nxt_py_asgi_penging_frame_t *p; + + res = 0; + + nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) { + res += p->frame->payload_len; + + if (p->frame->header->fin) { + nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d", + (int) res); + return res; + } + } nxt_queue_loop; + + nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)", + (int) res); + return res; +} + + +static nxt_unit_websocket_frame_t * +nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws) +{ + nxt_queue_link_t *lnk; + nxt_unit_websocket_frame_t *frame; + nxt_py_asgi_penging_frame_t *p; + + lnk = nxt_queue_first(&ws->pending_frames); + nxt_queue_remove(lnk); + + p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link); + + frame = p->frame; + ws->pending_payload_len -= frame->payload_len; + ws->pending_fins -= frame->header->fin; + + nxt_unit_free(frame->req->ctx, p); + + nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: " + "%d, %"PRIu64", %d", + frame->header->opcode, frame->payload_len, + frame->header->fin); + + return frame; +} + + +void +nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req) +{ + PyObject *msg, *exc; + nxt_py_asgi_websocket_t *ws; + + ws = req->data; + + nxt_unit_req_debug(req, "asgi_websocket_close_handler"); + + if (ws->receive_future == NULL) { + ws->state = NXT_WS_DISCONNECTED; + + return; + } + + msg = nxt_py_asgi_websocket_disconnect_msg(ws); + if (nxt_slow_path(msg == NULL)) { + exc = PyErr_Occurred(); + Py_INCREF(exc); + + nxt_py_asgi_websocket_receive_fail(ws, exc); + + } else { + nxt_py_asgi_websocket_receive_done(ws, msg); + } +} + + +static PyObject * +nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws) +{ + PyObject *msg, *code; + + msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str); + if (nxt_slow_path(msg == NULL)) { + return NULL; + } + + code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY); + if (nxt_slow_path(code == NULL)) { + nxt_unit_req_alert(ws->req, "Python failed to create long"); + nxt_python_print_exception(); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, "failed to create long"); + } + + if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) { + nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item"); + + Py_DECREF(msg); + Py_DECREF(code); + + return PyErr_Format(PyExc_RuntimeError, + "Python failed to set 'msg.code' item"); + } + + Py_DECREF(code); + + return msg; +} + + +static PyObject * +nxt_py_asgi_websocket_done(PyObject *self, PyObject *future) +{ + int rc; + uint16_t status_code; + PyObject *res; + nxt_py_asgi_websocket_t *ws; + + ws = (nxt_py_asgi_websocket_t *) self; + + nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self); + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(ws->req, + "Python failed to call 'future.result()'"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + + } else { + Py_DECREF(res); + + rc = NXT_UNIT_OK; + } + + if (ws->state == NXT_WS_ACCEPTED) { + status_code = (rc == NXT_UNIT_OK) + ? htons(NXT_WEBSOCKET_CR_NORMAL) + : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR); + + rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE, + 1, &status_code, 2); + } + + while (!nxt_queue_is_empty(&ws->pending_frames)) { + nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws)); + } + + nxt_unit_request_done(ws->req, rc); + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_wsgi.c b/src/python/nxt_python_wsgi.c new file mode 100644 index 00000000..97030cd3 --- /dev/null +++ b/src/python/nxt_python_wsgi.c @@ -0,0 +1,1264 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) Valentin V. Bartenev + * Copyright (C) NGINX, Inc. + */ + + +#include <Python.h> + +#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_unit.h> +#include <nxt_unit_field.h> +#include <nxt_unit_request.h> +#include <nxt_unit_response.h> + +#include <python/nxt_python.h> + +#include NXT_PYTHON_MOUNTS_H + +/* + * According to "PEP 3333 / A Note On String Types" + * [https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types] + * + * WSGI therefore defines two kinds of "string": + * + * - "Native" strings (which are always implemented using the type named str ) + * that are used for request/response headers and metadata + * + * will use PyString_* or corresponding PyUnicode_* functions + * + * - "Bytestrings" (which are implemented using the bytes type in Python 3, and + * str elsewhere), that are used for the bodies of requests and responses + * (e.g. POST/PUT input data and HTML page outputs). + * + * will use PyString_* or corresponding PyBytes_* functions + */ + + +typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t; + +typedef struct { + PyObject_HEAD +} nxt_py_input_t; + + +typedef struct { + PyObject_HEAD +} nxt_py_error_t; + +static void nxt_python_request_handler(nxt_unit_request_info_t *req); + +static PyObject *nxt_python_create_environ(nxt_task_t *task); +static PyObject *nxt_python_get_environ(nxt_python_run_ctx_t *ctx); +static int nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, + nxt_unit_sptr_t *sptr, uint32_t size); +static int nxt_python_add_field(nxt_python_run_ctx_t *ctx, + nxt_unit_field_t *field, int n, uint32_t vl); +static PyObject *nxt_python_field_name(const char *name, uint8_t len); +static PyObject *nxt_python_field_value(nxt_unit_field_t *f, int n, + uint32_t vl); +static int nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, + PyObject *value); + +static PyObject *nxt_py_start_resp(PyObject *self, PyObject *args); +static int nxt_python_response_add_field(nxt_python_run_ctx_t *ctx, + PyObject *name, PyObject *value, int i); +static int nxt_python_str_buf(PyObject *str, char **buf, uint32_t *len, + PyObject **bytes); +static PyObject *nxt_py_write(PyObject *self, PyObject *args); + +static void nxt_py_input_dealloc(nxt_py_input_t *self); +static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args); +static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args); +static PyObject *nxt_py_input_getline(nxt_python_run_ctx_t *ctx, size_t size); +static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args); + +static PyObject *nxt_py_input_iter(PyObject *self); +static PyObject *nxt_py_input_next(PyObject *self); + +static int nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes); + +struct nxt_python_run_ctx_s { + uint64_t content_length; + uint64_t bytes_sent; + PyObject *environ; + nxt_unit_request_info_t *req; +}; + + +static PyMethodDef nxt_py_start_resp_method[] = { + {"unit_start_response", nxt_py_start_resp, METH_VARARGS, ""} +}; + + +static PyMethodDef nxt_py_write_method[] = { + {"unit_write", nxt_py_write, METH_O, ""} +}; + + +static PyMethodDef nxt_py_input_methods[] = { + { "read", (PyCFunction) nxt_py_input_read, METH_VARARGS, 0 }, + { "readline", (PyCFunction) nxt_py_input_readline, METH_VARARGS, 0 }, + { "readlines", (PyCFunction) nxt_py_input_readlines, METH_VARARGS, 0 }, + { NULL, NULL, 0, 0 } +}; + + +static PyTypeObject nxt_py_input_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._input", + .tp_basicsize = sizeof(nxt_py_input_t), + .tp_dealloc = (destructor) nxt_py_input_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit input object.", + .tp_iter = nxt_py_input_iter, + .tp_iternext = nxt_py_input_next, + .tp_methods = nxt_py_input_methods, +}; + + +static PyObject *nxt_py_start_resp_obj; +static PyObject *nxt_py_write_obj; +static PyObject *nxt_py_environ_ptyp; + +static PyThreadState *nxt_python_thread_state; +static nxt_python_run_ctx_t *nxt_python_run_ctx; + +static PyObject *nxt_py_80_str; +static PyObject *nxt_py_close_str; +static PyObject *nxt_py_content_length_str; +static PyObject *nxt_py_content_type_str; +static PyObject *nxt_py_http_str; +static PyObject *nxt_py_https_str; +static PyObject *nxt_py_path_info_str; +static PyObject *nxt_py_query_string_str; +static PyObject *nxt_py_remote_addr_str; +static PyObject *nxt_py_request_method_str; +static PyObject *nxt_py_request_uri_str; +static PyObject *nxt_py_server_addr_str; +static PyObject *nxt_py_server_name_str; +static PyObject *nxt_py_server_port_str; +static PyObject *nxt_py_server_protocol_str; +static PyObject *nxt_py_wsgi_uri_scheme_str; + +static nxt_python_string_t nxt_python_strings[] = { + { nxt_string("80"), &nxt_py_80_str }, + { nxt_string("close"), &nxt_py_close_str }, + { nxt_string("CONTENT_LENGTH"), &nxt_py_content_length_str }, + { nxt_string("CONTENT_TYPE"), &nxt_py_content_type_str }, + { nxt_string("http"), &nxt_py_http_str }, + { nxt_string("https"), &nxt_py_https_str }, + { nxt_string("PATH_INFO"), &nxt_py_path_info_str }, + { nxt_string("QUERY_STRING"), &nxt_py_query_string_str }, + { nxt_string("REMOTE_ADDR"), &nxt_py_remote_addr_str }, + { nxt_string("REQUEST_METHOD"), &nxt_py_request_method_str }, + { nxt_string("REQUEST_URI"), &nxt_py_request_uri_str }, + { nxt_string("SERVER_ADDR"), &nxt_py_server_addr_str }, + { nxt_string("SERVER_NAME"), &nxt_py_server_name_str }, + { nxt_string("SERVER_PORT"), &nxt_py_server_port_str }, + { nxt_string("SERVER_PROTOCOL"), &nxt_py_server_protocol_str }, + { nxt_string("wsgi.url_scheme"), &nxt_py_wsgi_uri_scheme_str }, + { nxt_null_string, NULL }, +}; + + +nxt_int_t +nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init) +{ + PyObject *obj; + + obj = NULL; + + if (nxt_slow_path(nxt_python_init_strings(nxt_python_strings) != NXT_OK)) { + nxt_alert(task, "Python failed to init string objects"); + goto fail; + } + + obj = PyCFunction_New(nxt_py_start_resp_method, NULL); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, + "Python failed to initialize the \"start_response\" function"); + goto fail; + } + + nxt_py_start_resp_obj = obj; + + obj = PyCFunction_New(nxt_py_write_method, NULL); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to initialize the \"write\" function"); + goto fail; + } + + nxt_py_write_obj = obj; + + obj = nxt_python_create_environ(task); + if (nxt_slow_path(obj == NULL)) { + goto fail; + } + + nxt_py_environ_ptyp = obj; + obj = NULL; + + init->callbacks.request_handler = nxt_python_request_handler; + + return NXT_OK; + +fail: + + Py_XDECREF(obj); + + return NXT_ERROR; +} + + +int +nxt_python_wsgi_run(nxt_unit_ctx_t *ctx) +{ + int rc; + + nxt_python_thread_state = PyEval_SaveThread(); + + rc = nxt_unit_run(ctx); + + PyEval_RestoreThread(nxt_python_thread_state); + + return rc; +} + + +void +nxt_python_wsgi_done(void) +{ + nxt_python_done_strings(nxt_python_strings); + + Py_XDECREF(nxt_py_start_resp_obj); + Py_XDECREF(nxt_py_write_obj); + Py_XDECREF(nxt_py_environ_ptyp); +} + + +static void +nxt_python_request_handler(nxt_unit_request_info_t *req) +{ + int rc; + PyObject *environ, *args, *response, *iterator, *item; + PyObject *close, *result; + nxt_python_run_ctx_t run_ctx = {-1, 0, NULL, req}; + + PyEval_RestoreThread(nxt_python_thread_state); + + environ = nxt_python_get_environ(&run_ctx); + if (nxt_slow_path(environ == NULL)) { + rc = NXT_UNIT_ERROR; + goto done; + } + + args = PyTuple_New(2); + if (nxt_slow_path(args == NULL)) { + Py_DECREF(environ); + + nxt_unit_req_error(req, "Python failed to create arguments tuple"); + + rc = NXT_UNIT_ERROR; + goto done; + } + + PyTuple_SET_ITEM(args, 0, environ); + + Py_INCREF(nxt_py_start_resp_obj); + PyTuple_SET_ITEM(args, 1, nxt_py_start_resp_obj); + + nxt_python_run_ctx = &run_ctx; + + response = PyObject_CallObject(nxt_py_application, args); + + Py_DECREF(args); + + if (nxt_slow_path(response == NULL)) { + nxt_unit_req_error(req, "Python failed to call the application"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + goto done; + } + + /* Shortcut: avoid iterate over response string symbols. */ + if (PyBytes_Check(response)) { + rc = nxt_python_write(&run_ctx, response); + + } else { + iterator = PyObject_GetIter(response); + + if (nxt_fast_path(iterator != NULL)) { + rc = NXT_UNIT_OK; + + while (run_ctx.bytes_sent < run_ctx.content_length) { + item = PyIter_Next(iterator); + + if (item == NULL) { + if (nxt_slow_path(PyErr_Occurred() != NULL)) { + nxt_unit_req_error(req, "Python failed to iterate over " + "the application response object"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + } + + break; + } + + if (nxt_fast_path(PyBytes_Check(item))) { + rc = nxt_python_write(&run_ctx, item); + + } else { + nxt_unit_req_error(req, "the application returned " + "not a bytestring object"); + rc = NXT_UNIT_ERROR; + } + + Py_DECREF(item); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + break; + } + } + + Py_DECREF(iterator); + + } else { + nxt_unit_req_error(req, + "the application returned not an iterable object"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + } + + close = PyObject_GetAttr(response, nxt_py_close_str); + + if (close != NULL) { + result = PyObject_CallFunction(close, NULL); + if (nxt_slow_path(result == NULL)) { + nxt_unit_req_error(req, "Python failed to call the close() " + "method of the application response"); + nxt_python_print_exception(); + + } else { + Py_DECREF(result); + } + + Py_DECREF(close); + + } else { + PyErr_Clear(); + } + } + + Py_DECREF(response); + +done: + + nxt_python_thread_state = PyEval_SaveThread(); + + nxt_python_run_ctx = NULL; + nxt_unit_request_done(req, rc); +} + + +static PyObject * +nxt_python_create_environ(nxt_task_t *task) +{ + PyObject *obj, *err, *environ; + + environ = PyDict_New(); + + if (nxt_slow_path(environ == NULL)) { + nxt_alert(task, "Python failed to create the \"environ\" dictionary"); + return NULL; + } + + obj = PyString_FromStringAndSize((char *) nxt_server.start, + nxt_server.length); + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, + "Python failed to create the \"SERVER_SOFTWARE\" environ value"); + goto fail; + } + + if (nxt_slow_path(PyDict_SetItemString(environ, "SERVER_SOFTWARE", obj) + != 0)) + { + nxt_alert(task, + "Python failed to set the \"SERVER_SOFTWARE\" environ value"); + goto fail; + } + + Py_DECREF(obj); + + obj = Py_BuildValue("(ii)", 1, 0); + + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, + "Python failed to build the \"wsgi.version\" environ value"); + goto fail; + } + + if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.version", obj) != 0)) + { + nxt_alert(task, + "Python failed to set the \"wsgi.version\" environ value"); + goto fail; + } + + Py_DECREF(obj); + obj = NULL; + + + if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.multithread", + Py_False) + != 0)) + { + nxt_alert(task, + "Python failed to set the \"wsgi.multithread\" environ value"); + goto fail; + } + + if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.multiprocess", + Py_True) + != 0)) + { + nxt_alert(task, + "Python failed to set the \"wsgi.multiprocess\" environ value"); + goto fail; + } + + if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.run_once", + Py_False) + != 0)) + { + nxt_alert(task, + "Python failed to set the \"wsgi.run_once\" environ value"); + goto fail; + } + + + if (nxt_slow_path(PyType_Ready(&nxt_py_input_type) != 0)) { + nxt_alert(task, + "Python failed to initialize the \"wsgi.input\" type object"); + goto fail; + } + + obj = (PyObject *) PyObject_New(nxt_py_input_t, &nxt_py_input_type); + + if (nxt_slow_path(obj == NULL)) { + nxt_alert(task, "Python failed to create the \"wsgi.input\" object"); + goto fail; + } + + if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.input", obj) != 0)) { + nxt_alert(task, + "Python failed to set the \"wsgi.input\" environ value"); + goto fail; + } + + Py_DECREF(obj); + obj = NULL; + + + err = PySys_GetObject((char *) "stderr"); + + if (nxt_slow_path(err == NULL)) { + nxt_alert(task, "Python failed to get \"sys.stderr\" object"); + goto fail; + } + + if (nxt_slow_path(PyDict_SetItemString(environ, "wsgi.errors", err) != 0)) + { + nxt_alert(task, + "Python failed to set the \"wsgi.errors\" environ value"); + goto fail; + } + + return environ; + +fail: + + Py_XDECREF(obj); + Py_DECREF(environ); + + return NULL; +} + + +static PyObject * +nxt_python_get_environ(nxt_python_run_ctx_t *ctx) +{ + int rc; + uint32_t i, j, vl; + PyObject *environ; + nxt_unit_field_t *f, *f2; + nxt_unit_request_t *r; + + environ = PyDict_Copy(nxt_py_environ_ptyp); + if (nxt_slow_path(environ == NULL)) { + nxt_unit_req_error(ctx->req, + "Python failed to copy the \"environ\" dictionary"); + + return NULL; + } + + ctx->environ = environ; + + r = ctx->req->request; + +#define RC(S) \ + do { \ + rc = (S); \ + if (nxt_slow_path(rc != NXT_UNIT_OK)) { \ + goto fail; \ + } \ + } while(0) + + RC(nxt_python_add_sptr(ctx, nxt_py_request_method_str, &r->method, + r->method_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_request_uri_str, &r->target, + r->target_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_query_string_str, &r->query, + r->query_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_path_info_str, &r->path, + r->path_length)); + + RC(nxt_python_add_sptr(ctx, nxt_py_remote_addr_str, &r->remote, + r->remote_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_server_addr_str, &r->local, + r->local_length)); + + if (r->tls) { + RC(nxt_python_add_obj(ctx, nxt_py_wsgi_uri_scheme_str, + nxt_py_https_str)); + } else { + RC(nxt_python_add_obj(ctx, nxt_py_wsgi_uri_scheme_str, + nxt_py_http_str)); + } + + RC(nxt_python_add_sptr(ctx, nxt_py_server_protocol_str, &r->version, + r->version_length)); + + RC(nxt_python_add_sptr(ctx, nxt_py_server_name_str, &r->server_name, + r->server_name_length)); + RC(nxt_python_add_obj(ctx, nxt_py_server_port_str, nxt_py_80_str)); + + nxt_unit_request_group_dup_fields(ctx->req); + + for (i = 0; i < r->fields_count;) { + f = r->fields + i; + vl = f->value_length; + + for (j = i + 1; j < r->fields_count; j++) { + f2 = r->fields + j; + + if (f2->hash != f->hash + || nxt_unit_sptr_get(&f2->name) != nxt_unit_sptr_get(&f->name)) + { + break; + } + + vl += 2 + f2->value_length; + } + + RC(nxt_python_add_field(ctx, f, j - i, vl)); + + i = j; + } + + if (r->content_length_field != NXT_UNIT_NONE_FIELD) { + f = r->fields + r->content_length_field; + + RC(nxt_python_add_sptr(ctx, nxt_py_content_length_str, &f->value, + f->value_length)); + } + + if (r->content_type_field != NXT_UNIT_NONE_FIELD) { + f = r->fields + r->content_type_field; + + RC(nxt_python_add_sptr(ctx, nxt_py_content_type_str, &f->value, + f->value_length)); + } + +#undef RC + + return environ; + +fail: + + Py_DECREF(environ); + + return NULL; +} + + +static int +nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, + nxt_unit_sptr_t *sptr, uint32_t size) +{ + char *src; + PyObject *value; + + src = nxt_unit_sptr_get(sptr); + + value = PyString_FromStringAndSize(src, size); + if (nxt_slow_path(value == NULL)) { + nxt_unit_req_error(ctx->req, + "Python failed to create value string \"%.*s\"", + (int) size, src); + nxt_python_print_exception(); + + return NXT_UNIT_ERROR; + } + + if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { + nxt_unit_req_error(ctx->req, + "Python failed to set the \"%s\" environ value", + PyUnicode_AsUTF8(name)); + Py_DECREF(value); + + return NXT_UNIT_ERROR; + } + + Py_DECREF(value); + + return NXT_UNIT_OK; +} + + +static int +nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field, int n, + uint32_t vl) +{ + char *src; + PyObject *name, *value; + + src = nxt_unit_sptr_get(&field->name); + + name = nxt_python_field_name(src, field->name_length); + if (nxt_slow_path(name == NULL)) { + nxt_unit_req_error(ctx->req, + "Python failed to create name string \"%.*s\"", + (int) field->name_length, src); + nxt_python_print_exception(); + + return NXT_UNIT_ERROR; + } + + value = nxt_python_field_value(field, n, vl); + + if (nxt_slow_path(value == NULL)) { + nxt_unit_req_error(ctx->req, + "Python failed to create value string \"%.*s\"", + (int) field->value_length, + (char *) nxt_unit_sptr_get(&field->value)); + nxt_python_print_exception(); + + goto fail; + } + + if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { + nxt_unit_req_error(ctx->req, + "Python failed to set the \"%s\" environ value", + PyUnicode_AsUTF8(name)); + goto fail; + } + + Py_DECREF(name); + Py_DECREF(value); + + return NXT_UNIT_OK; + +fail: + + Py_DECREF(name); + Py_XDECREF(value); + + return NXT_UNIT_ERROR; +} + + +static PyObject * +nxt_python_field_name(const char *name, uint8_t len) +{ + char *p, c; + uint8_t i; + PyObject *res; + +#if PY_MAJOR_VERSION == 3 + res = PyUnicode_New(len + 5, 255); +#else + res = PyString_FromStringAndSize(NULL, len + 5); +#endif + + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + p = PyString_AS_STRING(res); + + p = nxt_cpymem(p, "HTTP_", 5); + + for (i = 0; i < len; i++) { + c = name[i]; + + if (c >= 'a' && c <= 'z') { + *p++ = (c & ~0x20); + continue; + } + + if (c == '-') { + *p++ = '_'; + continue; + } + + *p++ = c; + } + + return res; +} + + +static PyObject * +nxt_python_field_value(nxt_unit_field_t *f, int n, uint32_t vl) +{ + int i; + char *p, *src; + PyObject *res; + +#if PY_MAJOR_VERSION == 3 + res = PyUnicode_New(vl, 255); +#else + res = PyString_FromStringAndSize(NULL, vl); +#endif + + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + p = PyString_AS_STRING(res); + + src = nxt_unit_sptr_get(&f->value); + p = nxt_cpymem(p, src, f->value_length); + + for (i = 1; i < n; i++) { + p = nxt_cpymem(p, ", ", 2); + + src = nxt_unit_sptr_get(&f[i].value); + p = nxt_cpymem(p, src, f[i].value_length); + } + + return res; +} + + +static int +nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, PyObject *value) +{ + if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { + nxt_unit_req_error(ctx->req, + "Python failed to set the \"%s\" environ value", + PyUnicode_AsUTF8(name)); + + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + +static PyObject * +nxt_py_start_resp(PyObject *self, PyObject *args) +{ + int rc, status; + char *status_str, *space_ptr; + uint32_t status_len; + PyObject *headers, *tuple, *string, *status_bytes; + Py_ssize_t i, n, fields_size, fields_count; + nxt_python_run_ctx_t *ctx; + + ctx = nxt_python_run_ctx; + if (nxt_slow_path(ctx == NULL)) { + return PyErr_Format(PyExc_RuntimeError, + "start_response() is called " + "outside of WSGI request processing"); + } + + n = PyTuple_GET_SIZE(args); + + if (n < 2 || n > 3) { + return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); + } + + string = PyTuple_GET_ITEM(args, 0); + if (!PyBytes_Check(string) && !PyUnicode_Check(string)) { + return PyErr_Format(PyExc_TypeError, + "failed to write first argument (not a string?)"); + } + + headers = PyTuple_GET_ITEM(args, 1); + if (!PyList_Check(headers)) { + return PyErr_Format(PyExc_TypeError, + "the second argument is not a response headers list"); + } + + fields_size = 0; + fields_count = PyList_GET_SIZE(headers); + + for (i = 0; i < fields_count; i++) { + tuple = PyList_GET_ITEM(headers, i); + + if (!PyTuple_Check(tuple)) { + return PyErr_Format(PyExc_TypeError, + "the response headers must be a list of tuples"); + } + + if (PyTuple_GET_SIZE(tuple) != 2) { + return PyErr_Format(PyExc_TypeError, + "each header must be a tuple of two items"); + } + + string = PyTuple_GET_ITEM(tuple, 0); + if (PyBytes_Check(string)) { + fields_size += PyBytes_GET_SIZE(string); + + } else if (PyUnicode_Check(string)) { + fields_size += PyUnicode_GET_SIZE(string); + + } else { + return PyErr_Format(PyExc_TypeError, + "header #%d name is not a string", (int) i); + } + + string = PyTuple_GET_ITEM(tuple, 1); + if (PyBytes_Check(string)) { + fields_size += PyBytes_GET_SIZE(string); + + } else if (PyUnicode_Check(string)) { + fields_size += PyUnicode_GET_SIZE(string); + + } else { + return PyErr_Format(PyExc_TypeError, + "header #%d value is not a string", (int) i); + } + } + + ctx->content_length = -1; + + string = PyTuple_GET_ITEM(args, 0); + rc = nxt_python_str_buf(string, &status_str, &status_len, &status_bytes); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_TypeError, "status is not a string"); + } + + space_ptr = memchr(status_str, ' ', status_len); + if (space_ptr != NULL) { + status_len = space_ptr - status_str; + } + + status = nxt_int_parse((u_char *) status_str, status_len); + if (nxt_slow_path(status < 0)) { + return PyErr_Format(PyExc_TypeError, "failed to parse status code"); + } + + Py_XDECREF(status_bytes); + + /* + * PEP 3333: + * + * ... applications can replace their originally intended output with error + * output, up until the last possible moment. + */ + rc = nxt_unit_response_init(ctx->req, status, fields_count, fields_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + for (i = 0; i < fields_count; i++) { + tuple = PyList_GET_ITEM(headers, i); + + rc = nxt_python_response_add_field(ctx, PyTuple_GET_ITEM(tuple, 0), + PyTuple_GET_ITEM(tuple, 1), i); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to add header #%d", (int) i); + } + } + + /* + * PEP 3333: + * + * However, the start_response callable must not actually transmit the + * response headers. Instead, it must store them for the server or gateway + * to transmit only after the first iteration of the application return + * value that yields a non-empty bytestring, or upon the application's + * first invocation of the write() callable. In other words, response + * headers must not be sent until there is actual body data available, or + * until the application's returned iterable is exhausted. (The only + * possible exception to this rule is if the response headers explicitly + * include a Content-Length of zero.) + */ + if (ctx->content_length == 0) { + rc = nxt_unit_response_send(ctx->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send response headers"); + } + } + + Py_INCREF(nxt_py_write_obj); + return nxt_py_write_obj; +} + + +static int +nxt_python_response_add_field(nxt_python_run_ctx_t *ctx, PyObject *name, + PyObject *value, int i) +{ + int rc; + char *name_str, *value_str; + uint32_t name_length, value_length; + PyObject *name_bytes, *value_bytes; + nxt_off_t content_length; + + name_bytes = NULL; + value_bytes = NULL; + + rc = nxt_python_str_buf(name, &name_str, &name_length, &name_bytes); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + + rc = nxt_python_str_buf(value, &value_str, &value_length, &value_bytes); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + + rc = nxt_unit_response_add_field(ctx->req, name_str, name_length, + value_str, value_length); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + goto fail; + } + + if (ctx->req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { + content_length = nxt_off_t_parse((u_char *) value_str, value_length); + if (nxt_slow_path(content_length < 0)) { + nxt_unit_req_error(ctx->req, "failed to parse Content-Length " + "value %.*s", (int) value_length, value_str); + + } else { + ctx->content_length = content_length; + } + } + +fail: + + Py_XDECREF(name_bytes); + Py_XDECREF(value_bytes); + + return rc; +} + + +static int +nxt_python_str_buf(PyObject *str, char **buf, uint32_t *len, PyObject **bytes) +{ + if (PyBytes_Check(str)) { + *buf = PyBytes_AS_STRING(str); + *len = PyBytes_GET_SIZE(str); + *bytes = NULL; + + } else { + *bytes = PyUnicode_AsLatin1String(str); + if (nxt_slow_path(*bytes == NULL)) { + return NXT_UNIT_ERROR; + } + + *buf = PyBytes_AS_STRING(*bytes); + *len = PyBytes_GET_SIZE(*bytes); + } + + return NXT_UNIT_OK; +} + + +static PyObject * +nxt_py_write(PyObject *self, PyObject *str) +{ + int rc; + + if (nxt_fast_path(!PyBytes_Check(str))) { + return PyErr_Format(PyExc_TypeError, "the argument is not a %s", + NXT_PYTHON_BYTES_TYPE); + } + + rc = nxt_python_write(nxt_python_run_ctx, str); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to write response value"); + } + + Py_RETURN_NONE; +} + + +static void +nxt_py_input_dealloc(nxt_py_input_t *self) +{ + PyObject_Del(self); +} + + +static PyObject * +nxt_py_input_read(nxt_py_input_t *self, PyObject *args) +{ + char *buf; + PyObject *content, *obj; + Py_ssize_t size, n; + nxt_python_run_ctx_t *ctx; + + ctx = nxt_python_run_ctx; + if (nxt_slow_path(ctx == NULL)) { + return PyErr_Format(PyExc_RuntimeError, + "wsgi.input.read() is called " + "outside of WSGI request processing"); + } + + size = ctx->req->content_length; + + n = PyTuple_GET_SIZE(args); + + if (n > 0) { + if (n != 1) { + return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); + } + + obj = PyTuple_GET_ITEM(args, 0); + + size = PyNumber_AsSsize_t(obj, PyExc_OverflowError); + + if (nxt_slow_path(size < 0)) { + if (size == -1 && PyErr_Occurred()) { + return NULL; + } + + if (size != -1) { + return PyErr_Format(PyExc_ValueError, + "the read body size cannot be zero or less"); + } + } + + if (size == -1 || size > (Py_ssize_t) ctx->req->content_length) { + size = ctx->req->content_length; + } + } + + content = PyBytes_FromStringAndSize(NULL, size); + if (nxt_slow_path(content == NULL)) { + return NULL; + } + + buf = PyBytes_AS_STRING(content); + + size = nxt_unit_request_read(ctx->req, buf, size); + + return content; +} + + +static PyObject * +nxt_py_input_readline(nxt_py_input_t *self, PyObject *args) +{ + ssize_t ssize; + PyObject *obj; + Py_ssize_t n; + nxt_python_run_ctx_t *ctx; + + ctx = nxt_python_run_ctx; + if (nxt_slow_path(ctx == NULL)) { + return PyErr_Format(PyExc_RuntimeError, + "wsgi.input.readline() is called " + "outside of WSGI request processing"); + } + + n = PyTuple_GET_SIZE(args); + + if (n > 0) { + if (n != 1) { + return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); + } + + obj = PyTuple_GET_ITEM(args, 0); + + ssize = PyNumber_AsSsize_t(obj, PyExc_OverflowError); + + if (nxt_fast_path(ssize > 0)) { + return nxt_py_input_getline(ctx, ssize); + } + + if (ssize == 0) { + return PyBytes_FromStringAndSize("", 0); + } + + if (ssize != -1) { + return PyErr_Format(PyExc_ValueError, + "the read line size cannot be zero or less"); + } + + if (PyErr_Occurred()) { + return NULL; + } + } + + return nxt_py_input_getline(ctx, SSIZE_MAX); +} + + +static PyObject * +nxt_py_input_getline(nxt_python_run_ctx_t *ctx, size_t size) +{ + void *buf; + ssize_t res; + PyObject *content; + + res = nxt_unit_request_readline_size(ctx->req, size); + if (nxt_slow_path(res < 0)) { + return NULL; + } + + if (res == 0) { + return PyBytes_FromStringAndSize("", 0); + } + + content = PyBytes_FromStringAndSize(NULL, res); + if (nxt_slow_path(content == NULL)) { + return NULL; + } + + buf = PyBytes_AS_STRING(content); + + res = nxt_unit_request_read(ctx->req, buf, res); + + return content; +} + + +static PyObject * +nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args) +{ + PyObject *res; + nxt_python_run_ctx_t *ctx; + + ctx = nxt_python_run_ctx; + if (nxt_slow_path(ctx == NULL)) { + return PyErr_Format(PyExc_RuntimeError, + "wsgi.input.readlines() is called " + "outside of WSGI request processing"); + } + + res = PyList_New(0); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + for ( ;; ) { + PyObject *line = nxt_py_input_getline(ctx, SSIZE_MAX); + if (nxt_slow_path(line == NULL)) { + Py_DECREF(res); + return NULL; + } + + if (PyBytes_GET_SIZE(line) == 0) { + Py_DECREF(line); + return res; + } + + PyList_Append(res, line); + Py_DECREF(line); + } + + return res; +} + + +static PyObject * +nxt_py_input_iter(PyObject *self) +{ + Py_INCREF(self); + return self; +} + + +static PyObject * +nxt_py_input_next(PyObject *self) +{ + PyObject *line; + nxt_python_run_ctx_t *ctx; + + ctx = nxt_python_run_ctx; + if (nxt_slow_path(ctx == NULL)) { + return PyErr_Format(PyExc_RuntimeError, + "wsgi.input.next() is called " + "outside of WSGI request processing"); + } + + line = nxt_py_input_getline(ctx, SSIZE_MAX); + if (nxt_slow_path(line == NULL)) { + return NULL; + } + + if (PyBytes_GET_SIZE(line) == 0) { + Py_DECREF(line); + PyErr_SetNone(PyExc_StopIteration); + return NULL; + } + + return line; +} + + +static int +nxt_python_write(nxt_python_run_ctx_t *ctx, PyObject *bytes) +{ + int rc; + char *str_buf; + uint32_t str_length; + + str_buf = PyBytes_AS_STRING(bytes); + str_length = PyBytes_GET_SIZE(bytes); + + if (nxt_slow_path(str_length == 0)) { + return NXT_UNIT_OK; + } + + /* + * PEP 3333: + * + * If the application supplies a Content-Length header, the server should + * not transmit more bytes to the client than the header allows, and should + * stop iterating over the response when enough data has been sent, or raise + * an error if the application tries to write() past that point. + */ + if (nxt_slow_path(str_length > ctx->content_length - ctx->bytes_sent)) { + nxt_unit_req_error(ctx->req, "content length %"PRIu64" exceeded", + ctx->content_length); + + return NXT_UNIT_ERROR; + } + + rc = nxt_unit_response_write(ctx->req, str_buf, str_length); + if (nxt_fast_path(rc == NXT_UNIT_OK)) { + ctx->bytes_sent += str_length; + } + + return rc; +} |