summaryrefslogtreecommitdiffhomepage
path: root/src/python/nxt_python_asgi.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-11-05 00:04:59 +0300
committerMax Romanov <max.romanov@nginx.com>2020-11-05 00:04:59 +0300
commit8dcb0b9987033d0349a6ecf528014a9daa574787 (patch)
tree34a79dc0f21f6b3c76378343cc94682f25c6b417 /src/python/nxt_python_asgi.c
parent4225361f0ea7d230c80209d76fbc67a932651380 (diff)
downloadunit-8dcb0b9987033d0349a6ecf528014a9daa574787.tar.gz
unit-8dcb0b9987033d0349a6ecf528014a9daa574787.tar.bz2
Python: request processing in multiple threads.
This closes #459 issue on GitHub.
Diffstat (limited to '')
-rw-r--r--src/python/nxt_python_asgi.c490
1 files changed, 304 insertions, 186 deletions
diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c
index 72408ea1..a188ff56 100644
--- a/src/python/nxt_python_asgi.c
+++ b/src/python/nxt_python_asgi.c
@@ -16,6 +16,13 @@
#include <python/nxt_python_asgi_str.h>
+static int nxt_python_asgi_ctx_data_alloc(void **pdata);
+static void nxt_python_asgi_ctx_data_free(void *data);
+static int nxt_python_asgi_startup(void *data);
+static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
+
+static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port);
static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
@@ -24,30 +31,32 @@ static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
+static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx);
+
static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
+static void nxt_python_asgi_done(void);
-PyObject *nxt_py_loop_run_until_complete;
-PyObject *nxt_py_loop_create_future;
-PyObject *nxt_py_loop_create_task;
-
-nxt_queue_t nxt_py_asgi_drain_queue;
-
-static PyObject *nxt_py_loop_call_soon;
-static PyObject *nxt_py_quit_future;
-static PyObject *nxt_py_quit_future_set_result;
-static PyObject *nxt_py_loop_add_reader;
-static PyObject *nxt_py_loop_remove_reader;
-static PyObject *nxt_py_port_read;
+static PyObject *nxt_py_port_read;
+static nxt_unit_port_t *nxt_py_shared_port;
static PyMethodDef nxt_py_port_read_method =
{"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
+static nxt_python_proto_t nxt_py_asgi_proto = {
+ .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
+ .ctx_data_free = nxt_python_asgi_ctx_data_free,
+ .startup = nxt_python_asgi_startup,
+ .run = nxt_python_asgi_run,
+ .ready = nxt_python_asgi_ready,
+ .done = nxt_python_asgi_done,
+};
+
#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A
@@ -102,202 +111,254 @@ nxt_python_asgi_check(PyObject *obj)
}
-nxt_int_t
-nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
+int
+nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
{
- PyObject *asyncio, *loop, *get_event_loop;
- nxt_int_t rc;
-
- nxt_debug(task, "asgi_init");
+ nxt_unit_debug(NULL, "asgi_init");
- if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) {
- nxt_alert(task, "Python failed to init string objects");
- return NXT_ERROR;
+ if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
+ nxt_unit_alert(NULL, "Python failed to init string objects");
+ return NXT_UNIT_ERROR;
}
- asyncio = PyImport_ImportModule("asyncio");
- if (nxt_slow_path(asyncio == NULL)) {
- nxt_alert(task, "Python failed to import module 'asyncio'");
- nxt_python_print_exception();
- return NXT_ERROR;
+ nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
+ if (nxt_slow_path(nxt_py_port_read == NULL)) {
+ nxt_unit_alert(NULL,
+ "Python failed to initialize the 'port_read' function");
+ return NXT_UNIT_ERROR;
}
- loop = NULL;
- get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
- "get_event_loop");
- if (nxt_slow_path(get_event_loop == NULL)) {
- nxt_alert(task,
- "Python failed to get 'get_event_loop' from module 'asyncio'");
- goto fail;
+ if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
}
- if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) {
- nxt_alert(task, "'asyncio.get_event_loop' is not a callable object");
- goto fail;
+ if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
}
- loop = PyObject_CallObject(get_event_loop, NULL);
- if (nxt_slow_path(loop == NULL)) {
- nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'");
- goto fail;
- }
+ init->callbacks.request_handler = nxt_py_asgi_request_handler;
+ init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
+ init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
+ init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
+ init->callbacks.quit = nxt_py_asgi_quit;
+ init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
+ init->callbacks.add_port = nxt_py_asgi_add_port;
+ init->callbacks.remove_port = nxt_py_asgi_remove_port;
- nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task");
- if (nxt_slow_path(nxt_py_loop_create_task == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.create_task'");
- goto fail;
- }
+ *proto = nxt_py_asgi_proto;
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) {
- nxt_alert(task, "'loop.create_task' is not a callable object");
- goto fail;
- }
+ return NXT_UNIT_OK;
+}
- nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader");
- if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.add_reader'");
- goto fail;
- }
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) {
- nxt_alert(task, "'loop.add_reader' is not a callable object");
- goto fail;
- }
+static int
+nxt_python_asgi_ctx_data_alloc(void **pdata)
+{
+ uint32_t i;
+ PyObject *asyncio, *loop, *new_event_loop, *obj;
+ nxt_py_asgi_ctx_data_t *ctx_data;
- nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader");
- if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.remove_reader'");
- goto fail;
+ ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
+ if (nxt_slow_path(ctx_data == NULL)) {
+ nxt_unit_alert(NULL, "Failed to allocate context data");
+ return NXT_UNIT_ERROR;
}
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) {
- nxt_alert(task, "'loop.remove_reader' is not a callable object");
- goto fail;
- }
+ memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
- nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon");
- if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.call_soon'");
- goto fail;
- }
+ nxt_queue_init(&ctx_data->drain_queue);
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) {
- nxt_alert(task, "'loop.call_soon' is not a callable object");
- goto fail;
- }
+ struct {
+ const char *key;
+ PyObject **handler;
- nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop,
- "run_until_complete");
- if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.run_until_complete'");
- goto fail;
- }
+ } handlers[] = {
+ { "create_task", &ctx_data->loop_create_task },
+ { "add_reader", &ctx_data->loop_add_reader },
+ { "remove_reader", &ctx_data->loop_remove_reader },
+ { "call_soon", &ctx_data->loop_call_soon },
+ { "run_until_complete", &ctx_data->loop_run_until_complete },
+ { "create_future", &ctx_data->loop_create_future },
+ };
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) {
- nxt_alert(task, "'loop.run_until_complete' is not a callable object");
- goto fail;
- }
+ loop = NULL;
- nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future");
- if (nxt_slow_path(nxt_py_loop_create_future == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.create_future'");
+ asyncio = PyImport_ImportModule("asyncio");
+ if (nxt_slow_path(asyncio == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
+ nxt_python_print_exception();
goto fail;
}
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) {
- nxt_alert(task, "'loop.create_future' is not a callable object");
+ new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
+ "new_event_loop");
+ if (nxt_slow_path(new_event_loop == NULL)) {
+ nxt_unit_alert(NULL,
+ "Python failed to get 'new_event_loop' from module 'asyncio'");
goto fail;
}
- nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
- if (nxt_slow_path(nxt_py_quit_future == NULL)) {
- nxt_alert(task, "Python failed to create Future ");
- nxt_python_print_exception();
+ if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) {
+ nxt_unit_alert(NULL,
+ "'asyncio.new_event_loop' is not a callable object");
goto fail;
}
- nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future,
- "set_result");
- if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) {
- nxt_alert(task, "Python failed to get 'future.set_result'");
+ loop = PyObject_CallObject(new_event_loop, NULL);
+ if (nxt_slow_path(loop == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'");
goto fail;
}
- if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) {
- nxt_alert(task, "'future.set_result' is not a callable object");
- goto fail;
+ for (i = 0; i < nxt_nitems(handlers); i++) {
+ obj = PyObject_GetAttrString(loop, handlers[i].key);
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
+ handlers[i].key);
+ goto fail;
+ }
+
+ *handlers[i].handler = obj;
+
+ if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
+ nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
+ handlers[i].key);
+ goto fail;
+ }
}
- nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
- if (nxt_slow_path(nxt_py_port_read == NULL)) {
- nxt_alert(task, "Python failed to initialize the 'port_read' function");
+ obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Future ");
+ nxt_python_print_exception();
goto fail;
}
- nxt_queue_init(&nxt_py_asgi_drain_queue);
+ ctx_data->quit_future = obj;
- if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) {
+ obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
goto fail;
}
- if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) {
- goto fail;
- }
+ ctx_data->quit_future_set_result = obj;
- rc = nxt_py_asgi_lifespan_startup(task);
- if (nxt_slow_path(rc == NXT_ERROR)) {
+ if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
+ nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
goto fail;
}
- init->callbacks.request_handler = nxt_py_asgi_request_handler;
- init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
- init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
- init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
- init->callbacks.quit = nxt_py_asgi_quit;
- init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
- init->callbacks.add_port = nxt_py_asgi_add_port;
- init->callbacks.remove_port = nxt_py_asgi_remove_port;
-
Py_DECREF(loop);
Py_DECREF(asyncio);
- return NXT_OK;
+ *pdata = ctx_data;
+
+ return NXT_UNIT_OK;
fail:
+ nxt_python_asgi_ctx_data_free(ctx_data);
+
Py_XDECREF(loop);
- Py_DECREF(asyncio);
+ Py_XDECREF(asyncio);
+
+ return NXT_UNIT_ERROR;
+}
+
- return NXT_ERROR;
+static void
+nxt_python_asgi_ctx_data_free(void *data)
+{
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ ctx_data = data;
+
+ Py_XDECREF(ctx_data->loop_run_until_complete);
+ Py_XDECREF(ctx_data->loop_create_future);
+ Py_XDECREF(ctx_data->loop_create_task);
+ Py_XDECREF(ctx_data->loop_call_soon);
+ Py_XDECREF(ctx_data->loop_add_reader);
+ Py_XDECREF(ctx_data->loop_remove_reader);
+ Py_XDECREF(ctx_data->quit_future);
+ Py_XDECREF(ctx_data->quit_future_set_result);
+
+ nxt_unit_free(NULL, ctx_data);
}
-nxt_int_t
+static int
+nxt_python_asgi_startup(void *data)
+{
+ return nxt_py_asgi_lifespan_startup(data);
+}
+
+
+static int
nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
{
- PyObject *res;
+ PyObject *res;
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ ctx_data = ctx->data;
- res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
- nxt_py_quit_future, NULL);
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
+ ctx_data->quit_future, NULL);
if (nxt_slow_path(res == NULL)) {
nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
nxt_python_print_exception();
- return NXT_ERROR;
+ return NXT_UNIT_ERROR;
}
Py_DECREF(res);
- nxt_py_asgi_lifespan_shutdown();
+ nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port);
+ nxt_py_asgi_remove_reader(ctx, ctx_data->port);
- return NXT_OK;
+ if (ctx_data->port != NULL) {
+ ctx_data->port->data = NULL;
+ ctx_data->port = NULL;
+ }
+
+ nxt_py_asgi_lifespan_shutdown(ctx);
+
+ return NXT_UNIT_OK;
+}
+
+
+static void
+nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ PyObject *res;
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ if (port == NULL || port->in_fd == -1) {
+ return;
+ }
+
+ ctx_data = ctx->data;
+
+ nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
+
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
+ PyLong_FromLong(port->in_fd), NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to remove_reader");
+ nxt_python_print_exception();
+
+ return;
+ }
+
+ Py_DECREF(res);
}
static void
nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
{
- PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
+ PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
+ nxt_py_asgi_ctx_data_t *ctx_data;
if (req->request->websocket_handshake) {
asgi = nxt_py_asgi_websocket_create(req);
@@ -365,7 +426,9 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
goto release_scope;
}
- task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL);
+ ctx_data = req->ctx->data;
+
+ task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
if (nxt_slow_path(task == NULL)) {
nxt_unit_req_error(req, "Python failed to call the create_task");
nxt_python_print_exception();
@@ -724,10 +787,46 @@ fail:
static int
+nxt_python_asgi_ready(nxt_unit_ctx_t *ctx)
+{
+ PyObject *res;
+ nxt_unit_port_t *port;
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ if (nxt_slow_path(nxt_py_shared_port == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ port = nxt_py_shared_port;
+
+ nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port);
+
+ ctx_data = ctx->data;
+
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
+ PyLong_FromLong(port->in_fd),
+ nxt_py_port_read,
+ PyLong_FromVoidPtr(ctx),
+ PyLong_FromVoidPtr(port), NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to add_reader");
+ nxt_python_print_exception();
+
+ return NXT_UNIT_ERROR;
+ }
+
+ Py_DECREF(res);
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
- int nb;
- PyObject *res;
+ int nb;
+ PyObject *res;
+ nxt_py_asgi_ctx_data_t *ctx_data;
if (port->in_fd == -1) {
return NXT_UNIT_OK;
@@ -744,13 +843,25 @@ nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
- res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader,
+ if (port->id.id == NXT_UNIT_SHARED_PORT_ID) {
+ nxt_py_shared_port = port;
+
+ return NXT_UNIT_OK;
+ }
+
+ ctx_data = ctx->data;
+
+ ctx_data->port = port;
+ port->data = ctx_data;
+
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
PyLong_FromLong(port->in_fd),
nxt_py_port_read,
PyLong_FromVoidPtr(ctx),
PyLong_FromVoidPtr(port), NULL);
if (nxt_slow_path(res == NULL)) {
nxt_unit_alert(ctx, "Python failed to add_reader");
+ nxt_python_print_exception();
return NXT_UNIT_ERROR;
}
@@ -764,53 +875,67 @@ nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
static void
nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
{
- PyObject *res;
-
- nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
-
if (port->in_fd == -1) {
return;
}
- res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader,
- PyLong_FromLong(port->in_fd), NULL);
- if (nxt_slow_path(res == NULL)) {
- nxt_unit_alert(NULL, "Python failed to remove_reader");
- }
+ nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
- Py_DECREF(res);
+ if (nxt_py_shared_port == port) {
+ nxt_py_shared_port = NULL;
+ }
}
static void
nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
{
- PyObject *res;
+ PyObject *res;
+ nxt_py_asgi_ctx_data_t *ctx_data;
nxt_unit_debug(ctx, "asgi_quit %p", ctx);
- res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result,
+ ctx_data = ctx->data;
+
+ if (nxt_py_shared_port != NULL) {
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
+ PyLong_FromLong(nxt_py_shared_port->in_fd), NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to remove_reader");
+ nxt_python_print_exception();
+
+ } else {
+ Py_DECREF(res);
+ }
+ }
+
+ res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
PyLong_FromLong(0), NULL);
if (nxt_slow_path(res == NULL)) {
nxt_unit_alert(ctx, "Python failed to set_result");
- }
+ nxt_python_print_exception();
- Py_DECREF(res);
+ } else {
+ Py_DECREF(res);
+ }
}
static void
nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_queue_link_t *lnk;
+ int rc;
+ nxt_queue_link_t *lnk;
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ ctx_data = ctx->data;
- while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) {
- lnk = nxt_queue_first(&nxt_py_asgi_drain_queue);
+ while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
+ lnk = nxt_queue_first(&ctx_data->drain_queue);
rc = nxt_py_asgi_http_drain(lnk);
if (rc == NXT_UNIT_AGAIN) {
- break;
+ return;
}
nxt_queue_remove(lnk);
@@ -859,7 +984,7 @@ nxt_py_asgi_port_read(PyObject *self, PyObject *args)
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return PyErr_Format(PyExc_RuntimeError,
- "error processing port message");
+ "error processing port %d message", port->id.id);
}
Py_RETURN_NONE;
@@ -996,8 +1121,8 @@ nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
PyObject *
-nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
- PyObject *result)
+nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
+ nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
{
PyObject *set_result, *res;
@@ -1013,7 +1138,7 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
Py_CLEAR(future);
- goto cleanup;
+ goto cleanup_result;
}
if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
@@ -1024,7 +1149,7 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
goto cleanup;
}
- res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result,
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
result, NULL);
if (nxt_slow_path(res == NULL)) {
nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
@@ -1038,6 +1163,9 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
cleanup:
Py_DECREF(set_result);
+
+cleanup_result:
+
Py_DECREF(result);
return future;
@@ -1148,6 +1276,17 @@ nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
void
+nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
+{
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ ctx_data = req->ctx->data;
+
+ nxt_queue_insert_tail(&ctx_data->drain_queue, link);
+}
+
+
+void
nxt_py_asgi_dealloc(PyObject *self)
{
PyObject_Del(self);
@@ -1177,19 +1316,11 @@ nxt_py_asgi_next(PyObject *self)
}
-void
+static void
nxt_python_asgi_done(void)
{
nxt_py_asgi_str_done();
- Py_XDECREF(nxt_py_quit_future);
- Py_XDECREF(nxt_py_quit_future_set_result);
- Py_XDECREF(nxt_py_loop_run_until_complete);
- Py_XDECREF(nxt_py_loop_create_future);
- Py_XDECREF(nxt_py_loop_create_task);
- Py_XDECREF(nxt_py_loop_call_soon);
- Py_XDECREF(nxt_py_loop_add_reader);
- Py_XDECREF(nxt_py_loop_remove_reader);
Py_XDECREF(nxt_py_port_read);
}
@@ -1203,25 +1334,12 @@ nxt_python_asgi_check(PyObject *obj)
}
-nxt_int_t
-nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
-{
- nxt_alert(task, "ASGI not implemented");
- return NXT_ERROR;
-}
-
-
-nxt_int_t
-nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
+int
+nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
{
- nxt_unit_alert(ctx, "ASGI not implemented");
- return NXT_ERROR;
+ nxt_unit_alert(NULL, "ASGI not implemented");
+ return NXT_UNIT_ERROR;
}
-void
-nxt_python_asgi_done(void)
-{
-}
-
#endif /* NXT_HAVE_ASGI */