diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-10-01 23:55:23 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-10-01 23:55:23 +0300 |
commit | c4c2f90c5b532c1ec283d211e0fd50e4538c2a51 (patch) | |
tree | 2fc2558bc370d80f5d60a0d6c5a0c6791a48e466 /src | |
parent | bbc6d2470afe8bfc8a97427b0b576080466bd31a (diff) | |
download | unit-c4c2f90c5b532c1ec283d211e0fd50e4538c2a51.tar.gz unit-c4c2f90c5b532c1ec283d211e0fd50e4538c2a51.tar.bz2 |
Python: ASGI server introduced.
This closes #461 issue on GitHub.
Diffstat (limited to 'src')
-rw-r--r-- | src/python/nxt_python.c | 28 | ||||
-rw-r--r-- | src/python/nxt_python.h | 29 | ||||
-rw-r--r-- | src/python/nxt_python_asgi.c | 1227 | ||||
-rw-r--r-- | src/python/nxt_python_asgi.h | 60 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_http.c | 591 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_lifespan.c | 505 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_str.c | 141 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_str.h | 69 | ||||
-rw-r--r-- | src/python/nxt_python_asgi_websocket.c | 1084 | ||||
-rw-r--r-- | src/python/nxt_python_wsgi.c | 18 |
10 files changed, 3723 insertions, 29 deletions
diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c index 7d4589ed..01534a47 100644 --- a/src/python/nxt_python.c +++ b/src/python/nxt_python.c @@ -15,14 +15,6 @@ #include NXT_PYTHON_MOUNTS_H -#if PY_MAJOR_VERSION == 3 -#define PyString_FromStringAndSize(str, size) \ - PyUnicode_DecodeLatin1((str), (size), "strict") - -#else -#define PyUnicode_InternInPlace PyString_InternInPlace -#endif - static nxt_int_t nxt_python_start(nxt_task_t *task, nxt_process_data_t *data); static void nxt_python_atexit(void); @@ -56,7 +48,7 @@ static char *nxt_py_home; static nxt_int_t nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) { - int rc; + int rc, asgi; char *nxt_py_module; size_t len; PyObject *obj, *pypath, *module; @@ -226,7 +218,15 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) python_init.shm_limit = data->app->shm_limit; - rc = nxt_python_wsgi_init(task, &python_init); + asgi = nxt_python_asgi_check(nxt_py_application); + + if (asgi) { + rc = nxt_python_asgi_init(task, &python_init); + + } else { + rc = nxt_python_wsgi_init(task, &python_init); + } + if (nxt_slow_path(rc == NXT_ERROR)) { goto fail; } @@ -236,7 +236,12 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) goto fail; } - rc = nxt_python_wsgi_run(unit_ctx); + if (asgi) { + rc = nxt_python_asgi_run(unit_ctx); + + } else { + rc = nxt_python_wsgi_run(unit_ctx); + } nxt_unit_done(unit_ctx); @@ -300,6 +305,7 @@ static void nxt_python_atexit(void) { nxt_python_wsgi_done(); + nxt_python_asgi_done(); Py_XDECREF(nxt_py_stderr_flush); Py_XDECREF(nxt_py_application); diff --git a/src/python/nxt_python.h b/src/python/nxt_python.h index 417df7fd..3211026b 100644 --- a/src/python/nxt_python.h +++ b/src/python/nxt_python.h @@ -8,9 +8,32 @@ #include <Python.h> +#include <nxt_main.h> #include <nxt_unit.h> +#if PY_MAJOR_VERSION == 3 +#define NXT_PYTHON_BYTES_TYPE "bytestring" + +#define PyString_FromStringAndSize(str, size) \ + PyUnicode_DecodeLatin1((str), (size), "strict") +#define PyString_AS_STRING PyUnicode_DATA + +#else +#define NXT_PYTHON_BYTES_TYPE "string" + +#define PyBytes_FromStringAndSize PyString_FromStringAndSize +#define PyBytes_Check PyString_Check +#define PyBytes_GET_SIZE PyString_GET_SIZE +#define PyBytes_AS_STRING PyString_AS_STRING +#define PyUnicode_InternInPlace PyString_InternInPlace +#define PyUnicode_AsUTF8 PyString_AS_STRING +#endif + +#if PY_MAJOR_VERSION == 3 && PY_MINOR_VERSION >= 5 +#define NXT_HAVE_ASGI 1 +#endif + extern PyObject *nxt_py_application; typedef struct { @@ -18,6 +41,7 @@ typedef struct { PyObject **object_p; } nxt_python_string_t; + nxt_int_t nxt_python_init_strings(nxt_python_string_t *pstr); void nxt_python_done_strings(nxt_python_string_t *pstr); @@ -27,5 +51,10 @@ nxt_int_t nxt_python_wsgi_init(nxt_task_t *task, nxt_unit_init_t *init); int nxt_python_wsgi_run(nxt_unit_ctx_t *ctx); void nxt_python_wsgi_done(void); +int nxt_python_asgi_check(PyObject *obj); +nxt_int_t nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init); +nxt_int_t nxt_python_asgi_run(nxt_unit_ctx_t *ctx); +void nxt_python_asgi_done(void); + #endif /* _NXT_PYTHON_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c new file mode 100644 index 00000000..72408ea1 --- /dev/null +++ b/src/python/nxt_python_asgi.c @@ -0,0 +1,1227 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <nxt_unit_response.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); + +static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); +static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, + uint16_t port); +static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); +static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); + +static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); +static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); +static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); +static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); + +static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); + + +PyObject *nxt_py_loop_run_until_complete; +PyObject *nxt_py_loop_create_future; +PyObject *nxt_py_loop_create_task; + +nxt_queue_t nxt_py_asgi_drain_queue; + +static PyObject *nxt_py_loop_call_soon; +static PyObject *nxt_py_quit_future; +static PyObject *nxt_py_quit_future_set_result; +static PyObject *nxt_py_loop_add_reader; +static PyObject *nxt_py_loop_remove_reader; +static PyObject *nxt_py_port_read; + +static PyMethodDef nxt_py_port_read_method = + {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; + +#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A + + +int +nxt_python_asgi_check(PyObject *obj) +{ + int res; + PyObject *call; + PyCodeObject *code; + + if (PyFunction_Check(obj)) { + code = (PyCodeObject *) PyFunction_GET_CODE(obj); + + return (code->co_flags & CO_COROUTINE) != 0; + } + + if (PyMethod_Check(obj)) { + obj = PyMethod_GET_FUNCTION(obj); + + code = (PyCodeObject *) PyFunction_GET_CODE(obj); + + return (code->co_flags & CO_COROUTINE) != 0; + } + + call = PyObject_GetAttrString(obj, "__call__"); + + if (call == NULL) { + return 0; + } + + if (PyFunction_Check(call)) { + code = (PyCodeObject *) PyFunction_GET_CODE(call); + + res = (code->co_flags & CO_COROUTINE) != 0; + + } else { + if (PyMethod_Check(call)) { + obj = PyMethod_GET_FUNCTION(call); + + code = (PyCodeObject *) PyFunction_GET_CODE(obj); + + res = (code->co_flags & CO_COROUTINE) != 0; + + } else { + res = 0; + } + } + + Py_DECREF(call); + + return res; +} + + +nxt_int_t +nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) +{ + PyObject *asyncio, *loop, *get_event_loop; + nxt_int_t rc; + + nxt_debug(task, "asgi_init"); + + if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) { + nxt_alert(task, "Python failed to init string objects"); + return NXT_ERROR; + } + + asyncio = PyImport_ImportModule("asyncio"); + if (nxt_slow_path(asyncio == NULL)) { + nxt_alert(task, "Python failed to import module 'asyncio'"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + loop = NULL; + get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), + "get_event_loop"); + if (nxt_slow_path(get_event_loop == NULL)) { + nxt_alert(task, + "Python failed to get 'get_event_loop' from module 'asyncio'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) { + nxt_alert(task, "'asyncio.get_event_loop' is not a callable object"); + goto fail; + } + + loop = PyObject_CallObject(get_event_loop, NULL); + if (nxt_slow_path(loop == NULL)) { + nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'"); + goto fail; + } + + nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task"); + if (nxt_slow_path(nxt_py_loop_create_task == NULL)) { + nxt_alert(task, "Python failed to get 'loop.create_task'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) { + nxt_alert(task, "'loop.create_task' is not a callable object"); + goto fail; + } + + nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader"); + if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) { + nxt_alert(task, "Python failed to get 'loop.add_reader'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) { + nxt_alert(task, "'loop.add_reader' is not a callable object"); + goto fail; + } + + nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader"); + if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) { + nxt_alert(task, "Python failed to get 'loop.remove_reader'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) { + nxt_alert(task, "'loop.remove_reader' is not a callable object"); + goto fail; + } + + nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon"); + if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) { + nxt_alert(task, "Python failed to get 'loop.call_soon'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) { + nxt_alert(task, "'loop.call_soon' is not a callable object"); + goto fail; + } + + nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop, + "run_until_complete"); + if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) { + nxt_alert(task, "Python failed to get 'loop.run_until_complete'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) { + nxt_alert(task, "'loop.run_until_complete' is not a callable object"); + goto fail; + } + + nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future"); + if (nxt_slow_path(nxt_py_loop_create_future == NULL)) { + nxt_alert(task, "Python failed to get 'loop.create_future'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) { + nxt_alert(task, "'loop.create_future' is not a callable object"); + goto fail; + } + + nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(nxt_py_quit_future == NULL)) { + nxt_alert(task, "Python failed to create Future "); + nxt_python_print_exception(); + goto fail; + } + + nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future, + "set_result"); + if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) { + nxt_alert(task, "Python failed to get 'future.set_result'"); + goto fail; + } + + if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) { + nxt_alert(task, "'future.set_result' is not a callable object"); + goto fail; + } + + nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); + if (nxt_slow_path(nxt_py_port_read == NULL)) { + nxt_alert(task, "Python failed to initialize the 'port_read' function"); + goto fail; + } + + nxt_queue_init(&nxt_py_asgi_drain_queue); + + if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) { + goto fail; + } + + if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) { + goto fail; + } + + rc = nxt_py_asgi_lifespan_startup(task); + if (nxt_slow_path(rc == NXT_ERROR)) { + goto fail; + } + + init->callbacks.request_handler = nxt_py_asgi_request_handler; + init->callbacks.data_handler = nxt_py_asgi_http_data_handler; + init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; + init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; + init->callbacks.quit = nxt_py_asgi_quit; + init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; + init->callbacks.add_port = nxt_py_asgi_add_port; + init->callbacks.remove_port = nxt_py_asgi_remove_port; + + Py_DECREF(loop); + Py_DECREF(asyncio); + + return NXT_OK; + +fail: + + Py_XDECREF(loop); + Py_DECREF(asyncio); + + return NXT_ERROR; +} + + +nxt_int_t +nxt_python_asgi_run(nxt_unit_ctx_t *ctx) +{ + PyObject *res; + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + nxt_py_quit_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + + return NXT_ERROR; + } + + Py_DECREF(res); + + nxt_py_asgi_lifespan_shutdown(); + + return NXT_OK; +} + + +static void +nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) +{ + PyObject *scope, *res, *task, *receive, *send, *done, *asgi; + + if (req->request->websocket_handshake) { + asgi = nxt_py_asgi_websocket_create(req); + + } else { + asgi = nxt_py_asgi_http_create(req); + } + + if (nxt_slow_path(asgi == NULL)) { + nxt_unit_req_alert(req, "Python failed to create asgi object"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + return; + } + + receive = PyObject_GetAttrString(asgi, "receive"); + if (nxt_slow_path(receive == NULL)) { + nxt_unit_req_alert(req, "Python failed to get 'receive' method"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_asgi; + } + + send = PyObject_GetAttrString(asgi, "send"); + if (nxt_slow_path(receive == NULL)) { + nxt_unit_req_alert(req, "Python failed to get 'send' method"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_receive; + } + + done = PyObject_GetAttrString(asgi, "_done"); + if (nxt_slow_path(receive == NULL)) { + nxt_unit_req_alert(req, "Python failed to get '_done' method"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_send; + } + + scope = nxt_py_asgi_create_http_scope(req); + if (nxt_slow_path(scope == NULL)) { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_done; + } + + req->data = asgi; + + res = PyObject_CallFunctionObjArgs(nxt_py_application, + scope, receive, send, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(req, "Python failed to call the application"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_scope; + } + + if (nxt_slow_path(!PyCoro_CheckExact(res))) { + nxt_unit_req_error(req, "Application result type is not a coroutine"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + Py_DECREF(res); + + goto release_scope; + } + + task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + if (nxt_slow_path(task == NULL)) { + nxt_unit_req_error(req, "Python failed to call the create_task"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + Py_DECREF(res); + + goto release_scope; + } + + Py_DECREF(res); + + res = PyObject_CallMethodObjArgs(task, nxt_py_add_done_callback_str, done, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(req, + "Python failed to call 'task.add_done_callback'"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_task; + } + + Py_DECREF(res); +release_task: + Py_DECREF(task); +release_scope: + Py_DECREF(scope); +release_done: + Py_DECREF(done); +release_send: + Py_DECREF(send); +release_receive: + Py_DECREF(receive); +release_asgi: + Py_DECREF(asgi); +} + + +static PyObject * +nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) +{ + char *p, *target, *query; + uint32_t target_length, i; + PyObject *scope, *v, *type, *scheme; + PyObject *headers, *header; + nxt_unit_field_t *f; + nxt_unit_request_t *r; + + static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); + +#define SET_ITEM(dict, key, value) \ + if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ + == -1)) \ + { \ + nxt_unit_req_alert(req, "Python failed to set '" \ + #dict "." #key "' item"); \ + goto fail; \ + } + + v = NULL; + headers = NULL; + + r = req->request; + + if (r->websocket_handshake) { + type = nxt_py_websocket_str; + scheme = r->tls ? nxt_py_wss_str : nxt_py_ws_str; + + } else { + type = nxt_py_http_str; + scheme = r->tls ? nxt_py_https_str : nxt_py_http_str; + } + + scope = nxt_py_asgi_new_scope(req, type, nxt_py_2_1_str); + if (nxt_slow_path(scope == NULL)) { + return NULL; + } + + p = nxt_unit_sptr_get(&r->version); + SET_ITEM(scope, http_version, p[7] == '1' ? nxt_py_1_1_str + : nxt_py_1_0_str) + SET_ITEM(scope, scheme, scheme) + + v = PyString_FromStringAndSize(nxt_unit_sptr_get(&r->method), + r->method_length); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'method' string"); + goto fail; + } + + SET_ITEM(scope, method, v) + Py_DECREF(v); + + v = PyUnicode_DecodeUTF8(nxt_unit_sptr_get(&r->path), r->path_length, + "replace"); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'path' string"); + goto fail; + } + + SET_ITEM(scope, path, v) + Py_DECREF(v); + + target = nxt_unit_sptr_get(&r->target); + query = nxt_unit_sptr_get(&r->query); + + if (r->query.offset != 0) { + target_length = query - target - 1; + + } else { + target_length = r->target_length; + } + + v = PyBytes_FromStringAndSize(target, target_length); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'raw_path' string"); + goto fail; + } + + SET_ITEM(scope, raw_path, v) + Py_DECREF(v); + + v = PyBytes_FromStringAndSize(query, r->query_length); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'query' string"); + goto fail; + } + + SET_ITEM(scope, query_string, v) + Py_DECREF(v); + + v = nxt_py_asgi_create_address(&r->remote, r->remote_length, 0); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'client' pair"); + goto fail; + } + + SET_ITEM(scope, client, v) + Py_DECREF(v); + + v = nxt_py_asgi_create_address(&r->local, r->local_length, 80); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'server' pair"); + goto fail; + } + + SET_ITEM(scope, server, v) + Py_DECREF(v); + + v = NULL; + + headers = PyTuple_New(r->fields_count); + if (nxt_slow_path(headers == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'headers' object"); + goto fail; + } + + for (i = 0; i < r->fields_count; i++) { + f = r->fields + i; + + header = nxt_py_asgi_create_header(f); + if (nxt_slow_path(header == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'header' pair"); + goto fail; + } + + PyTuple_SET_ITEM(headers, i, header); + + if (f->hash == NXT_UNIT_HASH_WS_PROTOCOL + && f->name_length == ws_protocol.length + && f->value_length > 0 + && r->websocket_handshake) + { + v = nxt_py_asgi_create_subprotocols(f); + if (nxt_slow_path(v == NULL)) { + nxt_unit_req_alert(req, "Failed to create subprotocols"); + goto fail; + } + + SET_ITEM(scope, subprotocols, v); + Py_DECREF(v); + } + } + + SET_ITEM(scope, headers, headers) + Py_DECREF(headers); + + return scope; + +fail: + + Py_XDECREF(v); + Py_XDECREF(headers); + Py_DECREF(scope); + + return NULL; + +#undef SET_ITEM +} + + +static PyObject * +nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, uint16_t port) +{ + char *p, *s; + PyObject *pair, *v; + + pair = PyTuple_New(2); + if (nxt_slow_path(pair == NULL)) { + return NULL; + } + + p = nxt_unit_sptr_get(sptr); + s = memchr(p, ':', len); + + v = PyString_FromStringAndSize(p, s == NULL ? len : s - p); + if (nxt_slow_path(v == NULL)) { + Py_DECREF(pair); + + return NULL; + } + + PyTuple_SET_ITEM(pair, 0, v); + + if (s != NULL) { + p += len; + v = PyLong_FromString(s + 1, &p, 10); + + } else { + v = PyLong_FromLong(port); + } + + if (nxt_slow_path(v == NULL)) { + Py_DECREF(pair); + + return NULL; + } + + PyTuple_SET_ITEM(pair, 1, v); + + return pair; +} + + +static PyObject * +nxt_py_asgi_create_header(nxt_unit_field_t *f) +{ + char c, *name; + uint8_t pos; + PyObject *header, *v; + + header = PyTuple_New(2); + if (nxt_slow_path(header == NULL)) { + return NULL; + } + + name = nxt_unit_sptr_get(&f->name); + + for (pos = 0; pos < f->name_length; pos++) { + c = name[pos]; + if (c >= 'A' && c <= 'Z') { + name[pos] = (c | 0x20); + } + } + + v = PyBytes_FromStringAndSize(name, f->name_length); + if (nxt_slow_path(v == NULL)) { + Py_DECREF(header); + + return NULL; + } + + PyTuple_SET_ITEM(header, 0, v); + + v = PyBytes_FromStringAndSize(nxt_unit_sptr_get(&f->value), + f->value_length); + if (nxt_slow_path(v == NULL)) { + Py_DECREF(header); + + return NULL; + } + + PyTuple_SET_ITEM(header, 1, v); + + return header; +} + + +static PyObject * +nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f) +{ + char *v; + uint32_t i, n, start; + PyObject *res, *proto; + + v = nxt_unit_sptr_get(&f->value); + n = 1; + + for (i = 0; i < f->value_length; i++) { + if (v[i] == ',') { + n++; + } + } + + res = PyTuple_New(n); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + n = 0; + start = 0; + + for (i = 0; i < f->value_length; ) { + if (v[i] != ',') { + i++; + + continue; + } + + if (i - start > 0) { + proto = PyString_FromStringAndSize(v + start, i - start); + if (nxt_slow_path(proto == NULL)) { + goto fail; + } + + PyTuple_SET_ITEM(res, n, proto); + + n++; + } + + do { + i++; + } while (i < f->value_length && v[i] == ' '); + + start = i; + } + + if (i - start > 0) { + proto = PyString_FromStringAndSize(v + start, i - start); + if (nxt_slow_path(proto == NULL)) { + goto fail; + } + + PyTuple_SET_ITEM(res, n, proto); + } + + return res; + +fail: + + Py_DECREF(res); + + return NULL; +} + + +static int +nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int nb; + PyObject *res; + + if (port->in_fd == -1) { + return NXT_UNIT_OK; + } + + nb = 1; + + if (nxt_slow_path(ioctl(port->in_fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(ctx, "ioctl(%d, FIONBIO, 0) failed: %s (%d)", + port->in_fd, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader, + PyLong_FromLong(port->in_fd), + nxt_py_port_read, + PyLong_FromVoidPtr(ctx), + PyLong_FromVoidPtr(port), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to add_reader"); + + return NXT_UNIT_ERROR; + } + + Py_DECREF(res); + + return NXT_UNIT_OK; +} + + +static void +nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) +{ + PyObject *res; + + nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); + + if (port->in_fd == -1) { + return; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader, + PyLong_FromLong(port->in_fd), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Python failed to remove_reader"); + } + + Py_DECREF(res); +} + + +static void +nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) +{ + PyObject *res; + + nxt_unit_debug(ctx, "asgi_quit %p", ctx); + + res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result, + PyLong_FromLong(0), NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to set_result"); + } + + Py_DECREF(res); +} + + +static void +nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_queue_link_t *lnk; + + while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) { + lnk = nxt_queue_first(&nxt_py_asgi_drain_queue); + + rc = nxt_py_asgi_http_drain(lnk); + if (rc == NXT_UNIT_AGAIN) { + break; + } + + nxt_queue_remove(lnk); + } +} + + +static PyObject * +nxt_py_asgi_port_read(PyObject *self, PyObject *args) +{ + int rc; + PyObject *arg; + Py_ssize_t n; + nxt_unit_ctx_t *ctx; + nxt_unit_port_t *port; + + n = PyTuple_GET_SIZE(args); + + if (n != 2) { + nxt_unit_alert(NULL, + "nxt_py_asgi_port_read: invalid number of arguments %d", + (int) n); + + return PyErr_Format(PyExc_TypeError, "invalid number of arguments"); + } + + arg = PyTuple_GET_ITEM(args, 0); + if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + return PyErr_Format(PyExc_TypeError, + "the first argument is not a long"); + } + + ctx = PyLong_AsVoidPtr(arg); + + arg = PyTuple_GET_ITEM(args, 1); + if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) { + return PyErr_Format(PyExc_TypeError, + "the second argument is not a long"); + } + + port = PyLong_AsVoidPtr(arg); + + nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port); + + rc = nxt_unit_process_port_msg(ctx, port); + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return PyErr_Format(PyExc_RuntimeError, + "error processing port message"); + } + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_enum_headers(PyObject *headers, nxt_py_asgi_enum_header_cb cb, + void *data) +{ + int i; + PyObject *iter, *header, *h_iter, *name, *val, *res; + + iter = PyObject_GetIter(headers); + if (nxt_slow_path(iter == NULL)) { + return PyErr_Format(PyExc_TypeError, "'headers' is not an iterable"); + } + + for (i = 0; /* void */; i++) { + header = PyIter_Next(iter); + if (header == NULL) { + break; + } + + h_iter = PyObject_GetIter(header); + if (nxt_slow_path(h_iter == NULL)) { + Py_DECREF(header); + Py_DECREF(iter); + + return PyErr_Format(PyExc_TypeError, + "'headers' item #%d is not an iterable", i); + } + + name = PyIter_Next(h_iter); + if (nxt_slow_path(name == NULL || !PyBytes_Check(name))) { + Py_XDECREF(name); + Py_DECREF(h_iter); + Py_DECREF(header); + Py_DECREF(iter); + + return PyErr_Format(PyExc_TypeError, + "'headers' item #%d 'name' is not a byte string", i); + } + + val = PyIter_Next(h_iter); + if (nxt_slow_path(val == NULL || !PyBytes_Check(val))) { + Py_XDECREF(val); + Py_DECREF(h_iter); + Py_DECREF(header); + Py_DECREF(iter); + + return PyErr_Format(PyExc_TypeError, + "'headers' item #%d 'value' is not a byte string", i); + } + + res = cb(data, i, name, val); + + Py_DECREF(name); + Py_DECREF(val); + Py_DECREF(h_iter); + Py_DECREF(header); + + if (nxt_slow_path(res == NULL)) { + Py_DECREF(iter); + + return NULL; + } + + Py_DECREF(res); + } + + Py_DECREF(iter); + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_calc_size(void *data, int i, PyObject *name, PyObject *val) +{ + nxt_py_asgi_calc_size_ctx_t *ctx; + + ctx = data; + + ctx->fields_count++; + ctx->fields_size += PyBytes_GET_SIZE(name) + PyBytes_GET_SIZE(val); + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) +{ + int rc; + char *name_str, *val_str; + uint32_t name_len, val_len; + nxt_off_t content_length; + nxt_unit_request_info_t *req; + nxt_py_asgi_add_field_ctx_t *ctx; + + name_str = PyBytes_AS_STRING(name); + name_len = PyBytes_GET_SIZE(name); + + val_str = PyBytes_AS_STRING(val); + val_len = PyBytes_GET_SIZE(val); + + ctx = data; + req = ctx->req; + + rc = nxt_unit_response_add_field(req, name_str, name_len, + val_str, val_len); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to add header #%d", i); + } + + if (req->response->fields[i].hash == NXT_UNIT_HASH_CONTENT_LENGTH) { + content_length = nxt_off_t_parse((u_char *) val_str, val_len); + if (nxt_slow_path(content_length < 0)) { + nxt_unit_req_error(req, "failed to parse Content-Length " + "value %.*s", (int) val_len, val_str); + + return PyErr_Format(PyExc_ValueError, + "Failed to parse Content-Length: '%.*s'", + (int) val_len, val_str); + } + + ctx->content_length = content_length; + } + + Py_RETURN_NONE; +} + + +PyObject * +nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, + PyObject *result) +{ + PyObject *set_result, *res; + + if (nxt_slow_path(result == NULL)) { + Py_DECREF(future); + + return NULL; + } + + set_result = PyObject_GetAttrString(future, "set_result"); + if (nxt_slow_path(set_result == NULL)) { + nxt_unit_req_alert(req, "failed to get 'set_result' for future"); + + Py_CLEAR(future); + + goto cleanup; + } + + if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { + nxt_unit_req_alert(req, "'future.set_result' is not a callable"); + + Py_CLEAR(future); + + goto cleanup; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result, + result, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); + nxt_python_print_exception(); + + Py_CLEAR(future); + } + + Py_XDECREF(res); + +cleanup: + + Py_DECREF(set_result); + Py_DECREF(result); + + return future; +} + + +PyObject * +nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type) +{ + PyObject *msg; + + msg = PyDict_New(); + if (nxt_slow_path(msg == NULL)) { + nxt_unit_req_alert(req, "Python failed to create message dict"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create message dict"); + } + + if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_type_str, type) == -1)) { + nxt_unit_req_alert(req, "Python failed to set 'msg.type' item"); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'msg.type' item"); + } + + return msg; +} + + +PyObject * +nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, + PyObject *spec_version) +{ + PyObject *scope, *asgi; + + scope = PyDict_New(); + if (nxt_slow_path(scope == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'scope' dict"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create 'scope' dict"); + } + + if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_type_str, type) == -1)) { + nxt_unit_req_alert(req, "Python failed to set 'scope.type' item"); + + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'scope.type' item"); + } + + asgi = PyDict_New(); + if (nxt_slow_path(asgi == NULL)) { + nxt_unit_req_alert(req, "Python failed to create 'asgi' dict"); + nxt_python_print_exception(); + + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create 'asgi' dict"); + } + + if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_asgi_str, asgi) == -1)) { + nxt_unit_req_alert(req, "Python failed to set 'scope.asgi' item"); + + Py_DECREF(asgi); + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'scope.asgi' item"); + } + + if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_version_str, + nxt_py_3_0_str) == -1)) + { + nxt_unit_req_alert(req, "Python failed to set 'asgi.version' item"); + + Py_DECREF(asgi); + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'asgi.version' item"); + } + + if (nxt_slow_path(PyDict_SetItem(asgi, nxt_py_spec_version_str, + spec_version) == -1)) + { + nxt_unit_req_alert(req, + "Python failed to set 'asgi.spec_version' item"); + + Py_DECREF(asgi); + Py_DECREF(scope); + + return PyErr_Format(PyExc_RuntimeError, + "failed to set 'asgi.spec_version' item"); + } + + Py_DECREF(asgi); + + return scope; +} + + +void +nxt_py_asgi_dealloc(PyObject *self) +{ + PyObject_Del(self); +} + + +PyObject * +nxt_py_asgi_await(PyObject *self) +{ + Py_INCREF(self); + return self; +} + + +PyObject * +nxt_py_asgi_iter(PyObject *self) +{ + Py_INCREF(self); + return self; +} + + +PyObject * +nxt_py_asgi_next(PyObject *self) +{ + return NULL; +} + + +void +nxt_python_asgi_done(void) +{ + nxt_py_asgi_str_done(); + + Py_XDECREF(nxt_py_quit_future); + Py_XDECREF(nxt_py_quit_future_set_result); + Py_XDECREF(nxt_py_loop_run_until_complete); + Py_XDECREF(nxt_py_loop_create_future); + Py_XDECREF(nxt_py_loop_create_task); + Py_XDECREF(nxt_py_loop_call_soon); + Py_XDECREF(nxt_py_loop_add_reader); + Py_XDECREF(nxt_py_loop_remove_reader); + Py_XDECREF(nxt_py_port_read); +} + +#else /* !(NXT_HAVE_ASGI) */ + + +int +nxt_python_asgi_check(PyObject *obj) +{ + return 0; +} + + +nxt_int_t +nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) +{ + nxt_alert(task, "ASGI not implemented"); + return NXT_ERROR; +} + + +nxt_int_t +nxt_python_asgi_run(nxt_unit_ctx_t *ctx) +{ + nxt_unit_alert(ctx, "ASGI not implemented"); + return NXT_ERROR; +} + + +void +nxt_python_asgi_done(void) +{ +} + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi.h b/src/python/nxt_python_asgi.h new file mode 100644 index 00000000..24337c37 --- /dev/null +++ b/src/python/nxt_python_asgi.h @@ -0,0 +1,60 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PYTHON_ASGI_H_INCLUDED_ +#define _NXT_PYTHON_ASGI_H_INCLUDED_ + + +typedef PyObject * (*nxt_py_asgi_enum_header_cb)(void *ctx, int i, + PyObject *name, PyObject *val); + +typedef struct { + uint32_t fields_count; + uint32_t fields_size; +} nxt_py_asgi_calc_size_ctx_t; + +typedef struct { + nxt_unit_request_info_t *req; + uint64_t content_length; +} nxt_py_asgi_add_field_ctx_t; + +PyObject *nxt_py_asgi_enum_headers(PyObject *headers, + nxt_py_asgi_enum_header_cb cb, void *data); + +PyObject *nxt_py_asgi_calc_size(void *data, int i, PyObject *n, PyObject *v); +PyObject *nxt_py_asgi_add_field(void *data, int i, PyObject *n, PyObject *v); + +PyObject *nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, + PyObject *future, PyObject *result); +PyObject *nxt_py_asgi_new_msg(nxt_unit_request_info_t *req, PyObject *type); +PyObject *nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, + PyObject *spec_version); + +void nxt_py_asgi_dealloc(PyObject *self); +PyObject *nxt_py_asgi_await(PyObject *self); +PyObject *nxt_py_asgi_iter(PyObject *self); +PyObject *nxt_py_asgi_next(PyObject *self); + +nxt_int_t nxt_py_asgi_http_init(nxt_task_t *task); +PyObject *nxt_py_asgi_http_create(nxt_unit_request_info_t *req); +void nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req); +int nxt_py_asgi_http_drain(nxt_queue_link_t *lnk); + +nxt_int_t nxt_py_asgi_websocket_init(nxt_task_t *task); +PyObject *nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req); +void nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *ws); +void nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req); + +nxt_int_t nxt_py_asgi_lifespan_startup(nxt_task_t *task); +nxt_int_t nxt_py_asgi_lifespan_shutdown(void); + +extern PyObject *nxt_py_loop_run_until_complete; +extern PyObject *nxt_py_loop_create_future; +extern PyObject *nxt_py_loop_create_task; + +extern nxt_queue_t nxt_py_asgi_drain_queue; + + +#endif /* _NXT_PYTHON_ASGI_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c new file mode 100644 index 00000000..b07d61d6 --- /dev/null +++ b/src/python/nxt_python_asgi_http.c @@ -0,0 +1,591 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +typedef struct { + PyObject_HEAD + nxt_unit_request_info_t *req; + nxt_queue_link_t link; + PyObject *receive_future; + PyObject *send_future; + uint64_t content_length; + uint64_t bytes_sent; + int complete; + PyObject *send_body; + Py_ssize_t send_body_off; +} nxt_py_asgi_http_t; + + +static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http); +static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, + PyObject *dict); +static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, + PyObject *dict); +static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future); + + +static PyMethodDef nxt_py_asgi_http_methods[] = { + { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_http_send, METH_O, 0 }, + { "_done", nxt_py_asgi_http_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_http_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_http", + .tp_basicsize = sizeof(nxt_py_asgi_http_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI HTTP request object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_http_methods, +}; + +static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; + + +nxt_int_t +nxt_py_asgi_http_init(nxt_task_t *task) +{ + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { + nxt_alert(task, "Python failed to initialize the 'http' type object"); + return NXT_ERROR; + } + + return NXT_OK; +} + + +PyObject * +nxt_py_asgi_http_create(nxt_unit_request_info_t *req) +{ + nxt_py_asgi_http_t *http; + + http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type); + + if (nxt_fast_path(http != NULL)) { + http->req = req; + http->receive_future = NULL; + http->send_future = NULL; + http->content_length = -1; + http->bytes_sent = 0; + http->complete = 0; + http->send_body = NULL; + http->send_body_off = 0; + } + + return (PyObject *) http; +} + + +static PyObject * +nxt_py_asgi_http_receive(PyObject *self, PyObject *none) +{ + PyObject *msg, *future; + nxt_py_asgi_http_t *http; + nxt_unit_request_info_t *req; + + http = (nxt_py_asgi_http_t *) self; + req = http->req; + + nxt_unit_req_debug(req, "asgi_http_receive"); + + msg = nxt_py_asgi_http_read_msg(http); + if (nxt_slow_path(msg == NULL)) { + return NULL; + } + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(req, "Python failed to create Future object"); + nxt_python_print_exception(); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (msg != Py_None) { + return nxt_py_asgi_set_result_soon(req, future, msg); + } + + http->receive_future = future; + Py_INCREF(http->receive_future); + + Py_DECREF(msg); + + return future; +} + + +static PyObject * +nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http) +{ + char *body_buf; + ssize_t read_res; + PyObject *msg, *body; + Py_ssize_t size; + nxt_unit_request_info_t *req; + + req = http->req; + + size = req->content_length; + + if (size > nxt_py_asgi_http_body_buf_size) { + size = nxt_py_asgi_http_body_buf_size; + } + + if (size > 0) { + body = PyBytes_FromStringAndSize(NULL, size); + if (nxt_slow_path(body == NULL)) { + nxt_unit_req_alert(req, "Python failed to create body byte string"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Bytes object"); + } + + body_buf = PyBytes_AS_STRING(body); + + read_res = nxt_unit_request_read(req, body_buf, size); + + } else { + body = NULL; + read_res = 0; + } + + if (read_res > 0 || read_res == size) { + msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str); + if (nxt_slow_path(msg == NULL)) { + Py_XDECREF(body); + + return NULL; + } + +#define SET_ITEM(dict, key, value) \ + if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ + == -1)) \ + { \ + nxt_unit_req_alert(req, \ + "Python failed to set '" #dict "." #key "' item"); \ + PyErr_SetString(PyExc_RuntimeError, \ + "Python failed to set '" #dict "." #key "' item"); \ + goto fail; \ + } + + if (body != NULL) { + SET_ITEM(msg, body, body) + } + + if (req->content_length > 0) { + SET_ITEM(msg, more_body, Py_True) + } + +#undef SET_ITEM + + Py_XDECREF(body); + + return msg; + } + + Py_XDECREF(body); + + Py_RETURN_NONE; + +fail: + + Py_DECREF(msg); + Py_XDECREF(body); + + return NULL; +} + + +static PyObject * +nxt_py_asgi_http_send(PyObject *self, PyObject *dict) +{ + PyObject *type; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_http_t *http; + + static const nxt_str_t response_start = nxt_string("http.response.start"); + static const nxt_str_t response_body = nxt_string("http.response.body"); + + http = (nxt_py_asgi_http_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_req_error(http->req, "asgi_http_send: " + "'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) response_start.length + && memcmp(type_str, response_start.start, type_len) == 0) + { + return nxt_py_asgi_http_response_start(http, dict); + } + + if (type_len == (Py_ssize_t) response_body.length + && memcmp(type_str, response_body.start, type_len) == 0) + { + return nxt_py_asgi_http_response_body(http, dict); + } + + nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'", + (int) type_len, type_str); + + return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); +} + + +static PyObject * +nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) +{ + int rc; + PyObject *status, *headers, *res; + nxt_py_asgi_calc_size_ctx_t calc_size_ctx; + nxt_py_asgi_add_field_ctx_t add_field_ctx; + + status = PyDict_GetItem(dict, nxt_py_status_str); + if (nxt_slow_path(status == NULL || !PyLong_Check(status))) { + nxt_unit_req_error(http->req, "asgi_http_response_start: " + "'status' is not an integer"); + return PyErr_Format(PyExc_TypeError, "'status' is not an integer"); + } + + calc_size_ctx.fields_size = 0; + calc_size_ctx.fields_count = 0; + + headers = PyDict_GetItem(dict, nxt_py_headers_str); + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, + &calc_size_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + Py_DECREF(res); + } + + rc = nxt_unit_response_init(http->req, PyLong_AsLong(status), + calc_size_ctx.fields_count, + calc_size_ctx.fields_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + add_field_ctx.req = http->req; + add_field_ctx.content_length = -1; + + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, + &add_field_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + Py_DECREF(res); + } + + http->content_length = add_field_ctx.content_length; + + Py_INCREF(http); + return (PyObject *) http; +} + + +static PyObject * +nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) +{ + int rc; + char *body_str; + ssize_t sent; + PyObject *body, *more_body, *future; + Py_ssize_t body_len, body_off; + + body = PyDict_GetItem(dict, nxt_py_body_str); + if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { + return PyErr_Format(PyExc_TypeError, "'body' is not a byte string"); + } + + more_body = PyDict_GetItem(dict, nxt_py_more_body_str); + if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) { + return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool"); + } + + if (nxt_slow_path(http->complete)) { + return PyErr_Format(PyExc_RuntimeError, + "Unexpected ASGI message 'http.response.body' " + "sent, after response already completed"); + } + + if (nxt_slow_path(http->send_future != NULL)) { + return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); + } + + if (body != NULL) { + body_str = PyBytes_AS_STRING(body); + body_len = PyBytes_GET_SIZE(body); + + nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d", + (int) body_len, (more_body == Py_True) ); + + if (nxt_slow_path(http->bytes_sent + body_len + > http->content_length)) + { + return PyErr_Format(PyExc_RuntimeError, + "Response content longer than Content-Length"); + } + + body_off = 0; + + while (body_len > 0) { + sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); + if (nxt_slow_path(sent < 0)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send body"); + } + + if (nxt_slow_path(sent == 0)) { + nxt_unit_req_debug(http->req, "asgi_http_response_body: " + "out of shared memory, %d", + (int) body_len); + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(http->req, + "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + http->send_body = body; + Py_INCREF(http->send_body); + http->send_body_off = body_off; + + nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link); + + http->send_future = future; + Py_INCREF(http->send_future); + + return future; + } + + body_str += sent; + body_len -= sent; + body_off += sent; + http->bytes_sent += sent; + } + + } else { + nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d", + (more_body == Py_True) ); + + if (!nxt_unit_response_is_sent(http->req)) { + rc = nxt_unit_response_send(http->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send response"); + } + } + } + + if (more_body == NULL || more_body == Py_False) { + http->complete = 1; + } + + Py_INCREF(http); + return (PyObject *) http; +} + + +void +nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req) +{ + PyObject *msg, *future, *res; + nxt_py_asgi_http_t *http; + + http = req->data; + + nxt_unit_req_debug(req, "asgi_http_data_handler"); + + if (http->receive_future == NULL) { + return; + } + + msg = nxt_py_asgi_http_read_msg(http); + if (nxt_slow_path(msg == NULL)) { + return; + } + + if (msg == Py_None) { + Py_DECREF(msg); + return; + } + + future = http->receive_future; + http->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(msg); +} + + +int +nxt_py_asgi_http_drain(nxt_queue_link_t *lnk) +{ + char *body_str; + ssize_t sent; + PyObject *future, *exc, *res; + Py_ssize_t body_len; + nxt_py_asgi_http_t *http; + + http = nxt_container_of(lnk, nxt_py_asgi_http_t, link); + + body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off; + body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off; + + nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len); + + while (body_len > 0) { + sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); + if (nxt_slow_path(sent < 0)) { + goto fail; + } + + if (nxt_slow_path(sent == 0)) { + return NXT_UNIT_AGAIN; + } + + body_str += sent; + body_len -= sent; + + http->send_body_off += sent; + http->bytes_sent += sent; + } + + Py_CLEAR(http->send_body); + + future = http->send_future; + http->send_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(http->req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + return NXT_UNIT_OK; + +fail: + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_failed_to_send_body_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(http->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + + future = http->send_future; + http->send_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(http->req, "'set_exception' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + Py_DECREF(exc); + + return NXT_UNIT_ERROR; +} + + +static PyObject * +nxt_py_asgi_http_done(PyObject *self, PyObject *future) +{ + int rc; + PyObject *res; + nxt_py_asgi_http_t *http; + + http = (nxt_py_asgi_http_t *) self; + + nxt_unit_req_debug(http->req, "asgi_http_done"); + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(http->req, + "Python failed to call 'future.result()'"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + + } else { + Py_DECREF(res); + + rc = NXT_UNIT_OK; + } + + nxt_unit_request_done(http->req, rc); + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi_lifespan.c b/src/python/nxt_python_asgi_lifespan.c new file mode 100644 index 00000000..14d0ee97 --- /dev/null +++ b/src/python/nxt_python_asgi_lifespan.c @@ -0,0 +1,505 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +typedef struct { + PyObject_HEAD + int disabled; + int startup_received; + int startup_sent; + int shutdown_received; + int shutdown_sent; + int shutdown_called; + PyObject *startup_future; + PyObject *shutdown_future; + PyObject *receive_future; +} nxt_py_asgi_lifespan_t; + + +static PyObject *nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_send_startup( + nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, + int v, int *sent, PyObject **future); +static PyObject *nxt_py_asgi_lifespan_send_shutdown( + nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan); +static PyObject *nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future); + + +static nxt_py_asgi_lifespan_t *nxt_py_lifespan; + +static PyMethodDef nxt_py_asgi_lifespan_methods[] = { + { "receive", nxt_py_asgi_lifespan_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_lifespan_send, METH_O, 0 }, + { "_done", nxt_py_asgi_lifespan_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_lifespan_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_lifespan", + .tp_basicsize = sizeof(nxt_py_asgi_lifespan_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI Lifespan object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_lifespan_methods, +}; + + +nxt_int_t +nxt_py_asgi_lifespan_startup(nxt_task_t *task) +{ + PyObject *scope, *res, *py_task, *receive, *send, *done; + nxt_int_t rc; + nxt_py_asgi_lifespan_t *lifespan; + + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_lifespan_type) != 0)) { + nxt_alert(task, + "Python failed to initialize the 'asgi_lifespan' type object"); + return NXT_ERROR; + } + + lifespan = PyObject_New(nxt_py_asgi_lifespan_t, &nxt_py_asgi_lifespan_type); + if (nxt_slow_path(lifespan == NULL)) { + nxt_alert(task, "Python failed to create lifespan object"); + return NXT_ERROR; + } + + rc = NXT_ERROR; + + receive = PyObject_GetAttrString((PyObject *) lifespan, "receive"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get 'receive' method"); + goto release_lifespan; + } + + send = PyObject_GetAttrString((PyObject *) lifespan, "send"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get 'send' method"); + goto release_receive; + } + + done = PyObject_GetAttrString((PyObject *) lifespan, "_done"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get '_done' method"); + goto release_send; + } + + lifespan->startup_future = PyObject_CallObject(nxt_py_loop_create_future, + NULL); + if (nxt_slow_path(lifespan->startup_future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + + goto release_done; + } + + lifespan->disabled = 0; + lifespan->startup_received = 0; + lifespan->startup_sent = 0; + lifespan->shutdown_received = 0; + lifespan->shutdown_sent = 0; + lifespan->shutdown_called = 0; + lifespan->shutdown_future = NULL; + lifespan->receive_future = NULL; + + scope = nxt_py_asgi_new_scope(NULL, nxt_py_lifespan_str, nxt_py_2_0_str); + if (nxt_slow_path(scope == NULL)) { + goto release_future; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_application, + scope, receive, send, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_log(task, NXT_LOG_ERR, "Python failed to call the application"); + nxt_python_print_exception(); + goto release_scope; + } + + if (nxt_slow_path(!PyCoro_CheckExact(res))) { + nxt_log(task, NXT_LOG_ERR, + "Application result type is not a coroutine"); + Py_DECREF(res); + goto release_scope; + } + + py_task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + if (nxt_slow_path(py_task == NULL)) { + nxt_log(task, NXT_LOG_ERR, "Python failed to call the create_task"); + nxt_python_print_exception(); + Py_DECREF(res); + goto release_scope; + } + + Py_DECREF(res); + + res = PyObject_CallMethodObjArgs(py_task, nxt_py_add_done_callback_str, + done, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_log(task, NXT_LOG_ERR, + "Python failed to call 'task.add_done_callback'"); + nxt_python_print_exception(); + goto release_task; + } + + Py_DECREF(res); + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + lifespan->startup_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_alert(task, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + goto release_task; + } + + Py_DECREF(res); + + if (lifespan->startup_sent == 1 || lifespan->disabled) { + nxt_py_lifespan = lifespan; + Py_INCREF(nxt_py_lifespan); + + rc = NXT_OK; + } + +release_task: + Py_DECREF(py_task); +release_scope: + Py_DECREF(scope); +release_future: + Py_CLEAR(lifespan->startup_future); +release_done: + Py_DECREF(done); +release_send: + Py_DECREF(send); +release_receive: + Py_DECREF(receive); +release_lifespan: + Py_DECREF(lifespan); + + return rc; +} + + +nxt_int_t +nxt_py_asgi_lifespan_shutdown(void) +{ + PyObject *msg, *future, *res; + nxt_py_asgi_lifespan_t *lifespan; + + if (nxt_slow_path(nxt_py_lifespan == NULL || nxt_py_lifespan->disabled)) { + return NXT_OK; + } + + lifespan = nxt_py_lifespan; + lifespan->shutdown_called = 1; + + if (lifespan->receive_future != NULL) { + future = lifespan->receive_future; + lifespan->receive_future = NULL; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); + + if (nxt_fast_path(msg != NULL)) { + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + msg, NULL); + Py_XDECREF(res); + Py_DECREF(msg); + } + + Py_DECREF(future); + } + + if (lifespan->shutdown_sent) { + return NXT_OK; + } + + lifespan->shutdown_future = PyObject_CallObject(nxt_py_loop_create_future, + NULL); + if (nxt_slow_path(lifespan->shutdown_future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + lifespan->shutdown_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + Py_DECREF(res); + Py_CLEAR(lifespan->shutdown_future); + + return NXT_OK; +} + + +static PyObject * +nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none) +{ + PyObject *msg, *future; + nxt_py_asgi_lifespan_t *lifespan; + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + nxt_unit_debug(NULL, "asgi_lifespan_receive"); + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (!lifespan->startup_received) { + lifespan->startup_received = 1; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_startup_str); + + return nxt_py_asgi_set_result_soon(NULL, future, msg); + } + + if (lifespan->shutdown_called && !lifespan->shutdown_received) { + lifespan->shutdown_received = 1; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); + + return nxt_py_asgi_set_result_soon(NULL, future, msg); + } + + Py_INCREF(future); + lifespan->receive_future = future; + + return future; +} + + +static PyObject * +nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict) +{ + PyObject *type, *msg; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_lifespan_t *lifespan; + + static const nxt_str_t startup_complete + = nxt_string("lifespan.startup.complete"); + static const nxt_str_t startup_failed + = nxt_string("lifespan.startup.failed"); + static const nxt_str_t shutdown_complete + = nxt_string("lifespan.shutdown.complete"); + static const nxt_str_t shutdown_failed + = nxt_string("lifespan.shutdown.failed"); + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_error(NULL, + "asgi_lifespan_send: 'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, + "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_debug(NULL, "asgi_lifespan_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) startup_complete.length + && memcmp(type_str, startup_complete.start, type_len) == 0) + { + return nxt_py_asgi_lifespan_send_startup(lifespan, 0, NULL); + } + + if (type_len == (Py_ssize_t) startup_failed.length + && memcmp(type_str, startup_failed.start, type_len) == 0) + { + msg = PyDict_GetItem(dict, nxt_py_message_str); + return nxt_py_asgi_lifespan_send_startup(lifespan, 1, msg); + } + + if (type_len == (Py_ssize_t) shutdown_complete.length + && memcmp(type_str, shutdown_complete.start, type_len) == 0) + { + return nxt_py_asgi_lifespan_send_shutdown(lifespan, 0, NULL); + } + + if (type_len == (Py_ssize_t) shutdown_failed.length + && memcmp(type_str, shutdown_failed.start, type_len) == 0) + { + msg = PyDict_GetItem(dict, nxt_py_message_str); + return nxt_py_asgi_lifespan_send_shutdown(lifespan, 1, msg); + } + + return nxt_py_asgi_lifespan_disable(lifespan); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_startup(nxt_py_asgi_lifespan_t *lifespan, int v, + PyObject *message) +{ + const char *message_str; + Py_ssize_t message_len; + + if (nxt_slow_path(v != 0)) { + nxt_unit_error(NULL, "Application startup failed"); + + if (nxt_fast_path(message != NULL && PyUnicode_Check(message))) { + message_str = PyUnicode_AsUTF8AndSize(message, &message_len); + + nxt_unit_error(NULL, "%.*s", (int) message_len, message_str); + } + } + + return nxt_py_asgi_lifespan_send_(lifespan, v, + &lifespan->startup_sent, + &lifespan->startup_future); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, int v, int *sent, + PyObject **pfuture) +{ + PyObject *future, *res; + + if (*sent) { + return nxt_py_asgi_lifespan_disable(lifespan); + } + + *sent = 1 + v; + + if (*pfuture != NULL) { + future = *pfuture; + *pfuture = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + + return nxt_py_asgi_lifespan_disable(lifespan); + } + + Py_DECREF(res); + Py_DECREF(future); + } + + Py_INCREF(lifespan); + + return (PyObject *) lifespan; +} + + +static PyObject * +nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan) +{ + nxt_unit_warn(NULL, "Got invalid state transition on lifespan protocol"); + + lifespan->disabled = 1; + + return PyErr_Format(PyExc_AssertionError, + "Got invalid state transition on lifespan protocol"); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_shutdown(nxt_py_asgi_lifespan_t *lifespan, int v, + PyObject *message) +{ + return nxt_py_asgi_lifespan_send_(lifespan, v, + &lifespan->shutdown_sent, + &lifespan->shutdown_future); +} + + +static PyObject * +nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future) +{ + PyObject *res; + nxt_py_asgi_lifespan_t *lifespan; + + nxt_unit_debug(NULL, "asgi_lifespan_done"); + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + if (lifespan->startup_sent == 0) { + lifespan->disabled = 1; + } + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_log(NULL, NXT_UNIT_LOG_INFO, + "ASGI Lifespan processing exception"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + + if (lifespan->startup_future != NULL) { + future = lifespan->startup_future; + lifespan->startup_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + } + + if (lifespan->shutdown_future != NULL) { + future = lifespan->shutdown_future; + lifespan->shutdown_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + } + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi_str.c b/src/python/nxt_python_asgi_str.c new file mode 100644 index 00000000..37fa7f04 --- /dev/null +++ b/src/python/nxt_python_asgi_str.c @@ -0,0 +1,141 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <python/nxt_python_asgi_str.h> + + +PyObject *nxt_py_1_0_str; +PyObject *nxt_py_1_1_str; +PyObject *nxt_py_2_0_str; +PyObject *nxt_py_2_1_str; +PyObject *nxt_py_3_0_str; +PyObject *nxt_py_add_done_callback_str; +PyObject *nxt_py_asgi_str; +PyObject *nxt_py_bad_state_str; +PyObject *nxt_py_body_str; +PyObject *nxt_py_bytes_str; +PyObject *nxt_py_client_str; +PyObject *nxt_py_code_str; +PyObject *nxt_py_done_str; +PyObject *nxt_py_exception_str; +PyObject *nxt_py_failed_to_send_body_str; +PyObject *nxt_py_headers_str; +PyObject *nxt_py_http_str; +PyObject *nxt_py_http_disconnect_str; +PyObject *nxt_py_http_request_str; +PyObject *nxt_py_http_version_str; +PyObject *nxt_py_https_str; +PyObject *nxt_py_lifespan_str; +PyObject *nxt_py_lifespan_shutdown_str; +PyObject *nxt_py_lifespan_startup_str; +PyObject *nxt_py_method_str; +PyObject *nxt_py_message_str; +PyObject *nxt_py_message_too_big_str; +PyObject *nxt_py_more_body_str; +PyObject *nxt_py_path_str; +PyObject *nxt_py_query_string_str; +PyObject *nxt_py_raw_path_str; +PyObject *nxt_py_result_str; +PyObject *nxt_py_root_path_str; +PyObject *nxt_py_scheme_str; +PyObject *nxt_py_server_str; +PyObject *nxt_py_set_exception_str; +PyObject *nxt_py_set_result_str; +PyObject *nxt_py_spec_version_str; +PyObject *nxt_py_status_str; +PyObject *nxt_py_subprotocol_str; +PyObject *nxt_py_subprotocols_str; +PyObject *nxt_py_text_str; +PyObject *nxt_py_type_str; +PyObject *nxt_py_version_str; +PyObject *nxt_py_websocket_str; +PyObject *nxt_py_websocket_accept_str; +PyObject *nxt_py_websocket_close_str; +PyObject *nxt_py_websocket_connect_str; +PyObject *nxt_py_websocket_disconnect_str; +PyObject *nxt_py_websocket_receive_str; +PyObject *nxt_py_websocket_send_str; +PyObject *nxt_py_ws_str; +PyObject *nxt_py_wss_str; + +static nxt_python_string_t nxt_py_asgi_strings[] = { + { nxt_string("1.0"), &nxt_py_1_0_str }, + { nxt_string("1.1"), &nxt_py_1_1_str }, + { nxt_string("2.0"), &nxt_py_2_0_str }, + { nxt_string("2.1"), &nxt_py_2_1_str }, + { nxt_string("3.0"), &nxt_py_3_0_str }, + { nxt_string("add_done_callback"), &nxt_py_add_done_callback_str }, + { nxt_string("asgi"), &nxt_py_asgi_str }, + { nxt_string("bad state"), &nxt_py_bad_state_str }, + { nxt_string("body"), &nxt_py_body_str }, + { nxt_string("bytes"), &nxt_py_bytes_str }, + { nxt_string("client"), &nxt_py_client_str }, + { nxt_string("code"), &nxt_py_code_str }, + { nxt_string("done"), &nxt_py_done_str }, + { nxt_string("exception"), &nxt_py_exception_str }, + { nxt_string("failed to send body"), &nxt_py_failed_to_send_body_str }, + { nxt_string("headers"), &nxt_py_headers_str }, + { nxt_string("http"), &nxt_py_http_str }, + { nxt_string("http.disconnect"), &nxt_py_http_disconnect_str }, + { nxt_string("http.request"), &nxt_py_http_request_str }, + { nxt_string("http_version"), &nxt_py_http_version_str }, + { nxt_string("https"), &nxt_py_https_str }, + { nxt_string("lifespan"), &nxt_py_lifespan_str }, + { nxt_string("lifespan.shutdown"), &nxt_py_lifespan_shutdown_str }, + { nxt_string("lifespan.startup"), &nxt_py_lifespan_startup_str }, + { nxt_string("message"), &nxt_py_message_str }, + { nxt_string("message too big"), &nxt_py_message_too_big_str }, + { nxt_string("method"), &nxt_py_method_str }, + { nxt_string("more_body"), &nxt_py_more_body_str }, + { nxt_string("path"), &nxt_py_path_str }, + { nxt_string("query_string"), &nxt_py_query_string_str }, + { nxt_string("raw_path"), &nxt_py_raw_path_str }, + { nxt_string("result"), &nxt_py_result_str }, + { nxt_string("root_path"), &nxt_py_root_path_str }, // not used + { nxt_string("scheme"), &nxt_py_scheme_str }, + { nxt_string("server"), &nxt_py_server_str }, + { nxt_string("set_exception"), &nxt_py_set_exception_str }, + { nxt_string("set_result"), &nxt_py_set_result_str }, + { nxt_string("spec_version"), &nxt_py_spec_version_str }, + { nxt_string("status"), &nxt_py_status_str }, + { nxt_string("subprotocol"), &nxt_py_subprotocol_str }, + { nxt_string("subprotocols"), &nxt_py_subprotocols_str }, + { nxt_string("text"), &nxt_py_text_str }, + { nxt_string("type"), &nxt_py_type_str }, + { nxt_string("version"), &nxt_py_version_str }, + { nxt_string("websocket"), &nxt_py_websocket_str }, + { nxt_string("websocket.accept"), &nxt_py_websocket_accept_str }, + { nxt_string("websocket.close"), &nxt_py_websocket_close_str }, + { nxt_string("websocket.connect"), &nxt_py_websocket_connect_str }, + { nxt_string("websocket.disconnect"), &nxt_py_websocket_disconnect_str }, + { nxt_string("websocket.receive"), &nxt_py_websocket_receive_str }, + { nxt_string("websocket.send"), &nxt_py_websocket_send_str }, + { nxt_string("ws"), &nxt_py_ws_str }, + { nxt_string("wss"), &nxt_py_wss_str }, + { nxt_null_string, NULL }, +}; + + +nxt_int_t +nxt_py_asgi_str_init(void) +{ + return nxt_python_init_strings(nxt_py_asgi_strings); +} + + +void +nxt_py_asgi_str_done(void) +{ + nxt_python_done_strings(nxt_py_asgi_strings); +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_asgi_str.h b/src/python/nxt_python_asgi_str.h new file mode 100644 index 00000000..3f389c62 --- /dev/null +++ b/src/python/nxt_python_asgi_str.h @@ -0,0 +1,69 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PYTHON_ASGI_STR_H_INCLUDED_ +#define _NXT_PYTHON_ASGI_STR_H_INCLUDED_ + + +extern PyObject *nxt_py_1_0_str; +extern PyObject *nxt_py_1_1_str; +extern PyObject *nxt_py_2_0_str; +extern PyObject *nxt_py_2_1_str; +extern PyObject *nxt_py_3_0_str; +extern PyObject *nxt_py_add_done_callback_str; +extern PyObject *nxt_py_asgi_str; +extern PyObject *nxt_py_bad_state_str; +extern PyObject *nxt_py_body_str; +extern PyObject *nxt_py_bytes_str; +extern PyObject *nxt_py_client_str; +extern PyObject *nxt_py_code_str; +extern PyObject *nxt_py_done_str; +extern PyObject *nxt_py_exception_str; +extern PyObject *nxt_py_failed_to_send_body_str; +extern PyObject *nxt_py_headers_str; +extern PyObject *nxt_py_http_str; +extern PyObject *nxt_py_http_disconnect_str; +extern PyObject *nxt_py_http_request_str; +extern PyObject *nxt_py_http_version_str; +extern PyObject *nxt_py_https_str; +extern PyObject *nxt_py_lifespan_str; +extern PyObject *nxt_py_lifespan_shutdown_str; +extern PyObject *nxt_py_lifespan_startup_str; +extern PyObject *nxt_py_method_str; +extern PyObject *nxt_py_message_str; +extern PyObject *nxt_py_message_too_big_str; +extern PyObject *nxt_py_more_body_str; +extern PyObject *nxt_py_path_str; +extern PyObject *nxt_py_query_string_str; +extern PyObject *nxt_py_result_str; +extern PyObject *nxt_py_raw_path_str; +extern PyObject *nxt_py_root_path_str; +extern PyObject *nxt_py_scheme_str; +extern PyObject *nxt_py_server_str; +extern PyObject *nxt_py_set_exception_str; +extern PyObject *nxt_py_set_result_str; +extern PyObject *nxt_py_spec_version_str; +extern PyObject *nxt_py_status_str; +extern PyObject *nxt_py_subprotocol_str; +extern PyObject *nxt_py_subprotocols_str; +extern PyObject *nxt_py_text_str; +extern PyObject *nxt_py_type_str; +extern PyObject *nxt_py_version_str; +extern PyObject *nxt_py_websocket_str; +extern PyObject *nxt_py_websocket_accept_str; +extern PyObject *nxt_py_websocket_close_str; +extern PyObject *nxt_py_websocket_connect_str; +extern PyObject *nxt_py_websocket_disconnect_str; +extern PyObject *nxt_py_websocket_receive_str; +extern PyObject *nxt_py_websocket_send_str; +extern PyObject *nxt_py_ws_str; +extern PyObject *nxt_py_wss_str; + + +nxt_int_t nxt_py_asgi_str_init(void); +void nxt_py_asgi_str_done(void); + + +#endif /* _NXT_PYTHON_ASGI_STR_H_INCLUDED_ */ diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c new file mode 100644 index 00000000..5a27b588 --- /dev/null +++ b/src/python/nxt_python_asgi_websocket.c @@ -0,0 +1,1084 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <nxt_unit_websocket.h> +#include <nxt_websocket_header.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +enum { + NXT_WS_INIT, + NXT_WS_CONNECT, + NXT_WS_ACCEPTED, + NXT_WS_DISCONNECTED, + NXT_WS_CLOSED, +}; + + +typedef struct { + nxt_queue_link_t link; + nxt_unit_websocket_frame_t *frame; +} nxt_py_asgi_penging_frame_t; + + +typedef struct { + PyObject_HEAD + nxt_unit_request_info_t *req; + PyObject *receive_future; + PyObject *receive_exc_str; + int state; + nxt_queue_t pending_frames; + uint64_t pending_payload_len; + uint64_t pending_frame_len; + int pending_fins; +} nxt_py_asgi_websocket_t; + + +static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, + PyObject *dict); +static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, + PyObject *dict); +static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, + PyObject *dict); +static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, + PyObject *msg); +static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, + PyObject *exc); +static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f); +static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws, + nxt_unit_websocket_frame_t *frame); +static uint64_t nxt_py_asgi_websocket_pending_len( + nxt_py_asgi_websocket_t *ws); +static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame( + nxt_py_asgi_websocket_t *ws); +static PyObject *nxt_py_asgi_websocket_disconnect_msg( + nxt_py_asgi_websocket_t *ws); +static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future); + + +static PyMethodDef nxt_py_asgi_websocket_methods[] = { + { "receive", nxt_py_asgi_websocket_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_websocket_send, METH_O, 0 }, + { "_done", nxt_py_asgi_websocket_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_websocket_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_websocket", + .tp_basicsize = sizeof(nxt_py_asgi_websocket_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI WebSocket connection object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_websocket_methods, +}; + +static uint64_t nxt_py_asgi_ws_max_frame_size = 1024 * 1024; +static uint64_t nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024; + + +nxt_int_t +nxt_py_asgi_websocket_init(nxt_task_t *task) +{ + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) { + nxt_alert(task, + "Python failed to initialize the \"asgi_websocket\" type object"); + return NXT_ERROR; + } + + return NXT_OK; +} + + +PyObject * +nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req) +{ + nxt_py_asgi_websocket_t *ws; + + ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type); + + if (nxt_fast_path(ws != NULL)) { + ws->req = req; + ws->receive_future = NULL; + ws->receive_exc_str = NULL; + ws->state = NXT_WS_INIT; + nxt_queue_init(&ws->pending_frames); + ws->pending_payload_len = 0; + ws->pending_frame_len = 0; + ws->pending_fins = 0; + } + + return (PyObject *) ws; +} + + +static PyObject * +nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none) +{ + PyObject *future, *msg; + nxt_py_asgi_websocket_t *ws; + + ws = (nxt_py_asgi_websocket_t *) self; + + nxt_unit_req_debug(ws->req, "asgi_websocket_receive"); + + /* If exception happened out of receive() call, raise it now. */ + if (nxt_slow_path(ws->receive_exc_str != NULL)) { + PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str); + + ws->receive_exc_str = NULL; + + return NULL; + } + + if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { + nxt_unit_req_error(ws->req, + "receive() called for closed WebSocket"); + + return PyErr_Format(PyExc_RuntimeError, + "WebSocket already closed"); + } + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(ws->req, "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (nxt_slow_path(ws->state == NXT_WS_INIT)) { + ws->state = NXT_WS_CONNECT; + + msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str); + + return nxt_py_asgi_set_result_soon(ws->req, future, msg); + } + + if (ws->pending_fins > 0) { + msg = nxt_py_asgi_websocket_pop_msg(ws, NULL); + + return nxt_py_asgi_set_result_soon(ws->req, future, msg); + } + + if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { + msg = nxt_py_asgi_websocket_disconnect_msg(ws); + + return nxt_py_asgi_set_result_soon(ws->req, future, msg); + } + + ws->receive_future = future; + Py_INCREF(ws->receive_future); + + return future; +} + + +static PyObject * +nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict) +{ + PyObject *type; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_websocket_t *ws; + + static const nxt_str_t websocket_accept = nxt_string("websocket.accept"); + static const nxt_str_t websocket_close = nxt_string("websocket.close"); + static const nxt_str_t websocket_send = nxt_string("websocket.send"); + + ws = (nxt_py_asgi_websocket_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_req_error(ws->req, "asgi_websocket_send: " + "'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, + "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) websocket_accept.length + && memcmp(type_str, websocket_accept.start, type_len) == 0) + { + return nxt_py_asgi_websocket_accept(ws, dict); + } + + if (type_len == (Py_ssize_t) websocket_close.length + && memcmp(type_str, websocket_close.start, type_len) == 0) + { + return nxt_py_asgi_websocket_close(ws, dict); + } + + if (type_len == (Py_ssize_t) websocket_send.length + && memcmp(type_str, websocket_send.start, type_len) == 0) + { + return nxt_py_asgi_websocket_send_frame(ws, dict); + } + + nxt_unit_req_error(ws->req, "asgi_websocket_send: " + "unexpected 'type': '%.*s'", (int) type_len, type_str); + return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); +} + + +static PyObject * +nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict) +{ + int rc; + char *subprotocol_str; + PyObject *res, *headers, *subprotocol; + Py_ssize_t subprotocol_len; + nxt_py_asgi_calc_size_ctx_t calc_size_ctx; + nxt_py_asgi_add_field_ctx_t add_field_ctx; + + static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol"); + + switch(ws->state) { + case NXT_WS_INIT: + return PyErr_Format(PyExc_RuntimeError, + "WebSocket connect not received"); + case NXT_WS_CONNECT: + break; + + case NXT_WS_ACCEPTED: + return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted"); + + case NXT_WS_DISCONNECTED: + return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + + case NXT_WS_CLOSED: + return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + } + + if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted"); + } + + if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) { + return PyErr_Format(PyExc_RuntimeError, "response already sent"); + } + + calc_size_ctx.fields_size = 0; + calc_size_ctx.fields_count = 0; + + headers = PyDict_GetItem(dict, nxt_py_headers_str); + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, + &calc_size_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + } + + subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str); + if (subprotocol != NULL && PyUnicode_Check(subprotocol)) { + subprotocol_str = PyUnicode_DATA(subprotocol); + subprotocol_len = PyUnicode_GET_LENGTH(subprotocol); + + calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len; + calc_size_ctx.fields_count++; + + } else { + subprotocol_str = NULL; + subprotocol_len = 0; + } + + rc = nxt_unit_response_init(ws->req, 101, + calc_size_ctx.fields_count, + calc_size_ctx.fields_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + add_field_ctx.req = ws->req; + add_field_ctx.content_length = -1; + + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, + &add_field_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + } + + if (subprotocol_len > 0) { + rc = nxt_unit_response_add_field(ws->req, + (const char *) ws_protocol.start, + ws_protocol.length, + subprotocol_str, subprotocol_len); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to add header"); + } + } + + rc = nxt_unit_response_send(ws->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send response"); + } + + ws->state = NXT_WS_ACCEPTED; + + Py_INCREF(ws); + + return (PyObject *) ws; +} + + +static PyObject * +nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict) +{ + int rc; + uint16_t status_code; + PyObject *code; + + if (nxt_slow_path(ws->state == NXT_WS_INIT)) { + return PyErr_Format(PyExc_RuntimeError, + "WebSocket connect not received"); + } + + if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + } + + if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + } + + if (nxt_unit_response_is_websocket(ws->req)) { + code = PyDict_GetItem(dict, nxt_py_code_str); + if (nxt_slow_path(code != NULL && !PyLong_Check(code))) { + return PyErr_Format(PyExc_TypeError, "'code' is not integer"); + } + + status_code = (code != NULL) ? htons(PyLong_AsLong(code)) + : htons(NXT_WEBSOCKET_CR_NORMAL); + + rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE, + 1, &status_code, 2); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send close frame"); + } + + } else { + rc = nxt_unit_response_init(ws->req, 403, 0, 0); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + rc = nxt_unit_response_send(ws->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send response"); + } + } + + ws->state = NXT_WS_CLOSED; + + Py_INCREF(ws); + + return (PyObject *) ws; +} + + +static PyObject * +nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict) +{ + int rc; + uint8_t opcode; + PyObject *bytes, *text; + const void *buf; + Py_ssize_t buf_size; + + if (nxt_slow_path(ws->state == NXT_WS_INIT)) { + return PyErr_Format(PyExc_RuntimeError, + "WebSocket connect not received"); + } + + if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) { + return PyErr_Format(PyExc_RuntimeError, + "WebSocket not accepted yet"); + } + + if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected"); + } + + if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) { + return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed"); + } + + bytes = PyDict_GetItem(dict, nxt_py_bytes_str); + if (bytes == Py_None) { + bytes = NULL; + } + + if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) { + return PyErr_Format(PyExc_TypeError, + "'bytes' is not a byte string"); + } + + text = PyDict_GetItem(dict, nxt_py_text_str); + if (text == Py_None) { + text = NULL; + } + + if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) { + return PyErr_Format(PyExc_TypeError, + "'text' is not a unicode string"); + } + + if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) { + return PyErr_Format(PyExc_ValueError, + "Exactly one of 'bytes' or 'text' must be non-None"); + } + + if (bytes != NULL) { + buf = PyBytes_AS_STRING(bytes); + buf_size = PyBytes_GET_SIZE(bytes); + opcode = NXT_WEBSOCKET_OP_BINARY; + + } else { + buf = PyUnicode_AsUTF8AndSize(text, &buf_size); + opcode = NXT_WEBSOCKET_OP_TEXT; + } + + rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send close frame"); + } + + Py_INCREF(ws); + return (PyObject *) ws; +} + + +void +nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame) +{ + uint8_t opcode; + uint16_t status_code; + uint64_t rest; + PyObject *msg, *exc; + nxt_py_asgi_websocket_t *ws; + + ws = frame->req->data; + + nxt_unit_req_debug(ws->req, "asgi_websocket_handler"); + + opcode = frame->header->opcode; + if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT + && opcode != NXT_WEBSOCKET_OP_TEXT + && opcode != NXT_WEBSOCKET_OP_BINARY + && opcode != NXT_WEBSOCKET_OP_CLOSE)) + { + nxt_unit_websocket_done(frame); + + nxt_unit_req_debug(ws->req, + "asgi_websocket_handler: ignore frame with opcode %d", + opcode); + + return; + } + + if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) { + nxt_unit_websocket_done(frame); + + goto bad_state; + } + + rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len; + + if (nxt_slow_path(frame->payload_len > rest)) { + nxt_unit_websocket_done(frame); + + goto too_big; + } + + rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len; + + if (nxt_slow_path(frame->payload_len > rest)) { + nxt_unit_websocket_done(frame); + + goto too_big; + } + + if (ws->receive_future == NULL || frame->header->fin == 0) { + nxt_py_asgi_websocket_suspend_frame(frame); + + return; + } + + if (!nxt_queue_is_empty(&ws->pending_frames)) { + if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT + || opcode == NXT_WEBSOCKET_OP_BINARY)) + { + nxt_unit_req_alert(ws->req, + "Invalid state: pending frames with active receiver. " + "CONT frame expected. (%d)", opcode); + + PyErr_SetString(PyExc_AssertionError, + "Invalid state: pending frames with active receiver. " + "CONT frame expected."); + + nxt_unit_websocket_done(frame); + + return; + } + } + + msg = nxt_py_asgi_websocket_pop_msg(ws, frame); + if (nxt_slow_path(msg == NULL)) { + exc = PyErr_Occurred(); + Py_INCREF(exc); + + goto raise; + } + + nxt_py_asgi_websocket_receive_done(ws, msg); + + return; + +bad_state: + + if (ws->receive_future == NULL) { + ws->receive_exc_str = nxt_py_bad_state_str; + + return; + } + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_bad_state_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(ws->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + + goto raise; + +too_big: + + status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG); + + (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE, + 1, &status_code, 2); + + ws->state = NXT_WS_CLOSED; + + if (ws->receive_future == NULL) { + ws->receive_exc_str = nxt_py_message_too_big_str; + + return; + } + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_message_too_big_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(ws->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + +raise: + + nxt_py_asgi_websocket_receive_fail(ws, exc); +} + + +static void +nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg) +{ + PyObject *future, *res; + + future = ws->receive_future; + ws->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(ws->req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(msg); +} + + +static void +nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc) +{ + PyObject *future, *res; + + future = ws->receive_future; + ws->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(ws->req, "'set_exception' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(exc); +} + + +static void +nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame) +{ + int rc; + nxt_py_asgi_websocket_t *ws; + nxt_py_asgi_penging_frame_t *p; + + nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: " + "%d, %"PRIu64", %d", + frame->header->opcode, frame->payload_len, + frame->header->fin); + + ws = frame->req->data; + + rc = nxt_unit_websocket_retain(frame); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension."); + + nxt_unit_websocket_done(frame); + + PyErr_SetString(PyExc_RuntimeError, + "Failed to retain frame for suspension."); + + return; + } + + p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t)); + if (nxt_slow_path(p == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to allocate buffer to suspend frame."); + + nxt_unit_websocket_done(frame); + + PyErr_SetString(PyExc_RuntimeError, + "Failed to allocate buffer to suspend frame."); + + return; + } + + p->frame = frame; + nxt_queue_insert_tail(&ws->pending_frames, &p->link); + + ws->pending_payload_len += frame->payload_len; + ws->pending_fins += frame->header->fin; + + if (frame->header->fin) { + ws->pending_frame_len = 0; + + } else { + if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) { + ws->pending_frame_len += frame->payload_len; + + } else { + ws->pending_frame_len = frame->payload_len; + } + } +} + + +static PyObject * +nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws, + nxt_unit_websocket_frame_t *frame) +{ + int fin; + char *buf; + uint8_t code_buf[2], opcode; + uint16_t code; + PyObject *msg, *data, *type, *data_key; + uint64_t payload_len; + nxt_unit_websocket_frame_t *fin_frame; + + nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg"); + + fin_frame = NULL; + + if (nxt_queue_is_empty(&ws->pending_frames) + || (frame != NULL + && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE)) + { + payload_len = frame->payload_len; + + } else { + if (frame != NULL) { + payload_len = ws->pending_payload_len + frame->payload_len; + fin_frame = frame; + + } else { + payload_len = nxt_py_asgi_websocket_pending_len(ws); + } + + frame = nxt_py_asgi_websocket_pop_frame(ws); + } + + opcode = frame->header->opcode; + + if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) { + nxt_unit_req_alert(ws->req, + "Invalid state: attempt to process CONT frame."); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_AssertionError, + "Invalid state: attempt to process CONT frame."); + } + + type = nxt_py_websocket_receive_str; + + switch (opcode) { + case NXT_WEBSOCKET_OP_TEXT: + buf = nxt_unit_malloc(frame->req->ctx, payload_len); + if (nxt_slow_path(buf == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to allocate buffer for payload (%d).", + (int) payload_len); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to allocate buffer for payload (%d).", + (int) payload_len); + } + + data = NULL; + data_key = nxt_py_text_str; + + break; + + case NXT_WEBSOCKET_OP_BINARY: + data = PyBytes_FromStringAndSize(NULL, payload_len); + if (nxt_slow_path(data == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to create Bytes for payload (%d).", + (int) payload_len); + nxt_python_print_exception(); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to create Bytes for payload."); + } + + buf = (char *) PyBytes_AS_STRING(data); + data_key = nxt_py_bytes_str; + + break; + + case NXT_WEBSOCKET_OP_CLOSE: + if (frame->payload_len >= 2) { + nxt_unit_websocket_read(frame, code_buf, 2); + code = ((uint16_t) code_buf[0]) << 8 | code_buf[1]; + + } else { + code = NXT_WEBSOCKET_CR_NORMAL; + } + + nxt_unit_websocket_done(frame); + + data = PyLong_FromLong(code); + if (nxt_slow_path(data == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to create Long from code %d.", + (int) code); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to create Long from code %d.", + (int) code); + } + + buf = NULL; + type = nxt_py_websocket_disconnect_str; + data_key = nxt_py_code_str; + + break; + + default: + nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode); + + nxt_unit_websocket_done(frame); + + return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d", + opcode); + } + + if (buf != NULL) { + fin = frame->header->fin; + buf += nxt_unit_websocket_read(frame, buf, frame->payload_len); + + nxt_unit_websocket_done(frame); + + if (!fin) { + while (!nxt_queue_is_empty(&ws->pending_frames)) { + frame = nxt_py_asgi_websocket_pop_frame(ws); + fin = frame->header->fin; + + buf += nxt_unit_websocket_read(frame, buf, frame->payload_len); + + nxt_unit_websocket_done(frame); + + if (fin) { + break; + } + } + + if (fin_frame != NULL) { + buf += nxt_unit_websocket_read(fin_frame, buf, + fin_frame->payload_len); + nxt_unit_websocket_done(fin_frame); + } + } + + if (opcode == NXT_WEBSOCKET_OP_TEXT) { + buf -= payload_len; + + data = PyUnicode_DecodeUTF8(buf, payload_len, NULL); + + nxt_unit_free(ws->req->ctx, buf); + + if (nxt_slow_path(data == NULL)) { + nxt_unit_req_alert(ws->req, + "Failed to create Unicode for payload (%d).", + (int) payload_len); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "Failed to create Unicode."); + } + } + } + + msg = nxt_py_asgi_new_msg(ws->req, type); + if (nxt_slow_path(msg == NULL)) { + Py_DECREF(data); + return NULL; + } + + if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) { + nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item"); + + Py_DECREF(msg); + Py_DECREF(data); + + return PyErr_Format(PyExc_RuntimeError, + "Python failed to set 'msg.data' item"); + } + + Py_DECREF(data); + + return msg; +} + + +static uint64_t +nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws) +{ + uint64_t res; + nxt_py_asgi_penging_frame_t *p; + + res = 0; + + nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) { + res += p->frame->payload_len; + + if (p->frame->header->fin) { + nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d", + (int) res); + return res; + } + } nxt_queue_loop; + + nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)", + (int) res); + return res; +} + + +static nxt_unit_websocket_frame_t * +nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws) +{ + nxt_queue_link_t *lnk; + nxt_unit_websocket_frame_t *frame; + nxt_py_asgi_penging_frame_t *p; + + lnk = nxt_queue_first(&ws->pending_frames); + nxt_queue_remove(lnk); + + p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link); + + frame = p->frame; + ws->pending_payload_len -= frame->payload_len; + ws->pending_fins -= frame->header->fin; + + nxt_unit_free(frame->req->ctx, p); + + nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: " + "%d, %"PRIu64", %d", + frame->header->opcode, frame->payload_len, + frame->header->fin); + + return frame; +} + + +void +nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req) +{ + PyObject *msg, *exc; + nxt_py_asgi_websocket_t *ws; + + ws = req->data; + + nxt_unit_req_debug(req, "asgi_websocket_close_handler"); + + if (ws->receive_future == NULL) { + ws->state = NXT_WS_DISCONNECTED; + + return; + } + + msg = nxt_py_asgi_websocket_disconnect_msg(ws); + if (nxt_slow_path(msg == NULL)) { + exc = PyErr_Occurred(); + Py_INCREF(exc); + + nxt_py_asgi_websocket_receive_fail(ws, exc); + + } else { + nxt_py_asgi_websocket_receive_done(ws, msg); + } +} + + +static PyObject * +nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws) +{ + PyObject *msg, *code; + + msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str); + if (nxt_slow_path(msg == NULL)) { + return NULL; + } + + code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY); + if (nxt_slow_path(code == NULL)) { + nxt_unit_req_alert(ws->req, "Python failed to create long"); + nxt_python_print_exception(); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, "failed to create long"); + } + + if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) { + nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item"); + + Py_DECREF(msg); + Py_DECREF(code); + + return PyErr_Format(PyExc_RuntimeError, + "Python failed to set 'msg.code' item"); + } + + Py_DECREF(code); + + return msg; +} + + +static PyObject * +nxt_py_asgi_websocket_done(PyObject *self, PyObject *future) +{ + int rc; + uint16_t status_code; + PyObject *res; + nxt_py_asgi_websocket_t *ws; + + ws = (nxt_py_asgi_websocket_t *) self; + + nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self); + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(ws->req, + "Python failed to call 'future.result()'"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + + } else { + Py_DECREF(res); + + rc = NXT_UNIT_OK; + } + + if (ws->state == NXT_WS_ACCEPTED) { + status_code = (rc == NXT_UNIT_OK) + ? htons(NXT_WEBSOCKET_CR_NORMAL) + : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR); + + rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE, + 1, &status_code, 2); + } + + while (!nxt_queue_is_empty(&ws->pending_frames)) { + nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws)); + } + + nxt_unit_request_done(ws->req, rc); + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ diff --git a/src/python/nxt_python_wsgi.c b/src/python/nxt_python_wsgi.c index 3a5842f1..97030cd3 100644 --- a/src/python/nxt_python_wsgi.c +++ b/src/python/nxt_python_wsgi.c @@ -38,24 +38,6 @@ */ -#if PY_MAJOR_VERSION == 3 -#define NXT_PYTHON_BYTES_TYPE "bytestring" - -#define PyString_FromStringAndSize(str, size) \ - PyUnicode_DecodeLatin1((str), (size), "strict") -#define PyString_AS_STRING PyUnicode_DATA - -#else -#define NXT_PYTHON_BYTES_TYPE "string" - -#define PyBytes_FromStringAndSize PyString_FromStringAndSize -#define PyBytes_Check PyString_Check -#define PyBytes_GET_SIZE PyString_GET_SIZE -#define PyBytes_AS_STRING PyString_AS_STRING -#define PyUnicode_InternInPlace PyString_InternInPlace -#define PyUnicode_AsUTF8 PyString_AS_STRING -#endif - typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t; typedef struct { |