diff options
Diffstat (limited to 'src/python/nxt_python_asgi.c')
-rw-r--r-- | src/python/nxt_python_asgi.c | 733 |
1 files changed, 518 insertions, 215 deletions
diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c index 72408ea1..98aeedf4 100644 --- a/src/python/nxt_python_asgi.c +++ b/src/python/nxt_python_asgi.c @@ -16,7 +16,16 @@ #include <python/nxt_python_asgi_str.h> +static PyObject *nxt_python_asgi_get_func(PyObject *obj); +static int nxt_python_asgi_ctx_data_alloc(void **pdata); +static void nxt_python_asgi_ctx_data_free(void *data); +static int nxt_python_asgi_startup(void *data); +static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx); + +static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req); +static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req); static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req); static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, @@ -24,30 +33,33 @@ static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); +static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); + static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); +static void nxt_python_asgi_done(void); -PyObject *nxt_py_loop_run_until_complete; -PyObject *nxt_py_loop_create_future; -PyObject *nxt_py_loop_create_task; - -nxt_queue_t nxt_py_asgi_drain_queue; - -static PyObject *nxt_py_loop_call_soon; -static PyObject *nxt_py_quit_future; -static PyObject *nxt_py_quit_future_set_result; -static PyObject *nxt_py_loop_add_reader; -static PyObject *nxt_py_loop_remove_reader; -static PyObject *nxt_py_port_read; +int nxt_py_asgi_legacy; +static PyObject *nxt_py_port_read; +static nxt_unit_port_t *nxt_py_shared_port; static PyMethodDef nxt_py_port_read_method = {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; +static nxt_python_proto_t nxt_py_asgi_proto = { + .ctx_data_alloc = nxt_python_asgi_ctx_data_alloc, + .ctx_data_free = nxt_python_asgi_ctx_data_free, + .startup = nxt_python_asgi_startup, + .run = nxt_python_asgi_run, + .ready = nxt_python_asgi_ready, + .done = nxt_python_asgi_done, +}; + #define NXT_UNIT_HASH_WS_PROTOCOL 0xED0A @@ -55,249 +67,348 @@ int nxt_python_asgi_check(PyObject *obj) { int res; - PyObject *call; + PyObject *func; PyCodeObject *code; - if (PyFunction_Check(obj)) { - code = (PyCodeObject *) PyFunction_GET_CODE(obj); + func = nxt_python_asgi_get_func(obj); + + if (func == NULL) { + return 0; + } + + code = (PyCodeObject *) PyFunction_GET_CODE(func); + + nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with " + "%d argument(s)", + (code->co_flags & CO_COROUTINE) != 0 ? "" : "not ", + code->co_argcount); + + res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1; + + Py_DECREF(func); + + return res; +} + + +static PyObject * +nxt_python_asgi_get_func(PyObject *obj) +{ + PyObject *call; - return (code->co_flags & CO_COROUTINE) != 0; + if (PyFunction_Check(obj)) { + Py_INCREF(obj); + return obj; } if (PyMethod_Check(obj)) { obj = PyMethod_GET_FUNCTION(obj); - code = (PyCodeObject *) PyFunction_GET_CODE(obj); - - return (code->co_flags & CO_COROUTINE) != 0; + Py_INCREF(obj); + return obj; } call = PyObject_GetAttrString(obj, "__call__"); if (call == NULL) { - return 0; + return NULL; } if (PyFunction_Check(call)) { - code = (PyCodeObject *) PyFunction_GET_CODE(call); - - res = (code->co_flags & CO_COROUTINE) != 0; - - } else { - if (PyMethod_Check(call)) { - obj = PyMethod_GET_FUNCTION(call); + return call; + } - code = (PyCodeObject *) PyFunction_GET_CODE(obj); + if (PyMethod_Check(call)) { + obj = PyMethod_GET_FUNCTION(call); - res = (code->co_flags & CO_COROUTINE) != 0; + Py_INCREF(obj); + Py_DECREF(call); - } else { - res = 0; - } + return obj; } Py_DECREF(call); - return res; + return NULL; } -nxt_int_t -nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) +int +nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) { - PyObject *asyncio, *loop, *get_event_loop; - nxt_int_t rc; + PyObject *func; + PyCodeObject *code; - nxt_debug(task, "asgi_init"); + nxt_unit_debug(NULL, "asgi_init"); - if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_OK)) { - nxt_alert(task, "Python failed to init string objects"); - return NXT_ERROR; + if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) { + nxt_unit_alert(NULL, "Python failed to init string objects"); + return NXT_UNIT_ERROR; } - asyncio = PyImport_ImportModule("asyncio"); - if (nxt_slow_path(asyncio == NULL)) { - nxt_alert(task, "Python failed to import module 'asyncio'"); - nxt_python_print_exception(); - return NXT_ERROR; + nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); + if (nxt_slow_path(nxt_py_port_read == NULL)) { + nxt_unit_alert(NULL, + "Python failed to initialize the 'port_read' function"); + return NXT_UNIT_ERROR; } - loop = NULL; - get_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), - "get_event_loop"); - if (nxt_slow_path(get_event_loop == NULL)) { - nxt_alert(task, - "Python failed to get 'get_event_loop' from module 'asyncio'"); - goto fail; + if (nxt_slow_path(nxt_py_asgi_http_init() == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; } - if (nxt_slow_path(PyCallable_Check(get_event_loop) == 0)) { - nxt_alert(task, "'asyncio.get_event_loop' is not a callable object"); - goto fail; + if (nxt_slow_path(nxt_py_asgi_websocket_init() == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; } - loop = PyObject_CallObject(get_event_loop, NULL); - if (nxt_slow_path(loop == NULL)) { - nxt_alert(task, "Python failed to call 'asyncio.get_event_loop'"); - goto fail; + func = nxt_python_asgi_get_func(nxt_py_application); + if (nxt_slow_path(func == NULL)) { + nxt_unit_alert(NULL, "Python cannot find function for callable"); + return NXT_UNIT_ERROR; } - nxt_py_loop_create_task = PyObject_GetAttrString(loop, "create_task"); - if (nxt_slow_path(nxt_py_loop_create_task == NULL)) { - nxt_alert(task, "Python failed to get 'loop.create_task'"); - goto fail; - } + code = (PyCodeObject *) PyFunction_GET_CODE(func); - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_task) == 0)) { - nxt_alert(task, "'loop.create_task' is not a callable object"); - goto fail; + if ((code->co_flags & CO_COROUTINE) == 0) { + nxt_unit_debug(NULL, "asgi: callable is not a coroutine function " + "switching to legacy mode"); + nxt_py_asgi_legacy = 1; } - nxt_py_loop_add_reader = PyObject_GetAttrString(loop, "add_reader"); - if (nxt_slow_path(nxt_py_loop_add_reader == NULL)) { - nxt_alert(task, "Python failed to get 'loop.add_reader'"); - goto fail; - } + Py_DECREF(func); - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_add_reader) == 0)) { - nxt_alert(task, "'loop.add_reader' is not a callable object"); - goto fail; - } + init->callbacks.request_handler = nxt_py_asgi_request_handler; + init->callbacks.data_handler = nxt_py_asgi_http_data_handler; + init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; + init->callbacks.close_handler = nxt_py_asgi_close_handler; + init->callbacks.quit = nxt_py_asgi_quit; + init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; + init->callbacks.add_port = nxt_py_asgi_add_port; + init->callbacks.remove_port = nxt_py_asgi_remove_port; - nxt_py_loop_remove_reader = PyObject_GetAttrString(loop, "remove_reader"); - if (nxt_slow_path(nxt_py_loop_remove_reader == NULL)) { - nxt_alert(task, "Python failed to get 'loop.remove_reader'"); - goto fail; - } + *proto = nxt_py_asgi_proto; - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_remove_reader) == 0)) { - nxt_alert(task, "'loop.remove_reader' is not a callable object"); - goto fail; - } + return NXT_UNIT_OK; +} - nxt_py_loop_call_soon = PyObject_GetAttrString(loop, "call_soon"); - if (nxt_slow_path(nxt_py_loop_call_soon == NULL)) { - nxt_alert(task, "Python failed to get 'loop.call_soon'"); - goto fail; - } - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_call_soon) == 0)) { - nxt_alert(task, "'loop.call_soon' is not a callable object"); - goto fail; - } +static int +nxt_python_asgi_ctx_data_alloc(void **pdata) +{ + uint32_t i; + PyObject *asyncio, *loop, *new_event_loop, *obj; + nxt_py_asgi_ctx_data_t *ctx_data; - nxt_py_loop_run_until_complete = PyObject_GetAttrString(loop, - "run_until_complete"); - if (nxt_slow_path(nxt_py_loop_run_until_complete == NULL)) { - nxt_alert(task, "Python failed to get 'loop.run_until_complete'"); - goto fail; + ctx_data = nxt_unit_malloc(NULL, sizeof(nxt_py_asgi_ctx_data_t)); + if (nxt_slow_path(ctx_data == NULL)) { + nxt_unit_alert(NULL, "Failed to allocate context data"); + return NXT_UNIT_ERROR; } - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_run_until_complete) == 0)) { - nxt_alert(task, "'loop.run_until_complete' is not a callable object"); - goto fail; - } + memset(ctx_data, 0, sizeof(nxt_py_asgi_ctx_data_t)); - nxt_py_loop_create_future = PyObject_GetAttrString(loop, "create_future"); - if (nxt_slow_path(nxt_py_loop_create_future == NULL)) { - nxt_alert(task, "Python failed to get 'loop.create_future'"); - goto fail; - } + nxt_queue_init(&ctx_data->drain_queue); - if (nxt_slow_path(PyCallable_Check(nxt_py_loop_create_future) == 0)) { - nxt_alert(task, "'loop.create_future' is not a callable object"); - goto fail; - } + struct { + const char *key; + PyObject **handler; + + } handlers[] = { + { "create_task", &ctx_data->loop_create_task }, + { "add_reader", &ctx_data->loop_add_reader }, + { "remove_reader", &ctx_data->loop_remove_reader }, + { "call_soon", &ctx_data->loop_call_soon }, + { "run_until_complete", &ctx_data->loop_run_until_complete }, + { "create_future", &ctx_data->loop_create_future }, + }; + + loop = NULL; - nxt_py_quit_future = PyObject_CallObject(nxt_py_loop_create_future, NULL); - if (nxt_slow_path(nxt_py_quit_future == NULL)) { - nxt_alert(task, "Python failed to create Future "); + asyncio = PyImport_ImportModule("asyncio"); + if (nxt_slow_path(asyncio == NULL)) { + nxt_unit_alert(NULL, "Python failed to import module 'asyncio'"); nxt_python_print_exception(); goto fail; } - nxt_py_quit_future_set_result = PyObject_GetAttrString(nxt_py_quit_future, - "set_result"); - if (nxt_slow_path(nxt_py_quit_future_set_result == NULL)) { - nxt_alert(task, "Python failed to get 'future.set_result'"); + new_event_loop = PyDict_GetItemString(PyModule_GetDict(asyncio), + "new_event_loop"); + if (nxt_slow_path(new_event_loop == NULL)) { + nxt_unit_alert(NULL, + "Python failed to get 'new_event_loop' from module 'asyncio'"); goto fail; } - if (nxt_slow_path(PyCallable_Check(nxt_py_quit_future_set_result) == 0)) { - nxt_alert(task, "'future.set_result' is not a callable object"); + if (nxt_slow_path(PyCallable_Check(new_event_loop) == 0)) { + nxt_unit_alert(NULL, + "'asyncio.new_event_loop' is not a callable object"); goto fail; } - nxt_py_port_read = PyCFunction_New(&nxt_py_port_read_method, NULL); - if (nxt_slow_path(nxt_py_port_read == NULL)) { - nxt_alert(task, "Python failed to initialize the 'port_read' function"); + loop = PyObject_CallObject(new_event_loop, NULL); + if (nxt_slow_path(loop == NULL)) { + nxt_unit_alert(NULL, "Python failed to call 'asyncio.new_event_loop'"); goto fail; } - nxt_queue_init(&nxt_py_asgi_drain_queue); + for (i = 0; i < nxt_nitems(handlers); i++) { + obj = PyObject_GetAttrString(loop, handlers[i].key); + if (nxt_slow_path(obj == NULL)) { + nxt_unit_alert(NULL, "Python failed to get 'loop.%s'", + handlers[i].key); + goto fail; + } + + *handlers[i].handler = obj; - if (nxt_slow_path(nxt_py_asgi_http_init(task) == NXT_ERROR)) { - goto fail; + if (nxt_slow_path(PyCallable_Check(obj) == 0)) { + nxt_unit_alert(NULL, "'loop.%s' is not a callable object", + handlers[i].key); + goto fail; + } } - if (nxt_slow_path(nxt_py_asgi_websocket_init(task) == NXT_ERROR)) { + obj = PyObject_CallObject(ctx_data->loop_create_future, NULL); + if (nxt_slow_path(obj == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future "); + nxt_python_print_exception(); goto fail; } - rc = nxt_py_asgi_lifespan_startup(task); - if (nxt_slow_path(rc == NXT_ERROR)) { + ctx_data->quit_future = obj; + + obj = PyObject_GetAttrString(ctx_data->quit_future, "set_result"); + if (nxt_slow_path(obj == NULL)) { + nxt_unit_alert(NULL, "Python failed to get 'future.set_result'"); goto fail; } - init->callbacks.request_handler = nxt_py_asgi_request_handler; - init->callbacks.data_handler = nxt_py_asgi_http_data_handler; - init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler; - init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler; - init->callbacks.quit = nxt_py_asgi_quit; - init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler; - init->callbacks.add_port = nxt_py_asgi_add_port; - init->callbacks.remove_port = nxt_py_asgi_remove_port; + ctx_data->quit_future_set_result = obj; + + if (nxt_slow_path(PyCallable_Check(obj) == 0)) { + nxt_unit_alert(NULL, "'future.set_result' is not a callable object"); + goto fail; + } Py_DECREF(loop); Py_DECREF(asyncio); - return NXT_OK; + *pdata = ctx_data; + + return NXT_UNIT_OK; fail: + nxt_python_asgi_ctx_data_free(ctx_data); + Py_XDECREF(loop); - Py_DECREF(asyncio); + Py_XDECREF(asyncio); + + return NXT_UNIT_ERROR; +} + + +static void +nxt_python_asgi_ctx_data_free(void *data) +{ + nxt_py_asgi_ctx_data_t *ctx_data; + + ctx_data = data; - return NXT_ERROR; + Py_XDECREF(ctx_data->loop_run_until_complete); + Py_XDECREF(ctx_data->loop_create_future); + Py_XDECREF(ctx_data->loop_create_task); + Py_XDECREF(ctx_data->loop_call_soon); + Py_XDECREF(ctx_data->loop_add_reader); + Py_XDECREF(ctx_data->loop_remove_reader); + Py_XDECREF(ctx_data->quit_future); + Py_XDECREF(ctx_data->quit_future_set_result); + + nxt_unit_free(NULL, ctx_data); } -nxt_int_t +static int +nxt_python_asgi_startup(void *data) +{ + return nxt_py_asgi_lifespan_startup(data); +} + + +static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx) { - PyObject *res; + PyObject *res; + nxt_py_asgi_ctx_data_t *ctx_data; - res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, - nxt_py_quit_future, NULL); + ctx_data = ctx->data; + + res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, + ctx_data->quit_future, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(ctx, "Python failed to call loop.run_until_complete"); nxt_python_print_exception(); - return NXT_ERROR; + return NXT_UNIT_ERROR; } Py_DECREF(res); - nxt_py_asgi_lifespan_shutdown(); + nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); + nxt_py_asgi_remove_reader(ctx, ctx_data->port); + + if (ctx_data->port != NULL) { + ctx_data->port->data = NULL; + ctx_data->port = NULL; + } + + nxt_py_asgi_lifespan_shutdown(ctx); + + return NXT_UNIT_OK; +} + + +static void +nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + PyObject *res, *fd; + nxt_py_asgi_ctx_data_t *ctx_data; + + if (port == NULL || port->in_fd == -1) { + return; + } + + ctx_data = ctx->data; + + nxt_unit_debug(ctx, "asgi_remove_reader %d %p", port->in_fd, port); + + fd = PyLong_FromLong(port->in_fd); + if (nxt_slow_path(fd == NULL)) { + nxt_unit_alert(ctx, "Python failed to create Long object"); + nxt_python_print_exception(); + + return; + } + + res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, fd, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to remove_reader"); + nxt_python_print_exception(); + + } else { + Py_DECREF(res); + } - return NXT_OK; + Py_DECREF(fd); } static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) { - PyObject *scope, *res, *task, *receive, *send, *done, *asgi; + PyObject *scope, *res, *task, *receive, *send, *done, *asgi; + PyObject *stage2; + nxt_py_asgi_ctx_data_t *ctx_data; if (req->request->websocket_handshake) { asgi = nxt_py_asgi_websocket_create(req); @@ -346,8 +457,42 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) req->data = asgi; - res = PyObject_CallFunctionObjArgs(nxt_py_application, - scope, receive, send, NULL); + if (!nxt_py_asgi_legacy) { + nxt_unit_req_debug(req, "Python call ASGI 3.0 application"); + + res = PyObject_CallFunctionObjArgs(nxt_py_application, + scope, receive, send, NULL); + + } else { + nxt_unit_req_debug(req, "Python call legacy application"); + + res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, NULL); + + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(req, "Python failed to call legacy app stage1"); + nxt_python_print_exception(); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + goto release_scope; + } + + if (nxt_slow_path(PyCallable_Check(res) == 0)) { + nxt_unit_req_error(req, + "Legacy ASGI application returns not a callable"); + nxt_unit_request_done(req, NXT_UNIT_ERROR); + + Py_DECREF(res); + + goto release_scope; + } + + stage2 = res; + + res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL); + + Py_DECREF(stage2); + } + if (nxt_slow_path(res == NULL)) { nxt_unit_req_error(req, "Python failed to call the application"); nxt_python_print_exception(); @@ -365,7 +510,9 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req) goto release_scope; } - task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + ctx_data = req->ctx->data; + + task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); if (nxt_slow_path(task == NULL)) { nxt_unit_req_error(req, "Python failed to call the create_task"); nxt_python_print_exception(); @@ -405,6 +552,18 @@ release_asgi: } +static void +nxt_py_asgi_close_handler(nxt_unit_request_info_t *req) +{ + if (req->request->websocket_handshake) { + nxt_py_asgi_websocket_close_handler(req); + + } else { + nxt_py_asgi_http_close_handler(req); + } +} + + static PyObject * nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req) { @@ -724,10 +883,82 @@ fail: static int +nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) +{ + int rc; + PyObject *res, *fd, *py_ctx, *py_port; + nxt_unit_port_t *port; + nxt_py_asgi_ctx_data_t *ctx_data; + + if (nxt_slow_path(nxt_py_shared_port == NULL)) { + return NXT_UNIT_ERROR; + } + + port = nxt_py_shared_port; + + nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); + + ctx_data = ctx->data; + + rc = NXT_UNIT_ERROR; + + fd = PyLong_FromLong(port->in_fd); + if (nxt_slow_path(fd == NULL)) { + nxt_unit_alert(ctx, "Python failed to create fd"); + nxt_python_print_exception(); + + return rc; + } + + py_ctx = PyLong_FromVoidPtr(ctx); + if (nxt_slow_path(py_ctx == NULL)) { + nxt_unit_alert(ctx, "Python failed to create py_ctx"); + nxt_python_print_exception(); + + goto clean_fd; + } + + py_port = PyLong_FromVoidPtr(port); + if (nxt_slow_path(py_port == NULL)) { + nxt_unit_alert(ctx, "Python failed to create py_port"); + nxt_python_print_exception(); + + goto clean_py_ctx; + } + + res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, + fd, nxt_py_port_read, + py_ctx, py_port, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to add_reader"); + nxt_python_print_exception(); + + } else { + Py_DECREF(res); + + rc = NXT_UNIT_OK; + } + + Py_DECREF(py_port); + +clean_py_ctx: + + Py_DECREF(py_ctx); + +clean_fd: + + Py_DECREF(fd); + + return rc; +} + + +static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int nb; - PyObject *res; + int nb, rc; + PyObject *res, *fd, *py_ctx, *py_port; + nxt_py_asgi_ctx_data_t *ctx_data; if (port->in_fd == -1) { return NXT_UNIT_OK; @@ -744,73 +975,152 @@ nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); - res = PyObject_CallFunctionObjArgs(nxt_py_loop_add_reader, - PyLong_FromLong(port->in_fd), - nxt_py_port_read, - PyLong_FromVoidPtr(ctx), - PyLong_FromVoidPtr(port), NULL); + if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { + nxt_py_shared_port = port; + + return NXT_UNIT_OK; + } + + ctx_data = ctx->data; + + ctx_data->port = port; + port->data = ctx_data; + + rc = NXT_UNIT_ERROR; + + fd = PyLong_FromLong(port->in_fd); + if (nxt_slow_path(fd == NULL)) { + nxt_unit_alert(ctx, "Python failed to create fd"); + nxt_python_print_exception(); + + return rc; + } + + py_ctx = PyLong_FromVoidPtr(ctx); + if (nxt_slow_path(py_ctx == NULL)) { + nxt_unit_alert(ctx, "Python failed to create py_ctx"); + nxt_python_print_exception(); + + goto clean_fd; + } + + py_port = PyLong_FromVoidPtr(port); + if (nxt_slow_path(py_port == NULL)) { + nxt_unit_alert(ctx, "Python failed to create py_port"); + nxt_python_print_exception(); + + goto clean_py_ctx; + } + + res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, + fd, nxt_py_port_read, + py_ctx, py_port, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(ctx, "Python failed to add_reader"); + nxt_python_print_exception(); - return NXT_UNIT_ERROR; + } else { + Py_DECREF(res); + + rc = NXT_UNIT_OK; } - Py_DECREF(res); + Py_DECREF(py_port); - return NXT_UNIT_OK; +clean_py_ctx: + + Py_DECREF(py_ctx); + +clean_fd: + + Py_DECREF(fd); + + return rc; } static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) { - PyObject *res; - - nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); - if (port->in_fd == -1) { return; } - res = PyObject_CallFunctionObjArgs(nxt_py_loop_remove_reader, - PyLong_FromLong(port->in_fd), NULL); - if (nxt_slow_path(res == NULL)) { - nxt_unit_alert(NULL, "Python failed to remove_reader"); - } + nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); - Py_DECREF(res); + if (nxt_py_shared_port == port) { + nxt_py_shared_port = NULL; + } } static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) { - PyObject *res; + PyObject *res, *p; + nxt_py_asgi_ctx_data_t *ctx_data; nxt_unit_debug(ctx, "asgi_quit %p", ctx); - res = PyObject_CallFunctionObjArgs(nxt_py_quit_future_set_result, - PyLong_FromLong(0), NULL); - if (nxt_slow_path(res == NULL)) { - nxt_unit_alert(ctx, "Python failed to set_result"); + ctx_data = ctx->data; + + if (nxt_py_shared_port != NULL) { + p = PyLong_FromLong(nxt_py_shared_port->in_fd); + if (nxt_slow_path(p == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Long"); + nxt_python_print_exception(); + + } else { + res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, + p, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Python failed to remove_reader"); + nxt_python_print_exception(); + + } else { + Py_DECREF(res); + } + + Py_DECREF(p); + } } - Py_DECREF(res); + p = PyLong_FromLong(0); + if (nxt_slow_path(p == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Long"); + nxt_python_print_exception(); + + } else { + res = PyObject_CallFunctionObjArgs(ctx_data->quit_future_set_result, + p, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(ctx, "Python failed to set_result"); + nxt_python_print_exception(); + + } else { + Py_DECREF(res); + } + + Py_DECREF(p); + } } static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx) { - int rc; - nxt_queue_link_t *lnk; + int rc; + nxt_queue_link_t *lnk; + nxt_py_asgi_ctx_data_t *ctx_data; - while (!nxt_queue_is_empty(&nxt_py_asgi_drain_queue)) { - lnk = nxt_queue_first(&nxt_py_asgi_drain_queue); + ctx_data = ctx->data; + + while (!nxt_queue_is_empty(&ctx_data->drain_queue)) { + lnk = nxt_queue_first(&ctx_data->drain_queue); rc = nxt_py_asgi_http_drain(lnk); if (rc == NXT_UNIT_AGAIN) { - break; + return; } nxt_queue_remove(lnk); @@ -859,7 +1169,7 @@ nxt_py_asgi_port_read(PyObject *self, PyObject *args) if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { return PyErr_Format(PyExc_RuntimeError, - "error processing port message"); + "error processing port %d message", port->id.id); } Py_RETURN_NONE; @@ -996,8 +1306,8 @@ nxt_py_asgi_add_field(void *data, int i, PyObject *name, PyObject *val) PyObject * -nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, - PyObject *result) +nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, + nxt_py_asgi_ctx_data_t *ctx_data, PyObject *future, PyObject *result) { PyObject *set_result, *res; @@ -1013,7 +1323,7 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, Py_CLEAR(future); - goto cleanup; + goto cleanup_result; } if (nxt_slow_path(PyCallable_Check(set_result) == 0)) { @@ -1024,7 +1334,7 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, goto cleanup; } - res = PyObject_CallFunctionObjArgs(nxt_py_loop_call_soon, set_result, + res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon, set_result, result, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_req_alert(req, "Python failed to call 'loop.call_soon'"); @@ -1038,6 +1348,9 @@ nxt_py_asgi_set_result_soon(nxt_unit_request_info_t *req, PyObject *future, cleanup: Py_DECREF(set_result); + +cleanup_result: + Py_DECREF(result); return future; @@ -1148,6 +1461,17 @@ nxt_py_asgi_new_scope(nxt_unit_request_info_t *req, PyObject *type, void +nxt_py_asgi_drain_wait(nxt_unit_request_info_t *req, nxt_queue_link_t *link) +{ + nxt_py_asgi_ctx_data_t *ctx_data; + + ctx_data = req->ctx->data; + + nxt_queue_insert_tail(&ctx_data->drain_queue, link); +} + + +void nxt_py_asgi_dealloc(PyObject *self) { PyObject_Del(self); @@ -1177,19 +1501,11 @@ nxt_py_asgi_next(PyObject *self) } -void +static void nxt_python_asgi_done(void) { nxt_py_asgi_str_done(); - Py_XDECREF(nxt_py_quit_future); - Py_XDECREF(nxt_py_quit_future_set_result); - Py_XDECREF(nxt_py_loop_run_until_complete); - Py_XDECREF(nxt_py_loop_create_future); - Py_XDECREF(nxt_py_loop_create_task); - Py_XDECREF(nxt_py_loop_call_soon); - Py_XDECREF(nxt_py_loop_add_reader); - Py_XDECREF(nxt_py_loop_remove_reader); Py_XDECREF(nxt_py_port_read); } @@ -1203,25 +1519,12 @@ nxt_python_asgi_check(PyObject *obj) } -nxt_int_t -nxt_python_asgi_init(nxt_task_t *task, nxt_unit_init_t *init) -{ - nxt_alert(task, "ASGI not implemented"); - return NXT_ERROR; -} - - -nxt_int_t -nxt_python_asgi_run(nxt_unit_ctx_t *ctx) +int +nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto) { - nxt_unit_alert(ctx, "ASGI not implemented"); - return NXT_ERROR; + nxt_unit_alert(NULL, "ASGI not implemented"); + return NXT_UNIT_ERROR; } -void -nxt_python_asgi_done(void) -{ -} - #endif /* NXT_HAVE_ASGI */ |