diff options
Diffstat (limited to '')
-rw-r--r-- | src/python/nxt_python_asgi_lifespan.c | 505 |
1 files changed, 505 insertions, 0 deletions
diff --git a/src/python/nxt_python_asgi_lifespan.c b/src/python/nxt_python_asgi_lifespan.c new file mode 100644 index 00000000..14d0ee97 --- /dev/null +++ b/src/python/nxt_python_asgi_lifespan.c @@ -0,0 +1,505 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +typedef struct { + PyObject_HEAD + int disabled; + int startup_received; + int startup_sent; + int shutdown_received; + int shutdown_sent; + int shutdown_called; + PyObject *startup_future; + PyObject *shutdown_future; + PyObject *receive_future; +} nxt_py_asgi_lifespan_t; + + +static PyObject *nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_send_startup( + nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, + int v, int *sent, PyObject **future); +static PyObject *nxt_py_asgi_lifespan_send_shutdown( + nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); +static PyObject *nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan); +static PyObject *nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future); + + +static nxt_py_asgi_lifespan_t *nxt_py_lifespan; + +static PyMethodDef nxt_py_asgi_lifespan_methods[] = { + { "receive", nxt_py_asgi_lifespan_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_lifespan_send, METH_O, 0 }, + { "_done", nxt_py_asgi_lifespan_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_lifespan_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_lifespan", + .tp_basicsize = sizeof(nxt_py_asgi_lifespan_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI Lifespan object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_lifespan_methods, +}; + + +nxt_int_t +nxt_py_asgi_lifespan_startup(nxt_task_t *task) +{ + PyObject *scope, *res, *py_task, *receive, *send, *done; + nxt_int_t rc; + nxt_py_asgi_lifespan_t *lifespan; + + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_lifespan_type) != 0)) { + nxt_alert(task, + "Python failed to initialize the 'asgi_lifespan' type object"); + return NXT_ERROR; + } + + lifespan = PyObject_New(nxt_py_asgi_lifespan_t, &nxt_py_asgi_lifespan_type); + if (nxt_slow_path(lifespan == NULL)) { + nxt_alert(task, "Python failed to create lifespan object"); + return NXT_ERROR; + } + + rc = NXT_ERROR; + + receive = PyObject_GetAttrString((PyObject *) lifespan, "receive"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get 'receive' method"); + goto release_lifespan; + } + + send = PyObject_GetAttrString((PyObject *) lifespan, "send"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get 'send' method"); + goto release_receive; + } + + done = PyObject_GetAttrString((PyObject *) lifespan, "_done"); + if (nxt_slow_path(receive == NULL)) { + nxt_alert(task, "Python failed to get '_done' method"); + goto release_send; + } + + lifespan->startup_future = PyObject_CallObject(nxt_py_loop_create_future, + NULL); + if (nxt_slow_path(lifespan->startup_future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + + goto release_done; + } + + lifespan->disabled = 0; + lifespan->startup_received = 0; + lifespan->startup_sent = 0; + lifespan->shutdown_received = 0; + lifespan->shutdown_sent = 0; + lifespan->shutdown_called = 0; + lifespan->shutdown_future = NULL; + lifespan->receive_future = NULL; + + scope = nxt_py_asgi_new_scope(NULL, nxt_py_lifespan_str, nxt_py_2_0_str); + if (nxt_slow_path(scope == NULL)) { + goto release_future; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_application, + scope, receive, send, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_log(task, NXT_LOG_ERR, "Python failed to call the application"); + nxt_python_print_exception(); + goto release_scope; + } + + if (nxt_slow_path(!PyCoro_CheckExact(res))) { + nxt_log(task, NXT_LOG_ERR, + "Application result type is not a coroutine"); + Py_DECREF(res); + goto release_scope; + } + + py_task = PyObject_CallFunctionObjArgs(nxt_py_loop_create_task, res, NULL); + if (nxt_slow_path(py_task == NULL)) { + nxt_log(task, NXT_LOG_ERR, "Python failed to call the create_task"); + nxt_python_print_exception(); + Py_DECREF(res); + goto release_scope; + } + + Py_DECREF(res); + + res = PyObject_CallMethodObjArgs(py_task, nxt_py_add_done_callback_str, + done, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_log(task, NXT_LOG_ERR, + "Python failed to call 'task.add_done_callback'"); + nxt_python_print_exception(); + goto release_task; + } + + Py_DECREF(res); + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + lifespan->startup_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_alert(task, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + goto release_task; + } + + Py_DECREF(res); + + if (lifespan->startup_sent == 1 || lifespan->disabled) { + nxt_py_lifespan = lifespan; + Py_INCREF(nxt_py_lifespan); + + rc = NXT_OK; + } + +release_task: + Py_DECREF(py_task); +release_scope: + Py_DECREF(scope); +release_future: + Py_CLEAR(lifespan->startup_future); +release_done: + Py_DECREF(done); +release_send: + Py_DECREF(send); +release_receive: + Py_DECREF(receive); +release_lifespan: + Py_DECREF(lifespan); + + return rc; +} + + +nxt_int_t +nxt_py_asgi_lifespan_shutdown(void) +{ + PyObject *msg, *future, *res; + nxt_py_asgi_lifespan_t *lifespan; + + if (nxt_slow_path(nxt_py_lifespan == NULL || nxt_py_lifespan->disabled)) { + return NXT_OK; + } + + lifespan = nxt_py_lifespan; + lifespan->shutdown_called = 1; + + if (lifespan->receive_future != NULL) { + future = lifespan->receive_future; + lifespan->receive_future = NULL; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); + + if (nxt_fast_path(msg != NULL)) { + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + msg, NULL); + Py_XDECREF(res); + Py_DECREF(msg); + } + + Py_DECREF(future); + } + + if (lifespan->shutdown_sent) { + return NXT_OK; + } + + lifespan->shutdown_future = PyObject_CallObject(nxt_py_loop_create_future, + NULL); + if (nxt_slow_path(lifespan->shutdown_future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + res = PyObject_CallFunctionObjArgs(nxt_py_loop_run_until_complete, + lifespan->shutdown_future, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete"); + nxt_python_print_exception(); + return NXT_ERROR; + } + + Py_DECREF(res); + Py_CLEAR(lifespan->shutdown_future); + + return NXT_OK; +} + + +static PyObject * +nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none) +{ + PyObject *msg, *future; + nxt_py_asgi_lifespan_t *lifespan; + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + nxt_unit_debug(NULL, "asgi_lifespan_receive"); + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_alert(NULL, "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (!lifespan->startup_received) { + lifespan->startup_received = 1; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_startup_str); + + return nxt_py_asgi_set_result_soon(NULL, future, msg); + } + + if (lifespan->shutdown_called && !lifespan->shutdown_received) { + lifespan->shutdown_received = 1; + + msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); + + return nxt_py_asgi_set_result_soon(NULL, future, msg); + } + + Py_INCREF(future); + lifespan->receive_future = future; + + return future; +} + + +static PyObject * +nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict) +{ + PyObject *type, *msg; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_lifespan_t *lifespan; + + static const nxt_str_t startup_complete + = nxt_string("lifespan.startup.complete"); + static const nxt_str_t startup_failed + = nxt_string("lifespan.startup.failed"); + static const nxt_str_t shutdown_complete + = nxt_string("lifespan.shutdown.complete"); + static const nxt_str_t shutdown_failed + = nxt_string("lifespan.shutdown.failed"); + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_error(NULL, + "asgi_lifespan_send: 'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, + "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_debug(NULL, "asgi_lifespan_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) startup_complete.length + && memcmp(type_str, startup_complete.start, type_len) == 0) + { + return nxt_py_asgi_lifespan_send_startup(lifespan, 0, NULL); + } + + if (type_len == (Py_ssize_t) startup_failed.length + && memcmp(type_str, startup_failed.start, type_len) == 0) + { + msg = PyDict_GetItem(dict, nxt_py_message_str); + return nxt_py_asgi_lifespan_send_startup(lifespan, 1, msg); + } + + if (type_len == (Py_ssize_t) shutdown_complete.length + && memcmp(type_str, shutdown_complete.start, type_len) == 0) + { + return nxt_py_asgi_lifespan_send_shutdown(lifespan, 0, NULL); + } + + if (type_len == (Py_ssize_t) shutdown_failed.length + && memcmp(type_str, shutdown_failed.start, type_len) == 0) + { + msg = PyDict_GetItem(dict, nxt_py_message_str); + return nxt_py_asgi_lifespan_send_shutdown(lifespan, 1, msg); + } + + return nxt_py_asgi_lifespan_disable(lifespan); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_startup(nxt_py_asgi_lifespan_t *lifespan, int v, + PyObject *message) +{ + const char *message_str; + Py_ssize_t message_len; + + if (nxt_slow_path(v != 0)) { + nxt_unit_error(NULL, "Application startup failed"); + + if (nxt_fast_path(message != NULL && PyUnicode_Check(message))) { + message_str = PyUnicode_AsUTF8AndSize(message, &message_len); + + nxt_unit_error(NULL, "%.*s", (int) message_len, message_str); + } + } + + return nxt_py_asgi_lifespan_send_(lifespan, v, + &lifespan->startup_sent, + &lifespan->startup_future); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, int v, int *sent, + PyObject **pfuture) +{ + PyObject *future, *res; + + if (*sent) { + return nxt_py_asgi_lifespan_disable(lifespan); + } + + *sent = 1 + v; + + if (*pfuture != NULL) { + future = *pfuture; + *pfuture = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + + return nxt_py_asgi_lifespan_disable(lifespan); + } + + Py_DECREF(res); + Py_DECREF(future); + } + + Py_INCREF(lifespan); + + return (PyObject *) lifespan; +} + + +static PyObject * +nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan) +{ + nxt_unit_warn(NULL, "Got invalid state transition on lifespan protocol"); + + lifespan->disabled = 1; + + return PyErr_Format(PyExc_AssertionError, + "Got invalid state transition on lifespan protocol"); +} + + +static PyObject * +nxt_py_asgi_lifespan_send_shutdown(nxt_py_asgi_lifespan_t *lifespan, int v, + PyObject *message) +{ + return nxt_py_asgi_lifespan_send_(lifespan, v, + &lifespan->shutdown_sent, + &lifespan->shutdown_future); +} + + +static PyObject * +nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future) +{ + PyObject *res; + nxt_py_asgi_lifespan_t *lifespan; + + nxt_unit_debug(NULL, "asgi_lifespan_done"); + + lifespan = (nxt_py_asgi_lifespan_t *) self; + + if (lifespan->startup_sent == 0) { + lifespan->disabled = 1; + } + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_log(NULL, NXT_UNIT_LOG_INFO, + "ASGI Lifespan processing exception"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + + if (lifespan->startup_future != NULL) { + future = lifespan->startup_future; + lifespan->startup_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + } + + if (lifespan->shutdown_future != NULL) { + future = lifespan->shutdown_future; + lifespan->shutdown_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, + Py_None, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + } + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ |