summaryrefslogtreecommitdiffhomepage
path: root/src/python/nxt_python_asgi.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/nxt_python_asgi.c')
-rw-r--r--src/python/nxt_python_asgi.c733
1 files changed, 518 insertions, 215 deletions
diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c
index 72408ea1..98aeedf4 100644
--- a/src/python/nxt_python_asgi.c
+++ b/src/python/nxt_python_asgi.c
@@ -16,7 +16,16 @@
#include <python/nxt_python_asgi_str.h>
+static PyObject *nxt_python_asgi_get_func(PyObject *obj);
+static int nxt_python_asgi_ctx_data_alloc(void **pdata);
+static void nxt_python_asgi_ctx_data_free(void *data);
+static int nxt_python_asgi_startup(void *data);
+static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
+
+static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port);
static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
+static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req);
static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
@@ -24,30 +33,33 @@ static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
+static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx);
+
static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
+static void nxt_python_asgi_done(void);
-PyObject *nxt_py_loop_run_until_complete;
-PyObject *nxt_py_loop_create_future;
-PyObject *nxt_py_loop_create_task;
-
-nxt_queue_t nxt_py_asgi_drain_queue;
-
-static PyObject *nxt_py_loop_call_soon;
-static PyObject *nxt_py_quit_future;
-static PyObject *nxt_py_quit_future_set_result;
-static PyObject *nxt_py_loop_add_reader;
-static PyObject *nxt_py_loop_remove_reader;
-static PyObject *nxt_py_port_read;
+int nxt_py_asgi_legacy;
+static PyObject *nxt_py_port_read;
+static nxt_unit_port_t *nxt_py_shared_port;
static PyMethodDef nxt_py_port_read_method =
{"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
+static nxt_python_proto_t nxt_py_asgi_proto = {
+ .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc,
+ .ctx_data_free = nxt_python_asgi_ctx_data_free,
+ .startup = nxt_python_asgi_startup,
+ .run = nxt_python_asgi_run,
+ .ready = nxt_python_asgi_ready,
+ .done = nxt_python_asgi_done,
+};
+
#define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A
@@ -55,249 +67,348 @@ int
nxt_python_asgi_check(PyObject *obj)
{
int res;
- PyObject *call;
+ PyObject *func;
PyCodeObject *code;
- if (PyFunction_Check(obj)) {
- code = (PyCodeObject *) PyFunction_GET_CODE(obj);
+ func = nxt_python_asgi_get_func(obj);
+
+ if (func == NULL) {
+ return 0;
+ }
+
+ code = (PyCodeObject *) PyFunction_GET_CODE(func);
+
+ nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with "
+ "%d argument(s)",
+ (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ",
+ code->co_argcount);
+
+ res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1;
+
+ Py_DECREF(func);
+
+ return res;
+}
+
+
+static PyObject *
+nxt_python_asgi_get_func(PyObject *obj)
+{
+ PyObject *call;
- return (code->co_flags & CO_COROUTINE) != 0;
+ if (PyFunction_Check(obj)) {
+ Py_INCREF(obj);
+ return obj;
}
if (PyMethod_Check(obj)) {
obj = PyMethod_GET_FUNCTION(obj);
- code = (PyCodeObject *) PyFunction_GET_CODE(obj);
-
- return (code->co_flags & CO_COROUTINE) != 0;
+ Py_INCREF(obj);
+ return obj;
}
call = PyObject_GetAttrString(obj, "__call__");
if (call == NULL) {
- return 0;
+ return NULL;
}
if (PyFunction_Check(call)) {
- code = (PyCodeObject *) PyFunction_GET_CODE(call);
-
- res = (code->co_flags & CO_COROUTINE) != 0;
-
- } else {
- if (PyMethod_Check(call)) {
- obj = PyMethod_GET_FUNCTION(call);
+ return call;
+ }
- code = (PyCodeObject *) PyFunction_GET_CODE(obj);
+ if (PyMethod_Check(call)) {
+ obj = PyMethod_GET_FUNCTION(call);
- res = (code->co_flags & CO_COROUTINE) != 0;
+ Py_INCREF(obj);
+ Py_DECREF(call);
- } else {
- res = 0;
- }
+ return obj;
}
Py_DECREF(call);
- return res;
+ return NULL;
}
-nxt_int_t
-nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
+int
+nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
{
- PyObject *asyncio, *loop, *get_event_loop;
- nxt_int_t rc;
+ PyObject *func;
+ PyCodeObject *code;
- nxt_debug(task, "asgi_init");
+ nxt_unit_debug(NULL, "asgi_init");
- if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) {
- nxt_alert(task, "Python failed to init string objects");
- return NXT_ERROR;
+ if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
+ nxt_unit_alert(NULL, "Python failed to init string objects");
+ return NXT_UNIT_ERROR;
}
- asyncio = PyImport_ImportModule("asyncio");
- if (nxt_slow_path(asyncio == NULL)) {
- nxt_alert(task, "Python failed to import module 'asyncio'");
- nxt_python_print_exception();
- return NXT_ERROR;
+ nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
+ if (nxt_slow_path(nxt_py_port_read == NULL)) {
+ nxt_unit_alert(NULL,
+ "Python failed to initialize the 'port_read' function");
+ return NXT_UNIT_ERROR;
}
- loop = NULL;
- get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
- "get_event_loop");
- if (nxt_slow_path(get_event_loop == NULL)) {
- nxt_alert(task,
- "Python failed to get 'get_event_loop' from module 'asyncio'");
- goto fail;
+ if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
}
- if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) {
- nxt_alert(task, "'asyncio.get_event_loop' is not a callable object");
- goto fail;
+ if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
}
- loop = PyObject_CallObject(get_event_loop, NULL);
- if (nxt_slow_path(loop == NULL)) {
- nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'");
- goto fail;
+ func = nxt_python_asgi_get_func(nxt_py_application);
+ if (nxt_slow_path(func == NULL)) {
+ nxt_unit_alert(NULL, "Python cannot find function for callable");
+ return NXT_UNIT_ERROR;
}
- nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task");
- if (nxt_slow_path(nxt_py_loop_create_task == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.create_task'");
- goto fail;
- }
+ code = (PyCodeObject *) PyFunction_GET_CODE(func);
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) {
- nxt_alert(task, "'loop.create_task' is not a callable object");
- goto fail;
+ if ((code->co_flags & CO_COROUTINE) == 0) {
+ nxt_unit_debug(NULL, "asgi: callable is not a coroutine function "
+ "switching to legacy mode");
+ nxt_py_asgi_legacy = 1;
}
- nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader");
- if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.add_reader'");
- goto fail;
- }
+ Py_DECREF(func);
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) {
- nxt_alert(task, "'loop.add_reader' is not a callable object");
- goto fail;
- }
+ init->callbacks.request_handler = nxt_py_asgi_request_handler;
+ init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
+ init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
+ init->callbacks.close_handler = nxt_py_asgi_close_handler;
+ init->callbacks.quit = nxt_py_asgi_quit;
+ init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
+ init->callbacks.add_port = nxt_py_asgi_add_port;
+ init->callbacks.remove_port = nxt_py_asgi_remove_port;
- nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader");
- if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.remove_reader'");
- goto fail;
- }
+ *proto = nxt_py_asgi_proto;
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) {
- nxt_alert(task, "'loop.remove_reader' is not a callable object");
- goto fail;
- }
+ return NXT_UNIT_OK;
+}
- nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon");
- if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.call_soon'");
- goto fail;
- }
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) {
- nxt_alert(task, "'loop.call_soon' is not a callable object");
- goto fail;
- }
+static int
+nxt_python_asgi_ctx_data_alloc(void **pdata)
+{
+ uint32_t i;
+ PyObject *asyncio, *loop, *new_event_loop, *obj;
+ nxt_py_asgi_ctx_data_t *ctx_data;
- nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop,
- "run_until_complete");
- if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.run_until_complete'");
- goto fail;
+ ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t));
+ if (nxt_slow_path(ctx_data == NULL)) {
+ nxt_unit_alert(NULL, "Failed to allocate context data");
+ return NXT_UNIT_ERROR;
}
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) {
- nxt_alert(task, "'loop.run_until_complete' is not a callable object");
- goto fail;
- }
+ memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t));
- nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future");
- if (nxt_slow_path(nxt_py_loop_create_future == NULL)) {
- nxt_alert(task, "Python failed to get 'loop.create_future'");
- goto fail;
- }
+ nxt_queue_init(&ctx_data->drain_queue);
- if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) {
- nxt_alert(task, "'loop.create_future' is not a callable object");
- goto fail;
- }
+ struct {
+ const char *key;
+ PyObject **handler;
+
+ } handlers[] = {
+ { "create_task", &ctx_data->loop_create_task },
+ { "add_reader", &ctx_data->loop_add_reader },
+ { "remove_reader", &ctx_data->loop_remove_reader },
+ { "call_soon", &ctx_data->loop_call_soon },
+ { "run_until_complete", &ctx_data->loop_run_until_complete },
+ { "create_future", &ctx_data->loop_create_future },
+ };
+
+ loop = NULL;
- nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
- if (nxt_slow_path(nxt_py_quit_future == NULL)) {
- nxt_alert(task, "Python failed to create Future ");
+ asyncio = PyImport_ImportModule("asyncio");
+ if (nxt_slow_path(asyncio == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to import module 'asyncio'");
nxt_python_print_exception();
goto fail;
}
- nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future,
- "set_result");
- if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) {
- nxt_alert(task, "Python failed to get 'future.set_result'");
+ new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio),
+ "new_event_loop");
+ if (nxt_slow_path(new_event_loop == NULL)) {
+ nxt_unit_alert(NULL,
+ "Python failed to get 'new_event_loop' from module 'asyncio'");
goto fail;
}
- if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) {
- nxt_alert(task, "'future.set_result' is not a callable object");
+ if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) {
+ nxt_unit_alert(NULL,
+ "'asyncio.new_event_loop' is not a callable object");
goto fail;
}
- nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL);
- if (nxt_slow_path(nxt_py_port_read == NULL)) {
- nxt_alert(task, "Python failed to initialize the 'port_read' function");
+ loop = PyObject_CallObject(new_event_loop, NULL);
+ if (nxt_slow_path(loop == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'");
goto fail;
}
- nxt_queue_init(&nxt_py_asgi_drain_queue);
+ for (i = 0; i < nxt_nitems(handlers); i++) {
+ obj = PyObject_GetAttrString(loop, handlers[i].key);
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to get 'loop.%s'",
+ handlers[i].key);
+ goto fail;
+ }
+
+ *handlers[i].handler = obj;
- if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) {
- goto fail;
+ if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
+ nxt_unit_alert(NULL, "'loop.%s' is not a callable object",
+ handlers[i].key);
+ goto fail;
+ }
}
- if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) {
+ obj = PyObject_CallObject(ctx_data->loop_create_future, NULL);
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Future ");
+ nxt_python_print_exception();
goto fail;
}
- rc = nxt_py_asgi_lifespan_startup(task);
- if (nxt_slow_path(rc == NXT_ERROR)) {
+ ctx_data->quit_future = obj;
+
+ obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result");
+ if (nxt_slow_path(obj == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to get 'future.set_result'");
goto fail;
}
- init->callbacks.request_handler = nxt_py_asgi_request_handler;
- init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
- init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
- init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
- init->callbacks.quit = nxt_py_asgi_quit;
- init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
- init->callbacks.add_port = nxt_py_asgi_add_port;
- init->callbacks.remove_port = nxt_py_asgi_remove_port;
+ ctx_data->quit_future_set_result = obj;
+
+ if (nxt_slow_path(PyCallable_Check(obj) == 0)) {
+ nxt_unit_alert(NULL, "'future.set_result' is not a callable object");
+ goto fail;
+ }
Py_DECREF(loop);
Py_DECREF(asyncio);
- return NXT_OK;
+ *pdata = ctx_data;
+
+ return NXT_UNIT_OK;
fail:
+ nxt_python_asgi_ctx_data_free(ctx_data);
+
Py_XDECREF(loop);
- Py_DECREF(asyncio);
+ Py_XDECREF(asyncio);
+
+ return NXT_UNIT_ERROR;
+}
+
+
+static void
+nxt_python_asgi_ctx_data_free(void *data)
+{
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ ctx_data = data;
- return NXT_ERROR;
+ Py_XDECREF(ctx_data->loop_run_until_complete);
+ Py_XDECREF(ctx_data->loop_create_future);
+ Py_XDECREF(ctx_data->loop_create_task);
+ Py_XDECREF(ctx_data->loop_call_soon);
+ Py_XDECREF(ctx_data->loop_add_reader);
+ Py_XDECREF(ctx_data->loop_remove_reader);
+ Py_XDECREF(ctx_data->quit_future);
+ Py_XDECREF(ctx_data->quit_future_set_result);
+
+ nxt_unit_free(NULL, ctx_data);
}
-nxt_int_t
+static int
+nxt_python_asgi_startup(void *data)
+{
+ return nxt_py_asgi_lifespan_startup(data);
+}
+
+
+static int
nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
{
- PyObject *res;
+ PyObject *res;
+ nxt_py_asgi_ctx_data_t *ctx_data;
- res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete,
- nxt_py_quit_future, NULL);
+ ctx_data = ctx->data;
+
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete,
+ ctx_data->quit_future, NULL);
if (nxt_slow_path(res == NULL)) {
nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete");
nxt_python_print_exception();
- return NXT_ERROR;
+ return NXT_UNIT_ERROR;
}
Py_DECREF(res);
- nxt_py_asgi_lifespan_shutdown();
+ nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port);
+ nxt_py_asgi_remove_reader(ctx, ctx_data->port);
+
+ if (ctx_data->port != NULL) {
+ ctx_data->port->data = NULL;
+ ctx_data->port = NULL;
+ }
+
+ nxt_py_asgi_lifespan_shutdown(ctx);
+
+ return NXT_UNIT_OK;
+}
+
+
+static void
+nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ PyObject *res, *fd;
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ if (port == NULL || port->in_fd == -1) {
+ return;
+ }
+
+ ctx_data = ctx->data;
+
+ nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port);
+
+ fd = PyLong_FromLong(port->in_fd);
+ if (nxt_slow_path(fd == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to create Long object");
+ nxt_python_print_exception();
+
+ return;
+ }
+
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to remove_reader");
+ nxt_python_print_exception();
+
+ } else {
+ Py_DECREF(res);
+ }
- return NXT_OK;
+ Py_DECREF(fd);
}
static void
nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
{
- PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
+ PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
+ PyObject *stage2;
+ nxt_py_asgi_ctx_data_t *ctx_data;
if (req->request->websocket_handshake) {
asgi = nxt_py_asgi_websocket_create(req);
@@ -346,8 +457,42 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
req->data = asgi;
- res = PyObject_CallFunctionObjArgs(nxt_py_application,
- scope, receive, send, NULL);
+ if (!nxt_py_asgi_legacy) {
+ nxt_unit_req_debug(req, "Python call ASGI 3.0 application");
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_application,
+ scope, receive, send, NULL);
+
+ } else {
+ nxt_unit_req_debug(req, "Python call legacy application");
+
+ res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, NULL);
+
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(req, "Python failed to call legacy app stage1");
+ nxt_python_print_exception();
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ goto release_scope;
+ }
+
+ if (nxt_slow_path(PyCallable_Check(res) == 0)) {
+ nxt_unit_req_error(req,
+ "Legacy ASGI application returns not a callable");
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ Py_DECREF(res);
+
+ goto release_scope;
+ }
+
+ stage2 = res;
+
+ res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL);
+
+ Py_DECREF(stage2);
+ }
+
if (nxt_slow_path(res == NULL)) {
nxt_unit_req_error(req, "Python failed to call the application");
nxt_python_print_exception();
@@ -365,7 +510,9 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
goto release_scope;
}
- task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL);
+ ctx_data = req->ctx->data;
+
+ task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
if (nxt_slow_path(task == NULL)) {
nxt_unit_req_error(req, "Python failed to call the create_task");
nxt_python_print_exception();
@@ -405,6 +552,18 @@ release_asgi:
}
+static void
+nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
+{
+ if (req->request->websocket_handshake) {
+ nxt_py_asgi_websocket_close_handler(req);
+
+ } else {
+ nxt_py_asgi_http_close_handler(req);
+ }
+}
+
+
static PyObject *
nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
{
@@ -724,10 +883,82 @@ fail:
static int
+nxt_python_asgi_ready(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ PyObject *res, *fd, *py_ctx, *py_port;
+ nxt_unit_port_t *port;
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ if (nxt_slow_path(nxt_py_shared_port == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ port = nxt_py_shared_port;
+
+ nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port);
+
+ ctx_data = ctx->data;
+
+ rc = NXT_UNIT_ERROR;
+
+ fd = PyLong_FromLong(port->in_fd);
+ if (nxt_slow_path(fd == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to create fd");
+ nxt_python_print_exception();
+
+ return rc;
+ }
+
+ py_ctx = PyLong_FromVoidPtr(ctx);
+ if (nxt_slow_path(py_ctx == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to create py_ctx");
+ nxt_python_print_exception();
+
+ goto clean_fd;
+ }
+
+ py_port = PyLong_FromVoidPtr(port);
+ if (nxt_slow_path(py_port == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to create py_port");
+ nxt_python_print_exception();
+
+ goto clean_py_ctx;
+ }
+
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
+ fd, nxt_py_port_read,
+ py_ctx, py_port, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to add_reader");
+ nxt_python_print_exception();
+
+ } else {
+ Py_DECREF(res);
+
+ rc = NXT_UNIT_OK;
+ }
+
+ Py_DECREF(py_port);
+
+clean_py_ctx:
+
+ Py_DECREF(py_ctx);
+
+clean_fd:
+
+ Py_DECREF(fd);
+
+ return rc;
+}
+
+
+static int
nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
- int nb;
- PyObject *res;
+ int nb, rc;
+ PyObject *res, *fd, *py_ctx, *py_port;
+ nxt_py_asgi_ctx_data_t *ctx_data;
if (port->in_fd == -1) {
return NXT_UNIT_OK;
@@ -744,73 +975,152 @@ nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
- res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader,
- PyLong_FromLong(port->in_fd),
- nxt_py_port_read,
- PyLong_FromVoidPtr(ctx),
- PyLong_FromVoidPtr(port), NULL);
+ if (port->id.id == NXT_UNIT_SHARED_PORT_ID) {
+ nxt_py_shared_port = port;
+
+ return NXT_UNIT_OK;
+ }
+
+ ctx_data = ctx->data;
+
+ ctx_data->port = port;
+ port->data = ctx_data;
+
+ rc = NXT_UNIT_ERROR;
+
+ fd = PyLong_FromLong(port->in_fd);
+ if (nxt_slow_path(fd == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to create fd");
+ nxt_python_print_exception();
+
+ return rc;
+ }
+
+ py_ctx = PyLong_FromVoidPtr(ctx);
+ if (nxt_slow_path(py_ctx == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to create py_ctx");
+ nxt_python_print_exception();
+
+ goto clean_fd;
+ }
+
+ py_port = PyLong_FromVoidPtr(port);
+ if (nxt_slow_path(py_port == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to create py_port");
+ nxt_python_print_exception();
+
+ goto clean_py_ctx;
+ }
+
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader,
+ fd, nxt_py_port_read,
+ py_ctx, py_port, NULL);
if (nxt_slow_path(res == NULL)) {
nxt_unit_alert(ctx, "Python failed to add_reader");
+ nxt_python_print_exception();
- return NXT_UNIT_ERROR;
+ } else {
+ Py_DECREF(res);
+
+ rc = NXT_UNIT_OK;
}
- Py_DECREF(res);
+ Py_DECREF(py_port);
- return NXT_UNIT_OK;
+clean_py_ctx:
+
+ Py_DECREF(py_ctx);
+
+clean_fd:
+
+ Py_DECREF(fd);
+
+ return rc;
}
static void
nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
{
- PyObject *res;
-
- nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
-
if (port->in_fd == -1) {
return;
}
- res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader,
- PyLong_FromLong(port->in_fd), NULL);
- if (nxt_slow_path(res == NULL)) {
- nxt_unit_alert(NULL, "Python failed to remove_reader");
- }
+ nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
- Py_DECREF(res);
+ if (nxt_py_shared_port == port) {
+ nxt_py_shared_port = NULL;
+ }
}
static void
nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
{
- PyObject *res;
+ PyObject *res, *p;
+ nxt_py_asgi_ctx_data_t *ctx_data;
nxt_unit_debug(ctx, "asgi_quit %p", ctx);
- res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result,
- PyLong_FromLong(0), NULL);
- if (nxt_slow_path(res == NULL)) {
- nxt_unit_alert(ctx, "Python failed to set_result");
+ ctx_data = ctx->data;
+
+ if (nxt_py_shared_port != NULL) {
+ p = PyLong_FromLong(nxt_py_shared_port->in_fd);
+ if (nxt_slow_path(p == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Long");
+ nxt_python_print_exception();
+
+ } else {
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
+ p, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to remove_reader");
+ nxt_python_print_exception();
+
+ } else {
+ Py_DECREF(res);
+ }
+
+ Py_DECREF(p);
+ }
}
- Py_DECREF(res);
+ p = PyLong_FromLong(0);
+ if (nxt_slow_path(p == NULL)) {
+ nxt_unit_alert(NULL, "Python failed to create Long");
+ nxt_python_print_exception();
+
+ } else {
+ res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result,
+ p, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_alert(ctx, "Python failed to set_result");
+ nxt_python_print_exception();
+
+ } else {
+ Py_DECREF(res);
+ }
+
+ Py_DECREF(p);
+ }
}
static void
nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_queue_link_t *lnk;
+ int rc;
+ nxt_queue_link_t *lnk;
+ nxt_py_asgi_ctx_data_t *ctx_data;
- while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) {
- lnk = nxt_queue_first(&nxt_py_asgi_drain_queue);
+ ctx_data = ctx->data;
+
+ while (!nxt_queue_is_empty(&ctx_data->drain_queue)) {
+ lnk = nxt_queue_first(&ctx_data->drain_queue);
rc = nxt_py_asgi_http_drain(lnk);
if (rc == NXT_UNIT_AGAIN) {
- break;
+ return;
}
nxt_queue_remove(lnk);
@@ -859,7 +1169,7 @@ nxt_py_asgi_port_read(PyObject *self, PyObject *args)
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return PyErr_Format(PyExc_RuntimeError,
- "error processing port message");
+ "error processing port %d message", port->id.id);
}
Py_RETURN_NONE;
@@ -996,8 +1306,8 @@ nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val)
PyObject *
-nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
- PyObject *result)
+nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req,
+ nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result)
{
PyObject *set_result, *res;
@@ -1013,7 +1323,7 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
Py_CLEAR(future);
- goto cleanup;
+ goto cleanup_result;
}
if (nxt_slow_path(PyCallable_Check(set_result) == 0)) {
@@ -1024,7 +1334,7 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
goto cleanup;
}
- res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result,
+ res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result,
result, NULL);
if (nxt_slow_path(res == NULL)) {
nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'");
@@ -1038,6 +1348,9 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future,
cleanup:
Py_DECREF(set_result);
+
+cleanup_result:
+
Py_DECREF(result);
return future;
@@ -1148,6 +1461,17 @@ nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type,
void
+nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link)
+{
+ nxt_py_asgi_ctx_data_t *ctx_data;
+
+ ctx_data = req->ctx->data;
+
+ nxt_queue_insert_tail(&ctx_data->drain_queue, link);
+}
+
+
+void
nxt_py_asgi_dealloc(PyObject *self)
{
PyObject_Del(self);
@@ -1177,19 +1501,11 @@ nxt_py_asgi_next(PyObject *self)
}
-void
+static void
nxt_python_asgi_done(void)
{
nxt_py_asgi_str_done();
- Py_XDECREF(nxt_py_quit_future);
- Py_XDECREF(nxt_py_quit_future_set_result);
- Py_XDECREF(nxt_py_loop_run_until_complete);
- Py_XDECREF(nxt_py_loop_create_future);
- Py_XDECREF(nxt_py_loop_create_task);
- Py_XDECREF(nxt_py_loop_call_soon);
- Py_XDECREF(nxt_py_loop_add_reader);
- Py_XDECREF(nxt_py_loop_remove_reader);
Py_XDECREF(nxt_py_port_read);
}
@@ -1203,25 +1519,12 @@ nxt_python_asgi_check(PyObject *obj)
}
-nxt_int_t
-nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init)
-{
- nxt_alert(task, "ASGI not implemented");
- return NXT_ERROR;
-}
-
-
-nxt_int_t
-nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
+int
+nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
{
- nxt_unit_alert(ctx, "ASGI not implemented");
- return NXT_ERROR;
+ nxt_unit_alert(NULL, "ASGI not implemented");
+ return NXT_UNIT_ERROR;
}
-void
-nxt_python_asgi_done(void)
-{
-}
-
#endif /* NXT_HAVE_ASGI */