summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-10-01 23:55:23 +0300
committerMax Romanov <max.romanov@nginx.com>2020-10-01 23:55:23 +0300
commitc4c2f90c5b532c1ec283d211e0fd50e4538c2a51 (patch)
tree2fc2558bc370d80f5d60a0d6c5a0c6791a48e466
parentbbc6d2470afe8bfc8a97427b0b576080466bd31a (diff)
downloadunit-c4c2f90c5b532c1ec283d211e0fd50e4538c2a51.tar.gz
unit-c4c2f90c5b532c1ec283d211e0fd50e4538c2a51.tar.bz2
Python: ASGI server introduced.
This closes #461 issue on GitHub.
Diffstat (limited to '')
-rw-r--r--auto/modules/python5
-rw-r--r--src/python/nxt_python.c28
-rw-r--r--src/python/nxt_python.h29
-rw-r--r--src/python/nxt_python_asgi.c1227
-rw-r--r--src/python/nxt_python_asgi.h60
-rw-r--r--src/python/nxt_python_asgi_http.c591
-rw-r--r--src/python/nxt_python_asgi_lifespan.c505
-rw-r--r--src/python/nxt_python_asgi_str.c141
-rw-r--r--src/python/nxt_python_asgi_str.h69
-rw-r--r--src/python/nxt_python_asgi_websocket.c1084
-rw-r--r--src/python/nxt_python_wsgi.c18
11 files changed, 3728 insertions, 29 deletions
diff --git a/auto/modules/python b/auto/modules/python
index afb1b586..48f6e5ef 100644
--- a/auto/modules/python
+++ b/auto/modules/python
@@ -168,6 +168,11 @@ $echo >> $NXT_MAKEFILE
NXT_PYTHON_MODULE_SRCS=" \
src/python/nxt_python.c \
+ src/python/nxt_python_asgi.c \
+ src/python/nxt_python_asgi_http.c \
+ src/python/nxt_python_asgi_lifespan.c \
+ src/python/nxt_python_asgi_str.c \
+ src/python/nxt_python_asgi_websocket.c \
src/python/nxt_python_wsgi.c \
"
diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c
index 7d4589ed..01534a47 100644
--- a/src/python/nxt_python.c
+++ b/src/python/nxt_python.c
@@ -15,14 +15,6 @@
#include NXT_PYTHON_MOUNTS_H
-#if PY_MAJOR_VERSION == 3
-#define PyString_FromStringAndSize(str, size) \
- PyUnicode_DecodeLatin1((str), (size), "strict")
-
-#else
-#define PyUnicode_InternInPlace PyString_InternInPlace
-#endif
-
static nxt_int_t nxt_python_start(nxt_task_t *task,
nxt_process_data_t *data);
static void nxt_python_atexit(void);
@@ -56,7 +48,7 @@ static char *nxt_py_home;
static nxt_int_t
nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
{
- int rc;
+ int rc, asgi;
char *nxt_py_module;
size_t len;
PyObject *obj, *pypath, *module;
@@ -226,7 +218,15 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
python_init.shm_limit = data->app->shm_limit;
- rc = nxt_python_wsgi_init(task, &python_init);
+ asgi = nxt_python_asgi_check(nxt_py_application);
+
+ if (asgi) {
+ rc = nxt_python_asgi_init(task, &python_init);
+
+ } else {
+ rc = nxt_python_wsgi_init(task, &python_init);
+ }
+
if (nxt_slow_path(rc == NXT_ERROR)) {
goto fail;
}
@@ -236,7 +236,12 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
goto fail;
}
- rc = nxt_python_wsgi_run(unit_ctx);
+ if (asgi) {
+ rc = nxt_python_asgi_run(unit_ctx);
+
+ } else {
+ rc = nxt_python_wsgi_run(unit_ctx);
+ }
nxt_unit_done(unit_ctx);
@@ -300,6 +305,7 @@ static void
nxt_python_atexit(void)
{
nxt_python_wsgi_done();
+ nxt_python_asgi_done();
Py_XDECREF(nxt_py_stderr_flush);
Py_XDECREF(nxt_py_application);
diff --git a/src/python/nxt_python.h b/src/python/nxt_python.h
index 417df7fd..3211026b 100644
--- a/src/python/nxt_python.h
+++ b/src/python/nxt_python.h
@@ -8,9 +8,32 @@
#include <Python.h>
+#include <nxt_main.h>
#include <nxt_unit.h>
+#if PY_MAJOR_VERSION == 3
+#define NXT_PYTHON_BYTES_TYPE "bytestring"
+
+#define PyString_FromStringAndSize(str, size) \
+ PyUnicode_DecodeLatin1((str), (size), "strict")
+#define PyString_AS_STRING PyUnicode_DATA
+
+#else
+#define NXT_PYTHON_BYTES_TYPE "string"
+
+#define PyBytes_FromStringAndSize PyString_FromStringAndSize
+#define PyBytes_Check PyString_Check
+#define PyBytes_GET_SIZE PyString_GET_SIZE
+#define PyBytes_AS_STRING PyString_AS_STRING
+#define PyUnicode_InternInPlace PyString_InternInPlace
+#define PyUnicode_AsUTF8 PyString_AS_STRING
+#endif
+
+#if PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION >= 5
+#define NXT_HAVE_ASGI 1
+#endif
+
extern PyObject *nxt_py_application;
typedef struct {
@@ -18,6 +41,7 @@ typedef struct {
PyObject **object_p;
} nxt_python_string_t;
+
nxt_int_t nxt_python_init_strings(nxt_python_string_t *pstr);
void nxt_python_done_strings(nxt_python_string_t *pstr);
@@ -27,5 +51,10 @@ nxt_int_t nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init);
int nxt_python_wsgi_run(nxt_unit_ctx_t *ctx);
void nxt_python_wsgi_done(void);
+int nxt_python_asgi_check(PyObject *obj);
+nxt_int_t nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init);
+nxt_int_t nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
+void nxt_python_asgi_done(void);
+
#endif /* _NXT_PYTHON_H_INCLUDED_ */
diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c
new file mode 100644
index 00000000..72408ea1
--- /dev/null
+++ b/src/python/nxt_python_asgi.c
@@ -0,0 +1,1227 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <nxt_unit_response.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
+
+static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
+static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
+ uint16_t port);
+static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
+static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
+
+static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
+static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
+static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
+static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
+
+static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
+
+
+PyObject *nxt_py_loop_run_until_complete;
+PyObject *nxt_py_loop_create_future;
+PyObject *nxt_py_loop_create_task;
+
+nxt_queue_t nxt_py_asgi_drain_queue;
+
+static PyObject *nxt_py_loop_call_soon;
+static PyObject *nxt_py_quit_future;
+static PyObject *nxt_py_quit_future_set_result;
+static PyObject *nxt_py_loop_add_reader;
+static PyObject *nxt_py_loop_remove_reader;
+static PyObject *nxt_py_port_read;
+
+static PyMethodDef nxt_py_port_read_method =
+ {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
+
+#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A
+
+
+int
+nxt_python_asgi_check(PyObject *obj)
+{
+ int res;
+ PyObject *call;
+ PyCodeObject *code;
+
+ if (PyFunction_Check(obj)) {
+ code = (PyCodeObject *) PyFunction_GET_CODE(obj);
+
+ return (code->co_flags & CO_COROUTINE) != 0;
+ }
+
+ if (PyMethod_Check(obj)) {
+ obj = PyMethod_GET_FUNCTION(obj);
+
+ code = (PyCodeObject *) PyFunction_GET_CODE(obj);
+
+ return (code->co_flags & CO_COROUTINE) != 0;
+ }
+
+ call = PyObject_GetAttrString(obj, "__call__");
+
+ if (call == NULL) {
+ return 0;
+ }
+
+ if (PyFunction_Check(call)) {
+ code = (PyCodeObject *) PyFunction_GET_CODE(call);
+
+ res = (code->co_flags & CO_COROUTINE) != 0;
+
+ } else {
+ if (PyMethod_Check(call)) {
+ obj = PyMethod_GET_FUNCTION(call);
+
+ code = (PyCodeObject *) PyFunction_GET_CODE(obj);
+
+ res = (code->co_flags & CO_COROUTINE) != 0;
+
+ } else {
+ res = 0;
+ }
+ }
+
+ Py_DECREF(call);
+
+ return res;
+}
+
+
+nxt_int_t
+nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
+{
+ PyObject *asyncio, *loop, *get_event_loop;
+ nxt_int_t rc;
+
+ nxt_debug(task, "asgi_init");
+
+ if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) {
+ nxt_alert(task, "Python failed to init string objects");
+ return NXT_ERROR;
+ }
+
+ asyncio = PyImport_ImportModule("asyncio");
+ if (nxt_slow_path(asyncio == NULL)) {
+ nxt_alert(task, "Python failed to import module 'asyncio'");
+ nxt_python_print_exception();
+ return NXT_ERROR;
+ }
+
+ loop = NULL;
+ get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
+ "get_event_loop");
+ if (nxt_slow_path(get_event_loop == NULL)) {
+ nxt_alert(task,
+ "Python failed to get 'get_event_loop' from module 'asyncio'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) {
+ nxt_alert(task, "'asyncio.get_event_loop' is not a callable object");
+ goto fail;
+ }
+
+ loop = PyObject_CallObject(get_event_loop, NULL);
+ if (nxt_slow_path(loop == NULL)) {
+ nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'");
+ goto fail;
+ }
+
+ nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task");
+ if (nxt_slow_path(nxt_py_loop_create_task == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.create_task'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) {
+ nxt_alert(task, "'loop.create_task' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader");
+ if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.add_reader'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) {
+ nxt_alert(task, "'loop.add_reader' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader");
+ if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.remove_reader'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) {
+ nxt_alert(task, "'loop.remove_reader' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon");
+ if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.call_soon'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) {
+ nxt_alert(task, "'loop.call_soon' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop,
+ "run_until_complete");
+ if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.run_until_complete'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) {
+ nxt_alert(task, "'loop.run_until_complete' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future");
+ if (nxt_slow_path(nxt_py_loop_create_future == NULL)) {
+ nxt_alert(task, "Python failed to get 'loop.create_future'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) {
+ nxt_alert(task, "'loop.create_future' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(nxt_py_quit_future == NULL)) {
+ nxt_alert(task, "Python failed to create Future ");
+ nxt_python_print_exception();
+ goto fail;
+ }
+
+ nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future,
+ "set_result");
+ if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) {
+ nxt_alert(task, "Python failed to get 'future.set_result'");
+ goto fail;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) {
+ nxt_alert(task, "'future.set_result' is not a callable object");
+ goto fail;
+ }
+
+ nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
+ if (nxt_slow_path(nxt_py_port_read == NULL)) {
+ nxt_alert(task, "Python failed to initialize the 'port_read' function");
+ goto fail;
+ }
+
+ nxt_queue_init(&nxt_py_asgi_drain_queue);
+
+ if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) {
+ goto fail;
+ }
+
+ if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) {
+ goto fail;
+ }
+
+ rc = nxt_py_asgi_lifespan_startup(task);
+ if (nxt_slow_path(rc == NXT_ERROR)) {
+ goto fail;
+ }
+
+ init->callbacks.request_handler = nxt_py_asgi_request_handler;
+ init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
+ init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
+ init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
+ init->callbacks.quit = nxt_py_asgi_quit;
+ init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
+ init->callbacks.add_port = nxt_py_asgi_add_port;
+ init->callbacks.remove_port = nxt_py_asgi_remove_port;
+
+ Py_DECREF(loop);
+ Py_DECREF(asyncio);
+
+ return NXT_OK;
+
+fail:
+
+ Py_XDECREF(loop);
+ Py_DECREF(asyncio);
+
+ return NXT_ERROR;
+}
+
+
+nxt_int_t
+nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
+{
+ PyObject *res;
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
+ nxt_py_quit_future, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
+ nxt_python_print_exception();
+
+ return NXT_ERROR;
+ }
+
+ Py_DECREF(res);
+
+ nxt_py_asgi_lifespan_shutdown();
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
+{
+ PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
+
+ if (req->request->websocket_handshake) {
+ asgi = nxt_py_asgi_websocket_create(req);
+
+ } else {
+ asgi = nxt_py_asgi_http_create(req);
+ }
+
+ if (nxt_slow_path(asgi == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create asgi object");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return;
+ }
+
+ receive = PyObject_GetAttrString(asgi, "receive");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to get 'receive' method");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_asgi;
+ }
+
+ send = PyObject_GetAttrString(asgi, "send");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to get 'send' method");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_receive;
+ }
+
+ done = PyObject_GetAttrString(asgi, "_done");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to get '_done' method");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_send;
+ }
+
+ scope = nxt_py_asgi_create_http_scope(req);
+ if (nxt_slow_path(scope == NULL)) {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_done;
+ }
+
+ req->data = asgi;
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_application,
+ scope, receive, send, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(req, "Python failed to call the application");
+ nxt_python_print_exception();
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_scope;
+ }
+
+ if (nxt_slow_path(!PyCoro_CheckExact(res))) {
+ nxt_unit_req_error(req, "Application result type is not a coroutine");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ Py_DECREF(res);
+
+ goto release_scope;
+ }
+
+ task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL);
+ if (nxt_slow_path(task == NULL)) {
+ nxt_unit_req_error(req, "Python failed to call the create_task");
+ nxt_python_print_exception();
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ Py_DECREF(res);
+
+ goto release_scope;
+ }
+
+ Py_DECREF(res);
+
+ res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(req,
+ "Python failed to call 'task.add_done_callback'");
+ nxt_python_print_exception();
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_task;
+ }
+
+ Py_DECREF(res);
+release_task:
+ Py_DECREF(task);
+release_scope:
+ Py_DECREF(scope);
+release_done:
+ Py_DECREF(done);
+release_send:
+ Py_DECREF(send);
+release_receive:
+ Py_DECREF(receive);
+release_asgi:
+ Py_DECREF(asgi);
+}
+
+
+static PyObject *
+nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
+{
+ char *p, *target, *query;
+ uint32_t target_length, i;
+ PyObject *scope, *v, *type, *scheme;
+ PyObject *headers, *header;
+ nxt_unit_field_t *f;
+ nxt_unit_request_t *r;
+
+ static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol");
+
+#define SET_ITEM(dict, key, value) \
+ if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
+ == -1)) \
+ { \
+ nxt_unit_req_alert(req, "Python failed to set '" \
+ #dict "." #key "' item"); \
+ goto fail; \
+ }
+
+ v = NULL;
+ headers = NULL;
+
+ r = req->request;
+
+ if (r->websocket_handshake) {
+ type = nxt_py_websocket_str;
+ scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str;
+
+ } else {
+ type = nxt_py_http_str;
+ scheme = r->tls ? nxt_py_https_str : nxt_py_http_str;
+ }
+
+ scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str);
+ if (nxt_slow_path(scope == NULL)) {
+ return NULL;
+ }
+
+ p = nxt_unit_sptr_get(&r->version);
+ SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str
+ : nxt_py_1_0_str)
+ SET_ITEM(scope, scheme, scheme)
+
+ v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method),
+ r->method_length);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'method' string");
+ goto fail;
+ }
+
+ SET_ITEM(scope, method, v)
+ Py_DECREF(v);
+
+ v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length,
+ "replace");
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'path' string");
+ goto fail;
+ }
+
+ SET_ITEM(scope, path, v)
+ Py_DECREF(v);
+
+ target = nxt_unit_sptr_get(&r->target);
+ query = nxt_unit_sptr_get(&r->query);
+
+ if (r->query.offset != 0) {
+ target_length = query - target - 1;
+
+ } else {
+ target_length = r->target_length;
+ }
+
+ v = PyBytes_FromStringAndSize(target, target_length);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'raw_path' string");
+ goto fail;
+ }
+
+ SET_ITEM(scope, raw_path, v)
+ Py_DECREF(v);
+
+ v = PyBytes_FromStringAndSize(query, r->query_length);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'query' string");
+ goto fail;
+ }
+
+ SET_ITEM(scope, query_string, v)
+ Py_DECREF(v);
+
+ v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'client' pair");
+ goto fail;
+ }
+
+ SET_ITEM(scope, client, v)
+ Py_DECREF(v);
+
+ v = nxt_py_asgi_create_address(&r->local, r->local_length, 80);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'server' pair");
+ goto fail;
+ }
+
+ SET_ITEM(scope, server, v)
+ Py_DECREF(v);
+
+ v = NULL;
+
+ headers = PyTuple_New(r->fields_count);
+ if (nxt_slow_path(headers == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'headers' object");
+ goto fail;
+ }
+
+ for (i = 0; i < r->fields_count; i++) {
+ f = r->fields + i;
+
+ header = nxt_py_asgi_create_header(f);
+ if (nxt_slow_path(header == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'header' pair");
+ goto fail;
+ }
+
+ PyTuple_SET_ITEM(headers, i, header);
+
+ if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL
+ && f->name_length == ws_protocol.length
+ && f->value_length > 0
+ && r->websocket_handshake)
+ {
+ v = nxt_py_asgi_create_subprotocols(f);
+ if (nxt_slow_path(v == NULL)) {
+ nxt_unit_req_alert(req, "Failed to create subprotocols");
+ goto fail;
+ }
+
+ SET_ITEM(scope, subprotocols, v);
+ Py_DECREF(v);
+ }
+ }
+
+ SET_ITEM(scope, headers, headers)
+ Py_DECREF(headers);
+
+ return scope;
+
+fail:
+
+ Py_XDECREF(v);
+ Py_XDECREF(headers);
+ Py_DECREF(scope);
+
+ return NULL;
+
+#undef SET_ITEM
+}
+
+
+static PyObject *
+nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port)
+{
+ char *p, *s;
+ PyObject *pair, *v;
+
+ pair = PyTuple_New(2);
+ if (nxt_slow_path(pair == NULL)) {
+ return NULL;
+ }
+
+ p = nxt_unit_sptr_get(sptr);
+ s = memchr(p, ':', len);
+
+ v = PyString_FromStringAndSize(p, s == NULL ? len : s - p);
+ if (nxt_slow_path(v == NULL)) {
+ Py_DECREF(pair);
+
+ return NULL;
+ }
+
+ PyTuple_SET_ITEM(pair, 0, v);
+
+ if (s != NULL) {
+ p += len;
+ v = PyLong_FromString(s + 1, &p, 10);
+
+ } else {
+ v = PyLong_FromLong(port);
+ }
+
+ if (nxt_slow_path(v == NULL)) {
+ Py_DECREF(pair);
+
+ return NULL;
+ }
+
+ PyTuple_SET_ITEM(pair, 1, v);
+
+ return pair;
+}
+
+
+static PyObject *
+nxt_py_asgi_create_header(nxt_unit_field_t *f)
+{
+ char c, *name;
+ uint8_t pos;
+ PyObject *header, *v;
+
+ header = PyTuple_New(2);
+ if (nxt_slow_path(header == NULL)) {
+ return NULL;
+ }
+
+ name = nxt_unit_sptr_get(&f->name);
+
+ for (pos = 0; pos < f->name_length; pos++) {
+ c = name[pos];
+ if (c >= 'A' && c <= 'Z') {
+ name[pos] = (c | 0x20);
+ }
+ }
+
+ v = PyBytes_FromStringAndSize(name, f->name_length);
+ if (nxt_slow_path(v == NULL)) {
+ Py_DECREF(header);
+
+ return NULL;
+ }
+
+ PyTuple_SET_ITEM(header, 0, v);
+
+ v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value),
+ f->value_length);
+ if (nxt_slow_path(v == NULL)) {
+ Py_DECREF(header);
+
+ return NULL;
+ }
+
+ PyTuple_SET_ITEM(header, 1, v);
+
+ return header;
+}
+
+
+static PyObject *
+nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f)
+{
+ char *v;
+ uint32_t i, n, start;
+ PyObject *res, *proto;
+
+ v = nxt_unit_sptr_get(&f->value);
+ n = 1;
+
+ for (i = 0; i < f->value_length; i++) {
+ if (v[i] == ',') {
+ n++;
+ }
+ }
+
+ res = PyTuple_New(n);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ n = 0;
+ start = 0;
+
+ for (i = 0; i < f->value_length; ) {
+ if (v[i] != ',') {
+ i++;
+
+ continue;
+ }
+
+ if (i - start > 0) {
+ proto = PyString_FromStringAndSize(v + start, i - start);
+ if (nxt_slow_path(proto == NULL)) {
+ goto fail;
+ }
+
+ PyTuple_SET_ITEM(res, n, proto);
+
+ n++;
+ }
+
+ do {
+ i++;
+ } while (i < f->value_length && v[i] == ' ');
+
+ start = i;
+ }
+
+ if (i - start > 0) {
+ proto = PyString_FromStringAndSize(v + start, i - start);
+ if (nxt_slow_path(proto == NULL)) {
+ goto fail;
+ }
+
+ PyTuple_SET_ITEM(res, n, proto);
+ }
+
+ return res;
+
+fail:
+
+ Py_DECREF(res);
+
+ return NULL;
+}
+
+
+static int
+nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int nb;
+ PyObject *res;
+
+ if (port->in_fd == -1) {
+ return NXT_UNIT_OK;
+ }
+
+ nb = 1;
+
+ if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) {
+ nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
+ port->in_fd, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader,
+ PyLong_FromLong(port->in_fd),
+ nxt_py_port_read,
+ PyLong_FromVoidPtr(ctx),
+ PyLong_FromVoidPtr(port), NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to add_reader");
+
+ return NXT_UNIT_ERROR;
+ }
+
+ Py_DECREF(res);
+
+ return NXT_UNIT_OK;
+}
+
+
+static void
+nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
+{
+ PyObject *res;
+
+ nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
+
+ if (port->in_fd == -1) {
+ return;
+ }
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader,
+ PyLong_FromLong(port->in_fd), NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to remove_reader");
+ }
+
+ Py_DECREF(res);
+}
+
+
+static void
+nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
+{
+ PyObject *res;
+
+ nxt_unit_debug(ctx, "asgi_quit %p", ctx);
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result,
+ PyLong_FromLong(0), NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to set_result");
+ }
+
+ Py_DECREF(res);
+}
+
+
+static void
+nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_queue_link_t *lnk;
+
+ while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) {
+ lnk = nxt_queue_first(&nxt_py_asgi_drain_queue);
+
+ rc = nxt_py_asgi_http_drain(lnk);
+ if (rc == NXT_UNIT_AGAIN) {
+ break;
+ }
+
+ nxt_queue_remove(lnk);
+ }
+}
+
+
+static PyObject *
+nxt_py_asgi_port_read(PyObject *self, PyObject *args)
+{
+ int rc;
+ PyObject *arg;
+ Py_ssize_t n;
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_port_t *port;
+
+ n = PyTuple_GET_SIZE(args);
+
+ if (n != 2) {
+ nxt_unit_alert(NULL,
+ "nxt_py_asgi_port_read: invalid number of arguments %d",
+ (int) n);
+
+ return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
+ }
+
+ arg = PyTuple_GET_ITEM(args, 0);
+ if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
+ return PyErr_Format(PyExc_TypeError,
+ "the first argument is not a long");
+ }
+
+ ctx = PyLong_AsVoidPtr(arg);
+
+ arg = PyTuple_GET_ITEM(args, 1);
+ if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
+ return PyErr_Format(PyExc_TypeError,
+ "the second argument is not a long");
+ }
+
+ port = PyLong_AsVoidPtr(arg);
+
+ nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
+
+ rc = nxt_unit_process_port_msg(ctx, port);
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "error processing port message");
+ }
+
+ Py_RETURN_NONE;
+}
+
+
+PyObject *
+nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb,
+ void *data)
+{
+ int i;
+ PyObject *iter, *header, *h_iter, *name, *val, *res;
+
+ iter = PyObject_GetIter(headers);
+ if (nxt_slow_path(iter == NULL)) {
+ return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable");
+ }
+
+ for (i = 0; /* void */; i++) {
+ header = PyIter_Next(iter);
+ if (header == NULL) {
+ break;
+ }
+
+ h_iter = PyObject_GetIter(header);
+ if (nxt_slow_path(h_iter == NULL)) {
+ Py_DECREF(header);
+ Py_DECREF(iter);
+
+ return PyErr_Format(PyExc_TypeError,
+ "'headers' item #%d is not an iterable", i);
+ }
+
+ name = PyIter_Next(h_iter);
+ if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) {
+ Py_XDECREF(name);
+ Py_DECREF(h_iter);
+ Py_DECREF(header);
+ Py_DECREF(iter);
+
+ return PyErr_Format(PyExc_TypeError,
+ "'headers' item #%d 'name' is not a byte string", i);
+ }
+
+ val = PyIter_Next(h_iter);
+ if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) {
+ Py_XDECREF(val);
+ Py_DECREF(h_iter);
+ Py_DECREF(header);
+ Py_DECREF(iter);
+
+ return PyErr_Format(PyExc_TypeError,
+ "'headers' item #%d 'value' is not a byte string", i);
+ }
+
+ res = cb(data, i, name, val);
+
+ Py_DECREF(name);
+ Py_DECREF(val);
+ Py_DECREF(h_iter);
+ Py_DECREF(header);
+
+ if (nxt_slow_path(res == NULL)) {
+ Py_DECREF(iter);
+
+ return NULL;
+ }
+
+ Py_DECREF(res);
+ }
+
+ Py_DECREF(iter);
+
+ Py_RETURN_NONE;
+}
+
+
+PyObject *
+nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val)
+{
+ nxt_py_asgi_calc_size_ctx_t *ctx;
+
+ ctx = data;
+
+ ctx->fields_count++;
+ ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val);
+
+ Py_RETURN_NONE;
+}
+
+
+PyObject *
+nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
+{
+ int rc;
+ char *name_str, *val_str;
+ uint32_t name_len, val_len;
+ nxt_off_t content_length;
+ nxt_unit_request_info_t *req;
+ nxt_py_asgi_add_field_ctx_t *ctx;
+
+ name_str = PyBytes_AS_STRING(name);
+ name_len = PyBytes_GET_SIZE(name);
+
+ val_str = PyBytes_AS_STRING(val);
+ val_len = PyBytes_GET_SIZE(val);
+
+ ctx = data;
+ req = ctx->req;
+
+ rc = nxt_unit_response_add_field(req, name_str, name_len,
+ val_str, val_len);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to add header #%d", i);
+ }
+
+ if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) {
+ content_length = nxt_off_t_parse((u_char *) val_str, val_len);
+ if (nxt_slow_path(content_length < 0)) {
+ nxt_unit_req_error(req, "failed to parse Content-Length "
+ "value %.*s", (int) val_len, val_str);
+
+ return PyErr_Format(PyExc_ValueError,
+ "Failed to parse Content-Length: '%.*s'",
+ (int) val_len, val_str);
+ }
+
+ ctx->content_length = content_length;
+ }
+
+ Py_RETURN_NONE;
+}
+
+
+PyObject *
+nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
+ PyObject *result)
+{
+ PyObject *set_result, *res;
+
+ if (nxt_slow_path(result == NULL)) {
+ Py_DECREF(future);
+
+ return NULL;
+ }
+
+ set_result = PyObject_GetAttrString(future, "set_result");
+ if (nxt_slow_path(set_result == NULL)) {
+ nxt_unit_req_alert(req, "failed to get 'set_result' for future");
+
+ Py_CLEAR(future);
+
+ goto cleanup;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
+ nxt_unit_req_alert(req, "'future.set_result' is not a callable");
+
+ Py_CLEAR(future);
+
+ goto cleanup;
+ }
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result,
+ result, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
+ nxt_python_print_exception();
+
+ Py_CLEAR(future);
+ }
+
+ Py_XDECREF(res);
+
+cleanup:
+
+ Py_DECREF(set_result);
+ Py_DECREF(result);
+
+ return future;
+}
+
+
+PyObject *
+nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type)
+{
+ PyObject *msg;
+
+ msg = PyDict_New();
+ if (nxt_slow_path(msg == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create message dict");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create message dict");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) {
+ nxt_unit_req_alert(req, "Python failed to set 'msg.type' item");
+
+ Py_DECREF(msg);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'msg.type' item");
+ }
+
+ return msg;
+}
+
+
+PyObject *
+nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
+ PyObject *spec_version)
+{
+ PyObject *scope, *asgi;
+
+ scope = PyDict_New();
+ if (nxt_slow_path(scope == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'scope' dict");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create 'scope' dict");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) {
+ nxt_unit_req_alert(req, "Python failed to set 'scope.type' item");
+
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'scope.type' item");
+ }
+
+ asgi = PyDict_New();
+ if (nxt_slow_path(asgi == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create 'asgi' dict");
+ nxt_python_print_exception();
+
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create 'asgi' dict");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) {
+ nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item");
+
+ Py_DECREF(asgi);
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'scope.asgi' item");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str,
+ nxt_py_3_0_str) == -1))
+ {
+ nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item");
+
+ Py_DECREF(asgi);
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'asgi.version' item");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str,
+ spec_version) == -1))
+ {
+ nxt_unit_req_alert(req,
+ "Python failed to set 'asgi.spec_version' item");
+
+ Py_DECREF(asgi);
+ Py_DECREF(scope);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to set 'asgi.spec_version' item");
+ }
+
+ Py_DECREF(asgi);
+
+ return scope;
+}
+
+
+void
+nxt_py_asgi_dealloc(PyObject *self)
+{
+ PyObject_Del(self);
+}
+
+
+PyObject *
+nxt_py_asgi_await(PyObject *self)
+{
+ Py_INCREF(self);
+ return self;
+}
+
+
+PyObject *
+nxt_py_asgi_iter(PyObject *self)
+{
+ Py_INCREF(self);
+ return self;
+}
+
+
+PyObject *
+nxt_py_asgi_next(PyObject *self)
+{
+ return NULL;
+}
+
+
+void
+nxt_python_asgi_done(void)
+{
+ nxt_py_asgi_str_done();
+
+ Py_XDECREF(nxt_py_quit_future);
+ Py_XDECREF(nxt_py_quit_future_set_result);
+ Py_XDECREF(nxt_py_loop_run_until_complete);
+ Py_XDECREF(nxt_py_loop_create_future);
+ Py_XDECREF(nxt_py_loop_create_task);
+ Py_XDECREF(nxt_py_loop_call_soon);
+ Py_XDECREF(nxt_py_loop_add_reader);
+ Py_XDECREF(nxt_py_loop_remove_reader);
+ Py_XDECREF(nxt_py_port_read);
+}
+
+#else /* !(NXT_HAVE_ASGI) */
+
+
+int
+nxt_python_asgi_check(PyObject *obj)
+{
+ return 0;
+}
+
+
+nxt_int_t
+nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
+{
+ nxt_alert(task, "ASGI not implemented");
+ return NXT_ERROR;
+}
+
+
+nxt_int_t
+nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_alert(ctx, "ASGI not implemented");
+ return NXT_ERROR;
+}
+
+
+void
+nxt_python_asgi_done(void)
+{
+}
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/python/nxt_python_asgi.h b/src/python/nxt_python_asgi.h
new file mode 100644
index 00000000..24337c37
--- /dev/null
+++ b/src/python/nxt_python_asgi.h
@@ -0,0 +1,60 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PYTHON_ASGI_H_INCLUDED_
+#define _NXT_PYTHON_ASGI_H_INCLUDED_
+
+
+typedef PyObject * (*nxt_py_asgi_enum_header_cb)(void *ctx, int i,
+ PyObject *name, PyObject *val);
+
+typedef struct {
+ uint32_t fields_count;
+ uint32_t fields_size;
+} nxt_py_asgi_calc_size_ctx_t;
+
+typedef struct {
+ nxt_unit_request_info_t *req;
+ uint64_t content_length;
+} nxt_py_asgi_add_field_ctx_t;
+
+PyObject *nxt_py_asgi_enum_headers(PyObject *headers,
+ nxt_py_asgi_enum_header_cb cb, void *data);
+
+PyObject *nxt_py_asgi_calc_size(void *data, int i, PyObject *n, PyObject *v);
+PyObject *nxt_py_asgi_add_field(void *data, int i, PyObject *n, PyObject *v);
+
+PyObject *nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
+ PyObject *future, PyObject *result);
+PyObject *nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type);
+PyObject *nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
+ PyObject *spec_version);
+
+void nxt_py_asgi_dealloc(PyObject *self);
+PyObject *nxt_py_asgi_await(PyObject *self);
+PyObject *nxt_py_asgi_iter(PyObject *self);
+PyObject *nxt_py_asgi_next(PyObject *self);
+
+nxt_int_t nxt_py_asgi_http_init(nxt_task_t *task);
+PyObject *nxt_py_asgi_http_create(nxt_unit_request_info_t *req);
+void nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req);
+int nxt_py_asgi_http_drain(nxt_queue_link_t *lnk);
+
+nxt_int_t nxt_py_asgi_websocket_init(nxt_task_t *task);
+PyObject *nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req);
+void nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *ws);
+void nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req);
+
+nxt_int_t nxt_py_asgi_lifespan_startup(nxt_task_t *task);
+nxt_int_t nxt_py_asgi_lifespan_shutdown(void);
+
+extern PyObject *nxt_py_loop_run_until_complete;
+extern PyObject *nxt_py_loop_create_future;
+extern PyObject *nxt_py_loop_create_task;
+
+extern nxt_queue_t nxt_py_asgi_drain_queue;
+
+
+#endif /* _NXT_PYTHON_ASGI_H_INCLUDED_ */
diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c
new file mode 100644
index 00000000..b07d61d6
--- /dev/null
+++ b/src/python/nxt_python_asgi_http.c
@@ -0,0 +1,591 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+typedef struct {
+ PyObject_HEAD
+ nxt_unit_request_info_t *req;
+ nxt_queue_link_t link;
+ PyObject *receive_future;
+ PyObject *send_future;
+ uint64_t content_length;
+ uint64_t bytes_sent;
+ int complete;
+ PyObject *send_body;
+ Py_ssize_t send_body_off;
+} nxt_py_asgi_http_t;
+
+
+static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
+static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
+static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
+static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
+
+
+static PyMethodDef nxt_py_asgi_http_methods[] = {
+ { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 },
+ { "send", nxt_py_asgi_http_send, METH_O, 0 },
+ { "_done", nxt_py_asgi_http_done, METH_O, 0 },
+ { NULL, NULL, 0, 0 }
+};
+
+static PyAsyncMethods nxt_py_asgi_async_methods = {
+ .am_await = nxt_py_asgi_await,
+};
+
+static PyTypeObject nxt_py_asgi_http_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+
+ .tp_name = "unit._asgi_http",
+ .tp_basicsize = sizeof(nxt_py_asgi_http_t),
+ .tp_dealloc = nxt_py_asgi_dealloc,
+ .tp_as_async = &nxt_py_asgi_async_methods,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit ASGI HTTP request object",
+ .tp_iter = nxt_py_asgi_iter,
+ .tp_iternext = nxt_py_asgi_next,
+ .tp_methods = nxt_py_asgi_http_methods,
+};
+
+static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
+
+
+nxt_int_t
+nxt_py_asgi_http_init(nxt_task_t *task)
+{
+ if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
+ nxt_alert(task, "Python failed to initialize the 'http' type object");
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+}
+
+
+PyObject *
+nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
+{
+ nxt_py_asgi_http_t *http;
+
+ http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
+
+ if (nxt_fast_path(http != NULL)) {
+ http->req = req;
+ http->receive_future = NULL;
+ http->send_future = NULL;
+ http->content_length = -1;
+ http->bytes_sent = 0;
+ http->complete = 0;
+ http->send_body = NULL;
+ http->send_body_off = 0;
+ }
+
+ return (PyObject *) http;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
+{
+ PyObject *msg, *future;
+ nxt_py_asgi_http_t *http;
+ nxt_unit_request_info_t *req;
+
+ http = (nxt_py_asgi_http_t *) self;
+ req = http->req;
+
+ nxt_unit_req_debug(req, "asgi_http_receive");
+
+ msg = nxt_py_asgi_http_read_msg(http);
+ if (nxt_slow_path(msg == NULL)) {
+ return NULL;
+ }
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ Py_DECREF(msg);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ if (msg != Py_None) {
+ return nxt_py_asgi_set_result_soon(req, future, msg);
+ }
+
+ http->receive_future = future;
+ Py_INCREF(http->receive_future);
+
+ Py_DECREF(msg);
+
+ return future;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
+{
+ char *body_buf;
+ ssize_t read_res;
+ PyObject *msg, *body;
+ Py_ssize_t size;
+ nxt_unit_request_info_t *req;
+
+ req = http->req;
+
+ size = req->content_length;
+
+ if (size > nxt_py_asgi_http_body_buf_size) {
+ size = nxt_py_asgi_http_body_buf_size;
+ }
+
+ if (size > 0) {
+ body = PyBytes_FromStringAndSize(NULL, size);
+ if (nxt_slow_path(body == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create body byte string");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Bytes object");
+ }
+
+ body_buf = PyBytes_AS_STRING(body);
+
+ read_res = nxt_unit_request_read(req, body_buf, size);
+
+ } else {
+ body = NULL;
+ read_res = 0;
+ }
+
+ if (read_res > 0 || read_res == size) {
+ msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
+ if (nxt_slow_path(msg == NULL)) {
+ Py_XDECREF(body);
+
+ return NULL;
+ }
+
+#define SET_ITEM(dict, key, value) \
+ if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
+ == -1)) \
+ { \
+ nxt_unit_req_alert(req, \
+ "Python failed to set '" #dict "." #key "' item"); \
+ PyErr_SetString(PyExc_RuntimeError, \
+ "Python failed to set '" #dict "." #key "' item"); \
+ goto fail; \
+ }
+
+ if (body != NULL) {
+ SET_ITEM(msg, body, body)
+ }
+
+ if (req->content_length > 0) {
+ SET_ITEM(msg, more_body, Py_True)
+ }
+
+#undef SET_ITEM
+
+ Py_XDECREF(body);
+
+ return msg;
+ }
+
+ Py_XDECREF(body);
+
+ Py_RETURN_NONE;
+
+fail:
+
+ Py_DECREF(msg);
+ Py_XDECREF(body);
+
+ return NULL;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
+{
+ PyObject *type;
+ const char *type_str;
+ Py_ssize_t type_len;
+ nxt_py_asgi_http_t *http;
+
+ static const nxt_str_t response_start = nxt_string("http.response.start");
+ static const nxt_str_t response_body = nxt_string("http.response.body");
+
+ http = (nxt_py_asgi_http_t *) self;
+
+ type = PyDict_GetItem(dict, nxt_py_type_str);
+ if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
+ nxt_unit_req_error(http->req, "asgi_http_send: "
+ "'type' is not a unicode string");
+ return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
+ }
+
+ type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
+
+ nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
+ (int) type_len, type_str);
+
+ if (type_len == (Py_ssize_t) response_start.length
+ && memcmp(type_str, response_start.start, type_len) == 0)
+ {
+ return nxt_py_asgi_http_response_start(http, dict);
+ }
+
+ if (type_len == (Py_ssize_t) response_body.length
+ && memcmp(type_str, response_body.start, type_len) == 0)
+ {
+ return nxt_py_asgi_http_response_body(http, dict);
+ }
+
+ nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'",
+ (int) type_len, type_str);
+
+ return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
+}
+
+
+static PyObject *
+nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
+{
+ int rc;
+ PyObject *status, *headers, *res;
+ nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
+ nxt_py_asgi_add_field_ctx_t add_field_ctx;
+
+ status = PyDict_GetItem(dict, nxt_py_status_str);
+ if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
+ nxt_unit_req_error(http->req, "asgi_http_response_start: "
+ "'status' is not an integer");
+ return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
+ }
+
+ calc_size_ctx.fields_size = 0;
+ calc_size_ctx.fields_count = 0;
+
+ headers = PyDict_GetItem(dict, nxt_py_headers_str);
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
+ &calc_size_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ Py_DECREF(res);
+ }
+
+ rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
+ calc_size_ctx.fields_count,
+ calc_size_ctx.fields_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to allocate response object");
+ }
+
+ add_field_ctx.req = http->req;
+ add_field_ctx.content_length = -1;
+
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
+ &add_field_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ Py_DECREF(res);
+ }
+
+ http->content_length = add_field_ctx.content_length;
+
+ Py_INCREF(http);
+ return (PyObject *) http;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
+{
+ int rc;
+ char *body_str;
+ ssize_t sent;
+ PyObject *body, *more_body, *future;
+ Py_ssize_t body_len, body_off;
+
+ body = PyDict_GetItem(dict, nxt_py_body_str);
+ if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
+ return PyErr_Format(PyExc_TypeError, "'body' is not a byte string");
+ }
+
+ more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
+ if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
+ return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
+ }
+
+ if (nxt_slow_path(http->complete)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "Unexpected ASGI message 'http.response.body' "
+ "sent, after response already completed");
+ }
+
+ if (nxt_slow_path(http->send_future != NULL)) {
+ return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
+ }
+
+ if (body != NULL) {
+ body_str = PyBytes_AS_STRING(body);
+ body_len = PyBytes_GET_SIZE(body);
+
+ nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
+ (int) body_len, (more_body == Py_True) );
+
+ if (nxt_slow_path(http->bytes_sent + body_len
+ > http->content_length))
+ {
+ return PyErr_Format(PyExc_RuntimeError,
+ "Response content longer than Content-Length");
+ }
+
+ body_off = 0;
+
+ while (body_len > 0) {
+ sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
+ if (nxt_slow_path(sent < 0)) {
+ return PyErr_Format(PyExc_RuntimeError, "failed to send body");
+ }
+
+ if (nxt_slow_path(sent == 0)) {
+ nxt_unit_req_debug(http->req, "asgi_http_response_body: "
+ "out of shared memory, %d",
+ (int) body_len);
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_req_alert(http->req,
+ "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ http->send_body = body;
+ Py_INCREF(http->send_body);
+ http->send_body_off = body_off;
+
+ nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link);
+
+ http->send_future = future;
+ Py_INCREF(http->send_future);
+
+ return future;
+ }
+
+ body_str += sent;
+ body_len -= sent;
+ body_off += sent;
+ http->bytes_sent += sent;
+ }
+
+ } else {
+ nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
+ (more_body == Py_True) );
+
+ if (!nxt_unit_response_is_sent(http->req)) {
+ rc = nxt_unit_response_send(http->req);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to send response");
+ }
+ }
+ }
+
+ if (more_body == NULL || more_body == Py_False) {
+ http->complete = 1;
+ }
+
+ Py_INCREF(http);
+ return (PyObject *) http;
+}
+
+
+void
+nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
+{
+ PyObject *msg, *future, *res;
+ nxt_py_asgi_http_t *http;
+
+ http = req->data;
+
+ nxt_unit_req_debug(req, "asgi_http_data_handler");
+
+ if (http->receive_future == NULL) {
+ return;
+ }
+
+ msg = nxt_py_asgi_http_read_msg(http);
+ if (nxt_slow_path(msg == NULL)) {
+ return;
+ }
+
+ if (msg == Py_None) {
+ Py_DECREF(msg);
+ return;
+ }
+
+ future = http->receive_future;
+ http->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(msg);
+}
+
+
+int
+nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
+{
+ char *body_str;
+ ssize_t sent;
+ PyObject *future, *exc, *res;
+ Py_ssize_t body_len;
+ nxt_py_asgi_http_t *http;
+
+ http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
+
+ body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
+ body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
+
+ nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
+
+ while (body_len > 0) {
+ sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
+ if (nxt_slow_path(sent < 0)) {
+ goto fail;
+ }
+
+ if (nxt_slow_path(sent == 0)) {
+ return NXT_UNIT_AGAIN;
+ }
+
+ body_str += sent;
+ body_len -= sent;
+
+ http->send_body_off += sent;
+ http->bytes_sent += sent;
+ }
+
+ Py_CLEAR(http->send_body);
+
+ future = http->send_future;
+ http->send_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(http->req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ return NXT_UNIT_OK;
+
+fail:
+
+ exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
+ nxt_py_failed_to_send_body_str,
+ NULL);
+ if (nxt_slow_path(exc == NULL)) {
+ nxt_unit_req_alert(http->req, "RuntimeError create failed");
+ nxt_python_print_exception();
+
+ exc = Py_None;
+ Py_INCREF(exc);
+ }
+
+ future = http->send_future;
+ http->send_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(http->req, "'set_exception' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+ Py_DECREF(exc);
+
+ return NXT_UNIT_ERROR;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_done(PyObject *self, PyObject *future)
+{
+ int rc;
+ PyObject *res;
+ nxt_py_asgi_http_t *http;
+
+ http = (nxt_py_asgi_http_t *) self;
+
+ nxt_unit_req_debug(http->req, "asgi_http_done");
+
+ /*
+ * Get Future.result() and it raises an exception, if coroutine exited
+ * with exception.
+ */
+ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(http->req,
+ "Python failed to call 'future.result()'");
+ nxt_python_print_exception();
+
+ rc = NXT_UNIT_ERROR;
+
+ } else {
+ Py_DECREF(res);
+
+ rc = NXT_UNIT_OK;
+ }
+
+ nxt_unit_request_done(http->req, rc);
+
+ Py_RETURN_NONE;
+}
+
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/python/nxt_python_asgi_lifespan.c b/src/python/nxt_python_asgi_lifespan.c
new file mode 100644
index 00000000..14d0ee97
--- /dev/null
+++ b/src/python/nxt_python_asgi_lifespan.c
@@ -0,0 +1,505 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+typedef struct {
+ PyObject_HEAD
+ int disabled;
+ int startup_received;
+ int startup_sent;
+ int shutdown_received;
+ int shutdown_sent;
+ int shutdown_called;
+ PyObject *startup_future;
+ PyObject *shutdown_future;
+ PyObject *receive_future;
+} nxt_py_asgi_lifespan_t;
+
+
+static PyObject *nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none);
+static PyObject *nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict);
+static PyObject *nxt_py_asgi_lifespan_send_startup(
+ nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict);
+static PyObject *nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan,
+ int v, int *sent, PyObject **future);
+static PyObject *nxt_py_asgi_lifespan_send_shutdown(
+ nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict);
+static PyObject *nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan);
+static PyObject *nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future);
+
+
+static nxt_py_asgi_lifespan_t *nxt_py_lifespan;
+
+static PyMethodDef nxt_py_asgi_lifespan_methods[] = {
+ { "receive", nxt_py_asgi_lifespan_receive, METH_NOARGS, 0 },
+ { "send", nxt_py_asgi_lifespan_send, METH_O, 0 },
+ { "_done", nxt_py_asgi_lifespan_done, METH_O, 0 },
+ { NULL, NULL, 0, 0 }
+};
+
+static PyAsyncMethods nxt_py_asgi_async_methods = {
+ .am_await = nxt_py_asgi_await,
+};
+
+static PyTypeObject nxt_py_asgi_lifespan_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+
+ .tp_name = "unit._asgi_lifespan",
+ .tp_basicsize = sizeof(nxt_py_asgi_lifespan_t),
+ .tp_dealloc = nxt_py_asgi_dealloc,
+ .tp_as_async = &nxt_py_asgi_async_methods,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit ASGI Lifespan object",
+ .tp_iter = nxt_py_asgi_iter,
+ .tp_iternext = nxt_py_asgi_next,
+ .tp_methods = nxt_py_asgi_lifespan_methods,
+};
+
+
+nxt_int_t
+nxt_py_asgi_lifespan_startup(nxt_task_t *task)
+{
+ PyObject *scope, *res, *py_task, *receive, *send, *done;
+ nxt_int_t rc;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_lifespan_type) != 0)) {
+ nxt_alert(task,
+ "Python failed to initialize the 'asgi_lifespan' type object");
+ return NXT_ERROR;
+ }
+
+ lifespan = PyObject_New(nxt_py_asgi_lifespan_t, &nxt_py_asgi_lifespan_type);
+ if (nxt_slow_path(lifespan == NULL)) {
+ nxt_alert(task, "Python failed to create lifespan object");
+ return NXT_ERROR;
+ }
+
+ rc = NXT_ERROR;
+
+ receive = PyObject_GetAttrString((PyObject *) lifespan, "receive");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_alert(task, "Python failed to get 'receive' method");
+ goto release_lifespan;
+ }
+
+ send = PyObject_GetAttrString((PyObject *) lifespan, "send");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_alert(task, "Python failed to get 'send' method");
+ goto release_receive;
+ }
+
+ done = PyObject_GetAttrString((PyObject *) lifespan, "_done");
+ if (nxt_slow_path(receive == NULL)) {
+ nxt_alert(task, "Python failed to get '_done' method");
+ goto release_send;
+ }
+
+ lifespan->startup_future = PyObject_CallObject(nxt_py_loop_create_future,
+ NULL);
+ if (nxt_slow_path(lifespan->startup_future == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ goto release_done;
+ }
+
+ lifespan->disabled = 0;
+ lifespan->startup_received = 0;
+ lifespan->startup_sent = 0;
+ lifespan->shutdown_received = 0;
+ lifespan->shutdown_sent = 0;
+ lifespan->shutdown_called = 0;
+ lifespan->shutdown_future = NULL;
+ lifespan->receive_future = NULL;
+
+ scope = nxt_py_asgi_new_scope(NULL, nxt_py_lifespan_str, nxt_py_2_0_str);
+ if (nxt_slow_path(scope == NULL)) {
+ goto release_future;
+ }
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_application,
+ scope, receive, send, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_log(task, NXT_LOG_ERR, "Python failed to call the application");
+ nxt_python_print_exception();
+ goto release_scope;
+ }
+
+ if (nxt_slow_path(!PyCoro_CheckExact(res))) {
+ nxt_log(task, NXT_LOG_ERR,
+ "Application result type is not a coroutine");
+ Py_DECREF(res);
+ goto release_scope;
+ }
+
+ py_task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL);
+ if (nxt_slow_path(py_task == NULL)) {
+ nxt_log(task, NXT_LOG_ERR, "Python failed to call the create_task");
+ nxt_python_print_exception();
+ Py_DECREF(res);
+ goto release_scope;
+ }
+
+ Py_DECREF(res);
+
+ res = PyObject_CallMethodObjArgs(py_task, nxt_py_add_done_callback_str,
+ done, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_log(task, NXT_LOG_ERR,
+ "Python failed to call 'task.add_done_callback'");
+ nxt_python_print_exception();
+ goto release_task;
+ }
+
+ Py_DECREF(res);
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
+ lifespan->startup_future, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_alert(task, "Python failed to call loop.run_until_complete");
+ nxt_python_print_exception();
+ goto release_task;
+ }
+
+ Py_DECREF(res);
+
+ if (lifespan->startup_sent == 1 || lifespan->disabled) {
+ nxt_py_lifespan = lifespan;
+ Py_INCREF(nxt_py_lifespan);
+
+ rc = NXT_OK;
+ }
+
+release_task:
+ Py_DECREF(py_task);
+release_scope:
+ Py_DECREF(scope);
+release_future:
+ Py_CLEAR(lifespan->startup_future);
+release_done:
+ Py_DECREF(done);
+release_send:
+ Py_DECREF(send);
+release_receive:
+ Py_DECREF(receive);
+release_lifespan:
+ Py_DECREF(lifespan);
+
+ return rc;
+}
+
+
+nxt_int_t
+nxt_py_asgi_lifespan_shutdown(void)
+{
+ PyObject *msg, *future, *res;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ if (nxt_slow_path(nxt_py_lifespan == NULL || nxt_py_lifespan->disabled)) {
+ return NXT_OK;
+ }
+
+ lifespan = nxt_py_lifespan;
+ lifespan->shutdown_called = 1;
+
+ if (lifespan->receive_future != NULL) {
+ future = lifespan->receive_future;
+ lifespan->receive_future = NULL;
+
+ msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str);
+
+ if (nxt_fast_path(msg != NULL)) {
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str,
+ msg, NULL);
+ Py_XDECREF(res);
+ Py_DECREF(msg);
+ }
+
+ Py_DECREF(future);
+ }
+
+ if (lifespan->shutdown_sent) {
+ return NXT_OK;
+ }
+
+ lifespan->shutdown_future = PyObject_CallObject(nxt_py_loop_create_future,
+ NULL);
+ if (nxt_slow_path(lifespan->shutdown_future == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Future object");
+ nxt_python_print_exception();
+ return NXT_ERROR;
+ }
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
+ lifespan->shutdown_future, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete");
+ nxt_python_print_exception();
+ return NXT_ERROR;
+ }
+
+ Py_DECREF(res);
+ Py_CLEAR(lifespan->shutdown_future);
+
+ return NXT_OK;
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none)
+{
+ PyObject *msg, *future;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ lifespan = (nxt_py_asgi_lifespan_t *) self;
+
+ nxt_unit_debug(NULL, "asgi_lifespan_receive");
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ if (!lifespan->startup_received) {
+ lifespan->startup_received = 1;
+
+ msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_startup_str);
+
+ return nxt_py_asgi_set_result_soon(NULL, future, msg);
+ }
+
+ if (lifespan->shutdown_called && !lifespan->shutdown_received) {
+ lifespan->shutdown_received = 1;
+
+ msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str);
+
+ return nxt_py_asgi_set_result_soon(NULL, future, msg);
+ }
+
+ Py_INCREF(future);
+ lifespan->receive_future = future;
+
+ return future;
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict)
+{
+ PyObject *type, *msg;
+ const char *type_str;
+ Py_ssize_t type_len;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ static const nxt_str_t startup_complete
+ = nxt_string("lifespan.startup.complete");
+ static const nxt_str_t startup_failed
+ = nxt_string("lifespan.startup.failed");
+ static const nxt_str_t shutdown_complete
+ = nxt_string("lifespan.shutdown.complete");
+ static const nxt_str_t shutdown_failed
+ = nxt_string("lifespan.shutdown.failed");
+
+ lifespan = (nxt_py_asgi_lifespan_t *) self;
+
+ type = PyDict_GetItem(dict, nxt_py_type_str);
+ if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
+ nxt_unit_error(NULL,
+ "asgi_lifespan_send: 'type' is not a unicode string");
+ return PyErr_Format(PyExc_TypeError,
+ "'type' is not a unicode string");
+ }
+
+ type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
+
+ nxt_unit_debug(NULL, "asgi_lifespan_send type is '%.*s'",
+ (int) type_len, type_str);
+
+ if (type_len == (Py_ssize_t) startup_complete.length
+ && memcmp(type_str, startup_complete.start, type_len) == 0)
+ {
+ return nxt_py_asgi_lifespan_send_startup(lifespan, 0, NULL);
+ }
+
+ if (type_len == (Py_ssize_t) startup_failed.length
+ && memcmp(type_str, startup_failed.start, type_len) == 0)
+ {
+ msg = PyDict_GetItem(dict, nxt_py_message_str);
+ return nxt_py_asgi_lifespan_send_startup(lifespan, 1, msg);
+ }
+
+ if (type_len == (Py_ssize_t) shutdown_complete.length
+ && memcmp(type_str, shutdown_complete.start, type_len) == 0)
+ {
+ return nxt_py_asgi_lifespan_send_shutdown(lifespan, 0, NULL);
+ }
+
+ if (type_len == (Py_ssize_t) shutdown_failed.length
+ && memcmp(type_str, shutdown_failed.start, type_len) == 0)
+ {
+ msg = PyDict_GetItem(dict, nxt_py_message_str);
+ return nxt_py_asgi_lifespan_send_shutdown(lifespan, 1, msg);
+ }
+
+ return nxt_py_asgi_lifespan_disable(lifespan);
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_send_startup(nxt_py_asgi_lifespan_t *lifespan, int v,
+ PyObject *message)
+{
+ const char *message_str;
+ Py_ssize_t message_len;
+
+ if (nxt_slow_path(v != 0)) {
+ nxt_unit_error(NULL, "Application startup failed");
+
+ if (nxt_fast_path(message != NULL && PyUnicode_Check(message))) {
+ message_str = PyUnicode_AsUTF8AndSize(message, &message_len);
+
+ nxt_unit_error(NULL, "%.*s", (int) message_len, message_str);
+ }
+ }
+
+ return nxt_py_asgi_lifespan_send_(lifespan, v,
+ &lifespan->startup_sent,
+ &lifespan->startup_future);
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, int v, int *sent,
+ PyObject **pfuture)
+{
+ PyObject *future, *res;
+
+ if (*sent) {
+ return nxt_py_asgi_lifespan_disable(lifespan);
+ }
+
+ *sent = 1 + v;
+
+ if (*pfuture != NULL) {
+ future = *pfuture;
+ *pfuture = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str,
+ Py_None, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Failed to call 'future.set_result'");
+ nxt_python_print_exception();
+
+ return nxt_py_asgi_lifespan_disable(lifespan);
+ }
+
+ Py_DECREF(res);
+ Py_DECREF(future);
+ }
+
+ Py_INCREF(lifespan);
+
+ return (PyObject *) lifespan;
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan)
+{
+ nxt_unit_warn(NULL, "Got invalid state transition on lifespan protocol");
+
+ lifespan->disabled = 1;
+
+ return PyErr_Format(PyExc_AssertionError,
+ "Got invalid state transition on lifespan protocol");
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_send_shutdown(nxt_py_asgi_lifespan_t *lifespan, int v,
+ PyObject *message)
+{
+ return nxt_py_asgi_lifespan_send_(lifespan, v,
+ &lifespan->shutdown_sent,
+ &lifespan->shutdown_future);
+}
+
+
+static PyObject *
+nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future)
+{
+ PyObject *res;
+ nxt_py_asgi_lifespan_t *lifespan;
+
+ nxt_unit_debug(NULL, "asgi_lifespan_done");
+
+ lifespan = (nxt_py_asgi_lifespan_t *) self;
+
+ if (lifespan->startup_sent == 0) {
+ lifespan->disabled = 1;
+ }
+
+ /*
+ * Get Future.result() and it raises an exception, if coroutine exited
+ * with exception.
+ */
+ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_log(NULL, NXT_UNIT_LOG_INFO,
+ "ASGI Lifespan processing exception");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+
+ if (lifespan->startup_future != NULL) {
+ future = lifespan->startup_future;
+ lifespan->startup_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str,
+ Py_None, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Failed to call 'future.set_result'");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+ }
+
+ if (lifespan->shutdown_future != NULL) {
+ future = lifespan->shutdown_future;
+ lifespan->shutdown_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str,
+ Py_None, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Failed to call 'future.set_result'");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+ }
+
+ Py_RETURN_NONE;
+}
+
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/python/nxt_python_asgi_str.c b/src/python/nxt_python_asgi_str.c
new file mode 100644
index 00000000..37fa7f04
--- /dev/null
+++ b/src/python/nxt_python_asgi_str.c
@@ -0,0 +1,141 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+PyObject *nxt_py_1_0_str;
+PyObject *nxt_py_1_1_str;
+PyObject *nxt_py_2_0_str;
+PyObject *nxt_py_2_1_str;
+PyObject *nxt_py_3_0_str;
+PyObject *nxt_py_add_done_callback_str;
+PyObject *nxt_py_asgi_str;
+PyObject *nxt_py_bad_state_str;
+PyObject *nxt_py_body_str;
+PyObject *nxt_py_bytes_str;
+PyObject *nxt_py_client_str;
+PyObject *nxt_py_code_str;
+PyObject *nxt_py_done_str;
+PyObject *nxt_py_exception_str;
+PyObject *nxt_py_failed_to_send_body_str;
+PyObject *nxt_py_headers_str;
+PyObject *nxt_py_http_str;
+PyObject *nxt_py_http_disconnect_str;
+PyObject *nxt_py_http_request_str;
+PyObject *nxt_py_http_version_str;
+PyObject *nxt_py_https_str;
+PyObject *nxt_py_lifespan_str;
+PyObject *nxt_py_lifespan_shutdown_str;
+PyObject *nxt_py_lifespan_startup_str;
+PyObject *nxt_py_method_str;
+PyObject *nxt_py_message_str;
+PyObject *nxt_py_message_too_big_str;
+PyObject *nxt_py_more_body_str;
+PyObject *nxt_py_path_str;
+PyObject *nxt_py_query_string_str;
+PyObject *nxt_py_raw_path_str;
+PyObject *nxt_py_result_str;
+PyObject *nxt_py_root_path_str;
+PyObject *nxt_py_scheme_str;
+PyObject *nxt_py_server_str;
+PyObject *nxt_py_set_exception_str;
+PyObject *nxt_py_set_result_str;
+PyObject *nxt_py_spec_version_str;
+PyObject *nxt_py_status_str;
+PyObject *nxt_py_subprotocol_str;
+PyObject *nxt_py_subprotocols_str;
+PyObject *nxt_py_text_str;
+PyObject *nxt_py_type_str;
+PyObject *nxt_py_version_str;
+PyObject *nxt_py_websocket_str;
+PyObject *nxt_py_websocket_accept_str;
+PyObject *nxt_py_websocket_close_str;
+PyObject *nxt_py_websocket_connect_str;
+PyObject *nxt_py_websocket_disconnect_str;
+PyObject *nxt_py_websocket_receive_str;
+PyObject *nxt_py_websocket_send_str;
+PyObject *nxt_py_ws_str;
+PyObject *nxt_py_wss_str;
+
+static nxt_python_string_t nxt_py_asgi_strings[] = {
+ { nxt_string("1.0"), &nxt_py_1_0_str },
+ { nxt_string("1.1"), &nxt_py_1_1_str },
+ { nxt_string("2.0"), &nxt_py_2_0_str },
+ { nxt_string("2.1"), &nxt_py_2_1_str },
+ { nxt_string("3.0"), &nxt_py_3_0_str },
+ { nxt_string("add_done_callback"), &nxt_py_add_done_callback_str },
+ { nxt_string("asgi"), &nxt_py_asgi_str },
+ { nxt_string("bad state"), &nxt_py_bad_state_str },
+ { nxt_string("body"), &nxt_py_body_str },
+ { nxt_string("bytes"), &nxt_py_bytes_str },
+ { nxt_string("client"), &nxt_py_client_str },
+ { nxt_string("code"), &nxt_py_code_str },
+ { nxt_string("done"), &nxt_py_done_str },
+ { nxt_string("exception"), &nxt_py_exception_str },
+ { nxt_string("failed to send body"), &nxt_py_failed_to_send_body_str },
+ { nxt_string("headers"), &nxt_py_headers_str },
+ { nxt_string("http"), &nxt_py_http_str },
+ { nxt_string("http.disconnect"), &nxt_py_http_disconnect_str },
+ { nxt_string("http.request"), &nxt_py_http_request_str },
+ { nxt_string("http_version"), &nxt_py_http_version_str },
+ { nxt_string("https"), &nxt_py_https_str },
+ { nxt_string("lifespan"), &nxt_py_lifespan_str },
+ { nxt_string("lifespan.shutdown"), &nxt_py_lifespan_shutdown_str },
+ { nxt_string("lifespan.startup"), &nxt_py_lifespan_startup_str },
+ { nxt_string("message"), &nxt_py_message_str },
+ { nxt_string("message too big"), &nxt_py_message_too_big_str },
+ { nxt_string("method"), &nxt_py_method_str },
+ { nxt_string("more_body"), &nxt_py_more_body_str },
+ { nxt_string("path"), &nxt_py_path_str },
+ { nxt_string("query_string"), &nxt_py_query_string_str },
+ { nxt_string("raw_path"), &nxt_py_raw_path_str },
+ { nxt_string("result"), &nxt_py_result_str },
+ { nxt_string("root_path"), &nxt_py_root_path_str }, // not used
+ { nxt_string("scheme"), &nxt_py_scheme_str },
+ { nxt_string("server"), &nxt_py_server_str },
+ { nxt_string("set_exception"), &nxt_py_set_exception_str },
+ { nxt_string("set_result"), &nxt_py_set_result_str },
+ { nxt_string("spec_version"), &nxt_py_spec_version_str },
+ { nxt_string("status"), &nxt_py_status_str },
+ { nxt_string("subprotocol"), &nxt_py_subprotocol_str },
+ { nxt_string("subprotocols"), &nxt_py_subprotocols_str },
+ { nxt_string("text"), &nxt_py_text_str },
+ { nxt_string("type"), &nxt_py_type_str },
+ { nxt_string("version"), &nxt_py_version_str },
+ { nxt_string("websocket"), &nxt_py_websocket_str },
+ { nxt_string("websocket.accept"), &nxt_py_websocket_accept_str },
+ { nxt_string("websocket.close"), &nxt_py_websocket_close_str },
+ { nxt_string("websocket.connect"), &nxt_py_websocket_connect_str },
+ { nxt_string("websocket.disconnect"), &nxt_py_websocket_disconnect_str },
+ { nxt_string("websocket.receive"), &nxt_py_websocket_receive_str },
+ { nxt_string("websocket.send"), &nxt_py_websocket_send_str },
+ { nxt_string("ws"), &nxt_py_ws_str },
+ { nxt_string("wss"), &nxt_py_wss_str },
+ { nxt_null_string, NULL },
+};
+
+
+nxt_int_t
+nxt_py_asgi_str_init(void)
+{
+ return nxt_python_init_strings(nxt_py_asgi_strings);
+}
+
+
+void
+nxt_py_asgi_str_done(void)
+{
+ nxt_python_done_strings(nxt_py_asgi_strings);
+}
+
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/python/nxt_python_asgi_str.h b/src/python/nxt_python_asgi_str.h
new file mode 100644
index 00000000..3f389c62
--- /dev/null
+++ b/src/python/nxt_python_asgi_str.h
@@ -0,0 +1,69 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PYTHON_ASGI_STR_H_INCLUDED_
+#define _NXT_PYTHON_ASGI_STR_H_INCLUDED_
+
+
+extern PyObject *nxt_py_1_0_str;
+extern PyObject *nxt_py_1_1_str;
+extern PyObject *nxt_py_2_0_str;
+extern PyObject *nxt_py_2_1_str;
+extern PyObject *nxt_py_3_0_str;
+extern PyObject *nxt_py_add_done_callback_str;
+extern PyObject *nxt_py_asgi_str;
+extern PyObject *nxt_py_bad_state_str;
+extern PyObject *nxt_py_body_str;
+extern PyObject *nxt_py_bytes_str;
+extern PyObject *nxt_py_client_str;
+extern PyObject *nxt_py_code_str;
+extern PyObject *nxt_py_done_str;
+extern PyObject *nxt_py_exception_str;
+extern PyObject *nxt_py_failed_to_send_body_str;
+extern PyObject *nxt_py_headers_str;
+extern PyObject *nxt_py_http_str;
+extern PyObject *nxt_py_http_disconnect_str;
+extern PyObject *nxt_py_http_request_str;
+extern PyObject *nxt_py_http_version_str;
+extern PyObject *nxt_py_https_str;
+extern PyObject *nxt_py_lifespan_str;
+extern PyObject *nxt_py_lifespan_shutdown_str;
+extern PyObject *nxt_py_lifespan_startup_str;
+extern PyObject *nxt_py_method_str;
+extern PyObject *nxt_py_message_str;
+extern PyObject *nxt_py_message_too_big_str;
+extern PyObject *nxt_py_more_body_str;
+extern PyObject *nxt_py_path_str;
+extern PyObject *nxt_py_query_string_str;
+extern PyObject *nxt_py_result_str;
+extern PyObject *nxt_py_raw_path_str;
+extern PyObject *nxt_py_root_path_str;
+extern PyObject *nxt_py_scheme_str;
+extern PyObject *nxt_py_server_str;
+extern PyObject *nxt_py_set_exception_str;
+extern PyObject *nxt_py_set_result_str;
+extern PyObject *nxt_py_spec_version_str;
+extern PyObject *nxt_py_status_str;
+extern PyObject *nxt_py_subprotocol_str;
+extern PyObject *nxt_py_subprotocols_str;
+extern PyObject *nxt_py_text_str;
+extern PyObject *nxt_py_type_str;
+extern PyObject *nxt_py_version_str;
+extern PyObject *nxt_py_websocket_str;
+extern PyObject *nxt_py_websocket_accept_str;
+extern PyObject *nxt_py_websocket_close_str;
+extern PyObject *nxt_py_websocket_connect_str;
+extern PyObject *nxt_py_websocket_disconnect_str;
+extern PyObject *nxt_py_websocket_receive_str;
+extern PyObject *nxt_py_websocket_send_str;
+extern PyObject *nxt_py_ws_str;
+extern PyObject *nxt_py_wss_str;
+
+
+nxt_int_t nxt_py_asgi_str_init(void);
+void nxt_py_asgi_str_done(void);
+
+
+#endif /* _NXT_PYTHON_ASGI_STR_H_INCLUDED_ */
diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c
new file mode 100644
index 00000000..5a27b588
--- /dev/null
+++ b/src/python/nxt_python_asgi_websocket.c
@@ -0,0 +1,1084 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <nxt_unit_websocket.h>
+#include <nxt_websocket_header.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+enum {
+ NXT_WS_INIT,
+ NXT_WS_CONNECT,
+ NXT_WS_ACCEPTED,
+ NXT_WS_DISCONNECTED,
+ NXT_WS_CLOSED,
+};
+
+
+typedef struct {
+ nxt_queue_link_t link;
+ nxt_unit_websocket_frame_t *frame;
+} nxt_py_asgi_penging_frame_t;
+
+
+typedef struct {
+ PyObject_HEAD
+ nxt_unit_request_info_t *req;
+ PyObject *receive_future;
+ PyObject *receive_exc_str;
+ int state;
+ nxt_queue_t pending_frames;
+ uint64_t pending_payload_len;
+ uint64_t pending_frame_len;
+ int pending_fins;
+} nxt_py_asgi_websocket_t;
+
+
+static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none);
+static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict);
+static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws,
+ PyObject *dict);
+static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws,
+ PyObject *msg);
+static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws,
+ PyObject *exc);
+static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f);
+static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
+ nxt_unit_websocket_frame_t *frame);
+static uint64_t nxt_py_asgi_websocket_pending_len(
+ nxt_py_asgi_websocket_t *ws);
+static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame(
+ nxt_py_asgi_websocket_t *ws);
+static PyObject *nxt_py_asgi_websocket_disconnect_msg(
+ nxt_py_asgi_websocket_t *ws);
+static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future);
+
+
+static PyMethodDef nxt_py_asgi_websocket_methods[] = {
+ { "receive", nxt_py_asgi_websocket_receive, METH_NOARGS, 0 },
+ { "send", nxt_py_asgi_websocket_send, METH_O, 0 },
+ { "_done", nxt_py_asgi_websocket_done, METH_O, 0 },
+ { NULL, NULL, 0, 0 }
+};
+
+static PyAsyncMethods nxt_py_asgi_async_methods = {
+ .am_await = nxt_py_asgi_await,
+};
+
+static PyTypeObject nxt_py_asgi_websocket_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+
+ .tp_name = "unit._asgi_websocket",
+ .tp_basicsize = sizeof(nxt_py_asgi_websocket_t),
+ .tp_dealloc = nxt_py_asgi_dealloc,
+ .tp_as_async = &nxt_py_asgi_async_methods,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit ASGI WebSocket connection object",
+ .tp_iter = nxt_py_asgi_iter,
+ .tp_iternext = nxt_py_asgi_next,
+ .tp_methods = nxt_py_asgi_websocket_methods,
+};
+
+static uint64_t nxt_py_asgi_ws_max_frame_size = 1024 * 1024;
+static uint64_t nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024;
+
+
+nxt_int_t
+nxt_py_asgi_websocket_init(nxt_task_t *task)
+{
+ if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) {
+ nxt_alert(task,
+ "Python failed to initialize the \"asgi_websocket\" type object");
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+}
+
+
+PyObject *
+nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req)
+{
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type);
+
+ if (nxt_fast_path(ws != NULL)) {
+ ws->req = req;
+ ws->receive_future = NULL;
+ ws->receive_exc_str = NULL;
+ ws->state = NXT_WS_INIT;
+ nxt_queue_init(&ws->pending_frames);
+ ws->pending_payload_len = 0;
+ ws->pending_frame_len = 0;
+ ws->pending_fins = 0;
+ }
+
+ return (PyObject *) ws;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none)
+{
+ PyObject *future, *msg;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = (nxt_py_asgi_websocket_t *) self;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_receive");
+
+ /* If exception happened out of receive() call, raise it now. */
+ if (nxt_slow_path(ws->receive_exc_str != NULL)) {
+ PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str);
+
+ ws->receive_exc_str = NULL;
+
+ return NULL;
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
+ nxt_unit_req_error(ws->req,
+ "receive() called for closed WebSocket");
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket already closed");
+ }
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_req_alert(ws->req, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
+ ws->state = NXT_WS_CONNECT;
+
+ msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str);
+
+ return nxt_py_asgi_set_result_soon(ws->req, future, msg);
+ }
+
+ if (ws->pending_fins > 0) {
+ msg = nxt_py_asgi_websocket_pop_msg(ws, NULL);
+
+ return nxt_py_asgi_set_result_soon(ws->req, future, msg);
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
+ msg = nxt_py_asgi_websocket_disconnect_msg(ws);
+
+ return nxt_py_asgi_set_result_soon(ws->req, future, msg);
+ }
+
+ ws->receive_future = future;
+ Py_INCREF(ws->receive_future);
+
+ return future;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict)
+{
+ PyObject *type;
+ const char *type_str;
+ Py_ssize_t type_len;
+ nxt_py_asgi_websocket_t *ws;
+
+ static const nxt_str_t websocket_accept = nxt_string("websocket.accept");
+ static const nxt_str_t websocket_close = nxt_string("websocket.close");
+ static const nxt_str_t websocket_send = nxt_string("websocket.send");
+
+ ws = (nxt_py_asgi_websocket_t *) self;
+
+ type = PyDict_GetItem(dict, nxt_py_type_str);
+ if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
+ nxt_unit_req_error(ws->req, "asgi_websocket_send: "
+ "'type' is not a unicode string");
+ return PyErr_Format(PyExc_TypeError,
+ "'type' is not a unicode string");
+ }
+
+ type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'",
+ (int) type_len, type_str);
+
+ if (type_len == (Py_ssize_t) websocket_accept.length
+ && memcmp(type_str, websocket_accept.start, type_len) == 0)
+ {
+ return nxt_py_asgi_websocket_accept(ws, dict);
+ }
+
+ if (type_len == (Py_ssize_t) websocket_close.length
+ && memcmp(type_str, websocket_close.start, type_len) == 0)
+ {
+ return nxt_py_asgi_websocket_close(ws, dict);
+ }
+
+ if (type_len == (Py_ssize_t) websocket_send.length
+ && memcmp(type_str, websocket_send.start, type_len) == 0)
+ {
+ return nxt_py_asgi_websocket_send_frame(ws, dict);
+ }
+
+ nxt_unit_req_error(ws->req, "asgi_websocket_send: "
+ "unexpected 'type': '%.*s'", (int) type_len, type_str);
+ return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict)
+{
+ int rc;
+ char *subprotocol_str;
+ PyObject *res, *headers, *subprotocol;
+ Py_ssize_t subprotocol_len;
+ nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
+ nxt_py_asgi_add_field_ctx_t add_field_ctx;
+
+ static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol");
+
+ switch(ws->state) {
+ case NXT_WS_INIT:
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket connect not received");
+ case NXT_WS_CONNECT:
+ break;
+
+ case NXT_WS_ACCEPTED:
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
+
+ case NXT_WS_DISCONNECTED:
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
+
+ case NXT_WS_CLOSED:
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
+ }
+
+ if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
+ }
+
+ if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) {
+ return PyErr_Format(PyExc_RuntimeError, "response already sent");
+ }
+
+ calc_size_ctx.fields_size = 0;
+ calc_size_ctx.fields_count = 0;
+
+ headers = PyDict_GetItem(dict, nxt_py_headers_str);
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
+ &calc_size_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+ }
+
+ subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str);
+ if (subprotocol != NULL && PyUnicode_Check(subprotocol)) {
+ subprotocol_str = PyUnicode_DATA(subprotocol);
+ subprotocol_len = PyUnicode_GET_LENGTH(subprotocol);
+
+ calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len;
+ calc_size_ctx.fields_count++;
+
+ } else {
+ subprotocol_str = NULL;
+ subprotocol_len = 0;
+ }
+
+ rc = nxt_unit_response_init(ws->req, 101,
+ calc_size_ctx.fields_count,
+ calc_size_ctx.fields_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to allocate response object");
+ }
+
+ add_field_ctx.req = ws->req;
+ add_field_ctx.content_length = -1;
+
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
+ &add_field_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+ }
+
+ if (subprotocol_len > 0) {
+ rc = nxt_unit_response_add_field(ws->req,
+ (const char *) ws_protocol.start,
+ ws_protocol.length,
+ subprotocol_str, subprotocol_len);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to add header");
+ }
+ }
+
+ rc = nxt_unit_response_send(ws->req);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError, "failed to send response");
+ }
+
+ ws->state = NXT_WS_ACCEPTED;
+
+ Py_INCREF(ws);
+
+ return (PyObject *) ws;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict)
+{
+ int rc;
+ uint16_t status_code;
+ PyObject *code;
+
+ if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket connect not received");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
+ }
+
+ if (nxt_unit_response_is_websocket(ws->req)) {
+ code = PyDict_GetItem(dict, nxt_py_code_str);
+ if (nxt_slow_path(code != NULL && !PyLong_Check(code))) {
+ return PyErr_Format(PyExc_TypeError, "'code' is not integer");
+ }
+
+ status_code = (code != NULL) ? htons(PyLong_AsLong(code))
+ : htons(NXT_WEBSOCKET_CR_NORMAL);
+
+ rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
+ 1, &status_code, 2);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to send close frame");
+ }
+
+ } else {
+ rc = nxt_unit_response_init(ws->req, 403, 0, 0);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to allocate response object");
+ }
+
+ rc = nxt_unit_response_send(ws->req);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to send response");
+ }
+ }
+
+ ws->state = NXT_WS_CLOSED;
+
+ Py_INCREF(ws);
+
+ return (PyObject *) ws;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict)
+{
+ int rc;
+ uint8_t opcode;
+ PyObject *bytes, *text;
+ const void *buf;
+ Py_ssize_t buf_size;
+
+ if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket connect not received");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket not accepted yet");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
+ }
+
+ bytes = PyDict_GetItem(dict, nxt_py_bytes_str);
+ if (bytes == Py_None) {
+ bytes = NULL;
+ }
+
+ if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) {
+ return PyErr_Format(PyExc_TypeError,
+ "'bytes' is not a byte string");
+ }
+
+ text = PyDict_GetItem(dict, nxt_py_text_str);
+ if (text == Py_None) {
+ text = NULL;
+ }
+
+ if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) {
+ return PyErr_Format(PyExc_TypeError,
+ "'text' is not a unicode string");
+ }
+
+ if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) {
+ return PyErr_Format(PyExc_ValueError,
+ "Exactly one of 'bytes' or 'text' must be non-None");
+ }
+
+ if (bytes != NULL) {
+ buf = PyBytes_AS_STRING(bytes);
+ buf_size = PyBytes_GET_SIZE(bytes);
+ opcode = NXT_WEBSOCKET_OP_BINARY;
+
+ } else {
+ buf = PyUnicode_AsUTF8AndSize(text, &buf_size);
+ opcode = NXT_WEBSOCKET_OP_TEXT;
+ }
+
+ rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError, "failed to send close frame");
+ }
+
+ Py_INCREF(ws);
+ return (PyObject *) ws;
+}
+
+
+void
+nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame)
+{
+ uint8_t opcode;
+ uint16_t status_code;
+ uint64_t rest;
+ PyObject *msg, *exc;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = frame->req->data;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_handler");
+
+ opcode = frame->header->opcode;
+ if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT
+ && opcode != NXT_WEBSOCKET_OP_TEXT
+ && opcode != NXT_WEBSOCKET_OP_BINARY
+ && opcode != NXT_WEBSOCKET_OP_CLOSE))
+ {
+ nxt_unit_websocket_done(frame);
+
+ nxt_unit_req_debug(ws->req,
+ "asgi_websocket_handler: ignore frame with opcode %d",
+ opcode);
+
+ return;
+ }
+
+ if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) {
+ nxt_unit_websocket_done(frame);
+
+ goto bad_state;
+ }
+
+ rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len;
+
+ if (nxt_slow_path(frame->payload_len > rest)) {
+ nxt_unit_websocket_done(frame);
+
+ goto too_big;
+ }
+
+ rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len;
+
+ if (nxt_slow_path(frame->payload_len > rest)) {
+ nxt_unit_websocket_done(frame);
+
+ goto too_big;
+ }
+
+ if (ws->receive_future == NULL || frame->header->fin == 0) {
+ nxt_py_asgi_websocket_suspend_frame(frame);
+
+ return;
+ }
+
+ if (!nxt_queue_is_empty(&ws->pending_frames)) {
+ if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT
+ || opcode == NXT_WEBSOCKET_OP_BINARY))
+ {
+ nxt_unit_req_alert(ws->req,
+ "Invalid state: pending frames with active receiver. "
+ "CONT frame expected. (%d)", opcode);
+
+ PyErr_SetString(PyExc_AssertionError,
+ "Invalid state: pending frames with active receiver. "
+ "CONT frame expected.");
+
+ nxt_unit_websocket_done(frame);
+
+ return;
+ }
+ }
+
+ msg = nxt_py_asgi_websocket_pop_msg(ws, frame);
+ if (nxt_slow_path(msg == NULL)) {
+ exc = PyErr_Occurred();
+ Py_INCREF(exc);
+
+ goto raise;
+ }
+
+ nxt_py_asgi_websocket_receive_done(ws, msg);
+
+ return;
+
+bad_state:
+
+ if (ws->receive_future == NULL) {
+ ws->receive_exc_str = nxt_py_bad_state_str;
+
+ return;
+ }
+
+ exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
+ nxt_py_bad_state_str,
+ NULL);
+ if (nxt_slow_path(exc == NULL)) {
+ nxt_unit_req_alert(ws->req, "RuntimeError create failed");
+ nxt_python_print_exception();
+
+ exc = Py_None;
+ Py_INCREF(exc);
+ }
+
+ goto raise;
+
+too_big:
+
+ status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG);
+
+ (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
+ 1, &status_code, 2);
+
+ ws->state = NXT_WS_CLOSED;
+
+ if (ws->receive_future == NULL) {
+ ws->receive_exc_str = nxt_py_message_too_big_str;
+
+ return;
+ }
+
+ exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
+ nxt_py_message_too_big_str,
+ NULL);
+ if (nxt_slow_path(exc == NULL)) {
+ nxt_unit_req_alert(ws->req, "RuntimeError create failed");
+ nxt_python_print_exception();
+
+ exc = Py_None;
+ Py_INCREF(exc);
+ }
+
+raise:
+
+ nxt_py_asgi_websocket_receive_fail(ws, exc);
+}
+
+
+static void
+nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg)
+{
+ PyObject *future, *res;
+
+ future = ws->receive_future;
+ ws->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(ws->req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(msg);
+}
+
+
+static void
+nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc)
+{
+ PyObject *future, *res;
+
+ future = ws->receive_future;
+ ws->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(ws->req, "'set_exception' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(exc);
+}
+
+
+static void
+nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame)
+{
+ int rc;
+ nxt_py_asgi_websocket_t *ws;
+ nxt_py_asgi_penging_frame_t *p;
+
+ nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: "
+ "%d, %"PRIu64", %d",
+ frame->header->opcode, frame->payload_len,
+ frame->header->fin);
+
+ ws = frame->req->data;
+
+ rc = nxt_unit_websocket_retain(frame);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension.");
+
+ nxt_unit_websocket_done(frame);
+
+ PyErr_SetString(PyExc_RuntimeError,
+ "Failed to retain frame for suspension.");
+
+ return;
+ }
+
+ p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t));
+ if (nxt_slow_path(p == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to allocate buffer to suspend frame.");
+
+ nxt_unit_websocket_done(frame);
+
+ PyErr_SetString(PyExc_RuntimeError,
+ "Failed to allocate buffer to suspend frame.");
+
+ return;
+ }
+
+ p->frame = frame;
+ nxt_queue_insert_tail(&ws->pending_frames, &p->link);
+
+ ws->pending_payload_len += frame->payload_len;
+ ws->pending_fins += frame->header->fin;
+
+ if (frame->header->fin) {
+ ws->pending_frame_len = 0;
+
+ } else {
+ if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) {
+ ws->pending_frame_len += frame->payload_len;
+
+ } else {
+ ws->pending_frame_len = frame->payload_len;
+ }
+ }
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
+ nxt_unit_websocket_frame_t *frame)
+{
+ int fin;
+ char *buf;
+ uint8_t code_buf[2], opcode;
+ uint16_t code;
+ PyObject *msg, *data, *type, *data_key;
+ uint64_t payload_len;
+ nxt_unit_websocket_frame_t *fin_frame;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg");
+
+ fin_frame = NULL;
+
+ if (nxt_queue_is_empty(&ws->pending_frames)
+ || (frame != NULL
+ && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE))
+ {
+ payload_len = frame->payload_len;
+
+ } else {
+ if (frame != NULL) {
+ payload_len = ws->pending_payload_len + frame->payload_len;
+ fin_frame = frame;
+
+ } else {
+ payload_len = nxt_py_asgi_websocket_pending_len(ws);
+ }
+
+ frame = nxt_py_asgi_websocket_pop_frame(ws);
+ }
+
+ opcode = frame->header->opcode;
+
+ if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) {
+ nxt_unit_req_alert(ws->req,
+ "Invalid state: attempt to process CONT frame.");
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_AssertionError,
+ "Invalid state: attempt to process CONT frame.");
+ }
+
+ type = nxt_py_websocket_receive_str;
+
+ switch (opcode) {
+ case NXT_WEBSOCKET_OP_TEXT:
+ buf = nxt_unit_malloc(frame->req->ctx, payload_len);
+ if (nxt_slow_path(buf == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to allocate buffer for payload (%d).",
+ (int) payload_len);
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to allocate buffer for payload (%d).",
+ (int) payload_len);
+ }
+
+ data = NULL;
+ data_key = nxt_py_text_str;
+
+ break;
+
+ case NXT_WEBSOCKET_OP_BINARY:
+ data = PyBytes_FromStringAndSize(NULL, payload_len);
+ if (nxt_slow_path(data == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to create Bytes for payload (%d).",
+ (int) payload_len);
+ nxt_python_print_exception();
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to create Bytes for payload.");
+ }
+
+ buf = (char *) PyBytes_AS_STRING(data);
+ data_key = nxt_py_bytes_str;
+
+ break;
+
+ case NXT_WEBSOCKET_OP_CLOSE:
+ if (frame->payload_len >= 2) {
+ nxt_unit_websocket_read(frame, code_buf, 2);
+ code = ((uint16_t) code_buf[0]) << 8 | code_buf[1];
+
+ } else {
+ code = NXT_WEBSOCKET_CR_NORMAL;
+ }
+
+ nxt_unit_websocket_done(frame);
+
+ data = PyLong_FromLong(code);
+ if (nxt_slow_path(data == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to create Long from code %d.",
+ (int) code);
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to create Long from code %d.",
+ (int) code);
+ }
+
+ buf = NULL;
+ type = nxt_py_websocket_disconnect_str;
+ data_key = nxt_py_code_str;
+
+ break;
+
+ default:
+ nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode);
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d",
+ opcode);
+ }
+
+ if (buf != NULL) {
+ fin = frame->header->fin;
+ buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
+
+ nxt_unit_websocket_done(frame);
+
+ if (!fin) {
+ while (!nxt_queue_is_empty(&ws->pending_frames)) {
+ frame = nxt_py_asgi_websocket_pop_frame(ws);
+ fin = frame->header->fin;
+
+ buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
+
+ nxt_unit_websocket_done(frame);
+
+ if (fin) {
+ break;
+ }
+ }
+
+ if (fin_frame != NULL) {
+ buf += nxt_unit_websocket_read(fin_frame, buf,
+ fin_frame->payload_len);
+ nxt_unit_websocket_done(fin_frame);
+ }
+ }
+
+ if (opcode == NXT_WEBSOCKET_OP_TEXT) {
+ buf -= payload_len;
+
+ data = PyUnicode_DecodeUTF8(buf, payload_len, NULL);
+
+ nxt_unit_free(ws->req->ctx, buf);
+
+ if (nxt_slow_path(data == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to create Unicode for payload (%d).",
+ (int) payload_len);
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to create Unicode.");
+ }
+ }
+ }
+
+ msg = nxt_py_asgi_new_msg(ws->req, type);
+ if (nxt_slow_path(msg == NULL)) {
+ Py_DECREF(data);
+ return NULL;
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) {
+ nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item");
+
+ Py_DECREF(msg);
+ Py_DECREF(data);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Python failed to set 'msg.data' item");
+ }
+
+ Py_DECREF(data);
+
+ return msg;
+}
+
+
+static uint64_t
+nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws)
+{
+ uint64_t res;
+ nxt_py_asgi_penging_frame_t *p;
+
+ res = 0;
+
+ nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) {
+ res += p->frame->payload_len;
+
+ if (p->frame->header->fin) {
+ nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d",
+ (int) res);
+ return res;
+ }
+ } nxt_queue_loop;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)",
+ (int) res);
+ return res;
+}
+
+
+static nxt_unit_websocket_frame_t *
+nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws)
+{
+ nxt_queue_link_t *lnk;
+ nxt_unit_websocket_frame_t *frame;
+ nxt_py_asgi_penging_frame_t *p;
+
+ lnk = nxt_queue_first(&ws->pending_frames);
+ nxt_queue_remove(lnk);
+
+ p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link);
+
+ frame = p->frame;
+ ws->pending_payload_len -= frame->payload_len;
+ ws->pending_fins -= frame->header->fin;
+
+ nxt_unit_free(frame->req->ctx, p);
+
+ nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: "
+ "%d, %"PRIu64", %d",
+ frame->header->opcode, frame->payload_len,
+ frame->header->fin);
+
+ return frame;
+}
+
+
+void
+nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req)
+{
+ PyObject *msg, *exc;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = req->data;
+
+ nxt_unit_req_debug(req, "asgi_websocket_close_handler");
+
+ if (ws->receive_future == NULL) {
+ ws->state = NXT_WS_DISCONNECTED;
+
+ return;
+ }
+
+ msg = nxt_py_asgi_websocket_disconnect_msg(ws);
+ if (nxt_slow_path(msg == NULL)) {
+ exc = PyErr_Occurred();
+ Py_INCREF(exc);
+
+ nxt_py_asgi_websocket_receive_fail(ws, exc);
+
+ } else {
+ nxt_py_asgi_websocket_receive_done(ws, msg);
+ }
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws)
+{
+ PyObject *msg, *code;
+
+ msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str);
+ if (nxt_slow_path(msg == NULL)) {
+ return NULL;
+ }
+
+ code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY);
+ if (nxt_slow_path(code == NULL)) {
+ nxt_unit_req_alert(ws->req, "Python failed to create long");
+ nxt_python_print_exception();
+
+ Py_DECREF(msg);
+
+ return PyErr_Format(PyExc_RuntimeError, "failed to create long");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) {
+ nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item");
+
+ Py_DECREF(msg);
+ Py_DECREF(code);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Python failed to set 'msg.code' item");
+ }
+
+ Py_DECREF(code);
+
+ return msg;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_done(PyObject *self, PyObject *future)
+{
+ int rc;
+ uint16_t status_code;
+ PyObject *res;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = (nxt_py_asgi_websocket_t *) self;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self);
+
+ /*
+ * Get Future.result() and it raises an exception, if coroutine exited
+ * with exception.
+ */
+ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(ws->req,
+ "Python failed to call 'future.result()'");
+ nxt_python_print_exception();
+
+ rc = NXT_UNIT_ERROR;
+
+ } else {
+ Py_DECREF(res);
+
+ rc = NXT_UNIT_OK;
+ }
+
+ if (ws->state == NXT_WS_ACCEPTED) {
+ status_code = (rc == NXT_UNIT_OK)
+ ? htons(NXT_WEBSOCKET_CR_NORMAL)
+ : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR);
+
+ rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
+ 1, &status_code, 2);
+ }
+
+ while (!nxt_queue_is_empty(&ws->pending_frames)) {
+ nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws));
+ }
+
+ nxt_unit_request_done(ws->req, rc);
+
+ Py_RETURN_NONE;
+}
+
+
+#endif /* NXT_HAVE_ASGI */
diff --git a/src/python/nxt_python_wsgi.c b/src/python/nxt_python_wsgi.c
index 3a5842f1..97030cd3 100644
--- a/src/python/nxt_python_wsgi.c
+++ b/src/python/nxt_python_wsgi.c
@@ -38,24 +38,6 @@
*/
-#if PY_MAJOR_VERSION == 3
-#define NXT_PYTHON_BYTES_TYPE "bytestring"
-
-#define PyString_FromStringAndSize(str, size) \
- PyUnicode_DecodeLatin1((str), (size), "strict")
-#define PyString_AS_STRING PyUnicode_DATA
-
-#else
-#define NXT_PYTHON_BYTES_TYPE "string"
-
-#define PyBytes_FromStringAndSize PyString_FromStringAndSize
-#define PyBytes_Check PyString_Check
-#define PyBytes_GET_SIZE PyString_GET_SIZE
-#define PyBytes_AS_STRING PyString_AS_STRING
-#define PyUnicode_InternInPlace PyString_InternInPlace
-#define PyUnicode_AsUTF8 PyString_AS_STRING
-#endif
-
typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t;
typedef struct {