/* * Copyright (C) NGINX, Inc. */ #include #if (NXT_HAVE_ASGI) #include #include #include #include typedef struct { PyObject_HEAD nxt_py_asgi_ctx_data_t *ctx_data; int disabled; int startup_received; int startup_sent; int shutdown_received; int shutdown_sent; int shutdown_called; PyObject *startup_future; PyObject *shutdown_future; PyObject *receive_future; PyObject *state; } nxt_py_asgi_lifespan_t; static PyObject *nxt_py_asgi_lifespan_target_startup( nxt_py_asgi_ctx_data_t *ctx_data, nxt_python_target_t *target); static int nxt_py_asgi_lifespan_target_shutdown( nxt_py_asgi_lifespan_t *lifespan); static PyObject *nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none); static PyObject *nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict); static PyObject *nxt_py_asgi_lifespan_send_startup( nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); static PyObject *nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, int v, int *sent, PyObject **future); static PyObject *nxt_py_asgi_lifespan_send_shutdown( nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *dict); static PyObject *nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan); static PyObject *nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future); static void nxt_py_asgi_lifespan_dealloc(PyObject *self); static PyMethodDef nxt_py_asgi_lifespan_methods[] = { { "receive", nxt_py_asgi_lifespan_receive, METH_NOARGS, 0 }, { "send", nxt_py_asgi_lifespan_send, METH_O, 0 }, { "_done", nxt_py_asgi_lifespan_done, METH_O, 0 }, { NULL, NULL, 0, 0 } }; static PyMemberDef nxt_py_asgi_lifespan_members[] = { { #if PY_VERSION_HEX >= NXT_PYTHON_VER(3, 7) .name = "state", #else .name = (char *)"state", #endif .type = T_OBJECT_EX, .offset = offsetof(nxt_py_asgi_lifespan_t, state), .flags = READONLY, #if PY_VERSION_HEX >= NXT_PYTHON_VER(3, 7) .doc = PyDoc_STR("lifespan.state") #else .doc = (char *)PyDoc_STR("lifespan.state") #endif }, { NULL, 0, 0, 0, NULL } }; static PyAsyncMethods nxt_py_asgi_async_methods = { .am_await = nxt_py_asgi_await, }; static PyTypeObject nxt_py_asgi_lifespan_type = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "unit._asgi_lifespan", .tp_basicsize = sizeof(nxt_py_asgi_lifespan_t), .tp_dealloc = nxt_py_asgi_lifespan_dealloc, .tp_as_async = &nxt_py_asgi_async_methods, .tp_flags = Py_TPFLAGS_DEFAULT, .tp_doc = "unit ASGI Lifespan object", .tp_iter = nxt_py_asgi_iter, .tp_iternext = nxt_py_asgi_next, .tp_methods = nxt_py_asgi_lifespan_methods, .tp_members = nxt_py_asgi_lifespan_members, }; int nxt_py_asgi_lifespan_startup(nxt_py_asgi_ctx_data_t *ctx_data) { size_t size; PyObject *lifespan; PyObject **target_lifespans; nxt_int_t i; nxt_python_target_t *target; size = nxt_py_targets->count * sizeof(PyObject*); target_lifespans = nxt_unit_malloc(NULL, size); if (nxt_slow_path(target_lifespans == NULL)) { nxt_unit_alert(NULL, "Failed to allocate lifespan data"); return NXT_UNIT_ERROR; } memset(target_lifespans, 0, size); for (i = 0; i < nxt_py_targets->count; i++) { target = &nxt_py_targets->target[i]; lifespan = nxt_py_asgi_lifespan_target_startup(ctx_data, target); if (nxt_slow_path(lifespan == NULL)) { return NXT_UNIT_ERROR; } target_lifespans[i] = lifespan; } ctx_data->target_lifespans = target_lifespans; return NXT_UNIT_OK; } static PyObject * nxt_py_asgi_lifespan_target_startup(nxt_py_asgi_ctx_data_t *ctx_data, nxt_python_target_t *target) { PyObject *scope, *res, *py_task, *receive, *send, *done; PyObject *stage2; nxt_py_asgi_lifespan_t *lifespan, *ret; if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_lifespan_type) != 0)) { nxt_unit_alert(NULL, "Python failed to initialize the 'asgi_lifespan' type object"); return NULL; } lifespan = PyObject_New(nxt_py_asgi_lifespan_t, &nxt_py_asgi_lifespan_type); if (nxt_slow_path(lifespan == NULL)) { nxt_unit_alert(NULL, "Python failed to create lifespan object"); return NULL; } ret = NULL; receive = PyObject_GetAttrString((PyObject *) lifespan, "receive"); if (nxt_slow_path(receive == NULL)) { nxt_unit_alert(NULL, "Python failed to get 'receive' method"); goto release_lifespan; } send = PyObject_GetAttrString((PyObject *) lifespan, "send"); if (nxt_slow_path(receive == NULL)) { nxt_unit_alert(NULL, "Python failed to get 'send' method"); goto release_receive; } done = PyObject_GetAttrString((PyObject *) lifespan, "_done"); if (nxt_slow_path(receive == NULL)) { nxt_unit_alert(NULL, "Python failed to get '_done' method"); goto release_send; } lifespan->startup_future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(lifespan->startup_future == NULL)) { nxt_unit_alert(NULL, "Python failed to create Future object"); nxt_python_print_exception(); goto release_done; } lifespan->ctx_data = ctx_data; lifespan->disabled = 0; lifespan->startup_received = 0; lifespan->startup_sent = 0; lifespan->shutdown_received = 0; lifespan->shutdown_sent = 0; lifespan->shutdown_called = 0; lifespan->shutdown_future = NULL; lifespan->receive_future = NULL; lifespan->state = NULL; scope = nxt_py_asgi_new_scope(NULL, nxt_py_lifespan_str, nxt_py_2_0_str); if (nxt_slow_path(scope == NULL)) { goto release_future; } lifespan->state = PyDict_New(); if (nxt_slow_path(lifespan->state == NULL)) { nxt_unit_req_error(NULL, "Python failed to create 'state' dict"); goto release_future; } if (nxt_slow_path(PyDict_SetItem(scope, nxt_py_state_str, lifespan->state) == -1)) { nxt_unit_req_error(NULL, "Python failed to set 'scope.state' item"); Py_CLEAR(lifespan->state); goto release_future; } if (!target->asgi_legacy) { nxt_unit_req_debug(NULL, "Python call ASGI 3.0 application"); res = PyObject_CallFunctionObjArgs(target->application, scope, receive, send, NULL); } else { nxt_unit_req_debug(NULL, "Python call legacy application"); res = PyObject_CallFunctionObjArgs(target->application, scope, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_log(NULL, NXT_UNIT_LOG_INFO, "ASGI Lifespan processing exception"); nxt_python_print_exception(); lifespan->disabled = 1; Py_INCREF(lifespan); ret = lifespan; goto release_scope; } if (nxt_slow_path(PyCallable_Check(res) == 0)) { nxt_unit_req_error(NULL, "Legacy ASGI application returns not a callable"); Py_DECREF(res); goto release_scope; } stage2 = res; res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL); Py_DECREF(stage2); } if (nxt_slow_path(res == NULL)) { nxt_unit_error(NULL, "Python failed to call the application"); nxt_python_print_exception(); goto release_scope; } if (nxt_slow_path(!PyCoro_CheckExact(res))) { nxt_unit_error(NULL, "Application result type is not a coroutine"); Py_DECREF(res); goto release_scope; } py_task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL); if (nxt_slow_path(py_task == NULL)) { nxt_unit_alert(NULL, "Python failed to call the create_task"); nxt_python_print_exception(); Py_DECREF(res); goto release_scope; } Py_DECREF(res); res = PyObject_CallMethodObjArgs(py_task, nxt_py_add_done_callback_str, done, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(NULL, "Python failed to call 'task.add_done_callback'"); nxt_python_print_exception(); goto release_task; } Py_DECREF(res); res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, lifespan->startup_future, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete"); nxt_python_print_exception(); goto release_task; } Py_DECREF(res); if (lifespan->startup_sent == 1 || lifespan->disabled) { Py_INCREF(lifespan); ret = lifespan; } release_task: Py_DECREF(py_task); release_scope: Py_DECREF(scope); release_future: Py_CLEAR(lifespan->startup_future); release_done: Py_DECREF(done); release_send: Py_DECREF(send); release_receive: Py_DECREF(receive); release_lifespan: Py_DECREF(lifespan); return (PyObject *) ret; } int nxt_py_asgi_lifespan_shutdown(nxt_unit_ctx_t *ctx) { nxt_int_t i, ret; nxt_py_asgi_lifespan_t *lifespan; nxt_py_asgi_ctx_data_t *ctx_data; ctx_data = ctx->data; for (i = 0; i < nxt_py_targets->count; i++) { lifespan = (nxt_py_asgi_lifespan_t *)ctx_data->target_lifespans[i]; ret = nxt_py_asgi_lifespan_target_shutdown(lifespan); if (nxt_slow_path(ret != NXT_UNIT_OK)) { return NXT_UNIT_ERROR; } } nxt_unit_free(NULL, ctx_data->target_lifespans); return NXT_UNIT_OK; } static int nxt_py_asgi_lifespan_target_shutdown(nxt_py_asgi_lifespan_t *lifespan) { PyObject *msg, *future, *res; nxt_py_asgi_ctx_data_t *ctx_data; ctx_data = lifespan->ctx_data; if (nxt_slow_path(lifespan == NULL || lifespan->disabled)) { return NXT_UNIT_OK; } lifespan->shutdown_called = 1; if (lifespan->receive_future != NULL) { future = lifespan->receive_future; lifespan->receive_future = NULL; msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); if (nxt_fast_path(msg != NULL)) { res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); Py_XDECREF(res); Py_DECREF(msg); } Py_DECREF(future); } if (lifespan->shutdown_sent) { return NXT_UNIT_OK; } lifespan->shutdown_future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(lifespan->shutdown_future == NULL)) { nxt_unit_alert(NULL, "Python failed to create Future object"); nxt_python_print_exception(); return NXT_UNIT_ERROR; } res = PyObject_CallFunctionObjArgs(ctx_data->loop_run_until_complete, lifespan->shutdown_future, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(NULL, "Python failed to call loop.run_until_complete"); nxt_python_print_exception(); return NXT_UNIT_ERROR; } Py_DECREF(res); Py_CLEAR(lifespan->shutdown_future); return NXT_UNIT_OK; } static PyObject * nxt_py_asgi_lifespan_receive(PyObject *self, PyObject *none) { PyObject *msg, *future; nxt_py_asgi_lifespan_t *lifespan; nxt_py_asgi_ctx_data_t *ctx_data; lifespan = (nxt_py_asgi_lifespan_t *) self; ctx_data = lifespan->ctx_data; nxt_unit_debug(NULL, "asgi_lifespan_receive"); future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(future == NULL)) { nxt_unit_alert(NULL, "Python failed to create Future object"); nxt_python_print_exception(); return PyErr_Format(PyExc_RuntimeError, "failed to create Future object"); } if (!lifespan->startup_received) { lifespan->startup_received = 1; msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_startup_str); return nxt_py_asgi_set_result_soon(NULL, ctx_data, future, msg); } if (lifespan->shutdown_called && !lifespan->shutdown_received) { lifespan->shutdown_received = 1; msg = nxt_py_asgi_new_msg(NULL, nxt_py_lifespan_shutdown_str); return nxt_py_asgi_set_result_soon(NULL, ctx_data, future, msg); } Py_INCREF(future); lifespan->receive_future = future; return future; } static PyObject * nxt_py_asgi_lifespan_send(PyObject *self, PyObject *dict) { PyObject *type, *msg; const char *type_str; Py_ssize_t type_len; nxt_py_asgi_lifespan_t *lifespan; static const nxt_str_t startup_complete = nxt_string("lifespan.startup.complete"); static const nxt_str_t startup_failed = nxt_string("lifespan.startup.failed"); static const nxt_str_t shutdown_complete = nxt_string("lifespan.shutdown.complete"); static const nxt_str_t shutdown_failed = nxt_string("lifespan.shutdown.failed"); lifespan = (nxt_py_asgi_lifespan_t *) self; type = PyDict_GetItem(dict, nxt_py_type_str); if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { nxt_unit_error(NULL, "asgi_lifespan_send: 'type' is not a unicode string"); return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); } type_str = PyUnicode_AsUTF8AndSize(type, &type_len); nxt_unit_debug(NULL, "asgi_lifespan_send type is '%.*s'", (int) type_len, type_str); if (type_len == (Py_ssize_t) startup_complete.length && memcmp(type_str, startup_complete.start, type_len) == 0) { return nxt_py_asgi_lifespan_send_startup(lifespan, 0, NULL); } if (type_len == (Py_ssize_t) startup_failed.length && memcmp(type_str, startup_failed.start, type_len) == 0) { msg = PyDict_GetItem(dict, nxt_py_message_str); return nxt_py_asgi_lifespan_send_startup(lifespan, 1, msg); } if (type_len == (Py_ssize_t) shutdown_complete.length && memcmp(type_str, shutdown_complete.start, type_len) == 0) { return nxt_py_asgi_lifespan_send_shutdown(lifespan, 0, NULL); } if (type_len == (Py_ssize_t) shutdown_failed.length && memcmp(type_str, shutdown_failed.start, type_len) == 0) { msg = PyDict_GetItem(dict, nxt_py_message_str); return nxt_py_asgi_lifespan_send_shutdown(lifespan, 1, msg); } return nxt_py_asgi_lifespan_disable(lifespan); } static PyObject * nxt_py_asgi_lifespan_send_startup(nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *message) { const char *message_str; Py_ssize_t message_len; if (nxt_slow_path(v != 0)) { nxt_unit_error(NULL, "Application startup failed"); if (nxt_fast_path(message != NULL && PyUnicode_Check(message))) { message_str = PyUnicode_AsUTF8AndSize(message, &message_len); nxt_unit_error(NULL, "%.*s", (int) message_len, message_str); } } return nxt_py_asgi_lifespan_send_(lifespan, v, &lifespan->startup_sent, &lifespan->startup_future); } static PyObject * nxt_py_asgi_lifespan_send_(nxt_py_asgi_lifespan_t *lifespan, int v, int *sent, PyObject **pfuture) { PyObject *future, *res; if (*sent) { return nxt_py_asgi_lifespan_disable(lifespan); } *sent = 1 + v; if (*pfuture != NULL) { future = *pfuture; *pfuture = NULL; res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); nxt_python_print_exception(); return nxt_py_asgi_lifespan_disable(lifespan); } Py_DECREF(res); Py_DECREF(future); } Py_INCREF(lifespan); return (PyObject *) lifespan; } static PyObject * nxt_py_asgi_lifespan_disable(nxt_py_asgi_lifespan_t *lifespan) { nxt_unit_warn(NULL, "Got invalid state transition on lifespan protocol"); lifespan->disabled = 1; return PyErr_Format(PyExc_AssertionError, "Got invalid state transition on lifespan protocol"); } static PyObject * nxt_py_asgi_lifespan_send_shutdown(nxt_py_asgi_lifespan_t *lifespan, int v, PyObject *message) { return nxt_py_asgi_lifespan_send_(lifespan, v, &lifespan->shutdown_sent, &lifespan->shutdown_future); } static PyObject * nxt_py_asgi_lifespan_done(PyObject *self, PyObject *future) { PyObject *res; nxt_py_asgi_lifespan_t *lifespan; nxt_unit_debug(NULL, "asgi_lifespan_done"); lifespan = (nxt_py_asgi_lifespan_t *) self; if (lifespan->startup_sent == 0) { lifespan->disabled = 1; } /* * Get Future.result() and it raises an exception, if coroutine exited * with exception. */ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_log(NULL, NXT_UNIT_LOG_INFO, "ASGI Lifespan processing exception"); nxt_python_print_exception(); } Py_XDECREF(res); if (lifespan->startup_future != NULL) { future = lifespan->startup_future; lifespan->startup_future = NULL; res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); nxt_python_print_exception(); } Py_XDECREF(res); Py_DECREF(future); } if (lifespan->shutdown_future != NULL) { future = lifespan->shutdown_future; lifespan->shutdown_future = NULL; res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_alert(NULL, "Failed to call 'future.set_result'"); nxt_python_print_exception(); } Py_XDECREF(res); Py_DECREF(future); } Py_RETURN_NONE; } static void nxt_py_asgi_lifespan_dealloc(PyObject *self) { nxt_py_asgi_lifespan_t *lifespan = (nxt_py_asgi_lifespan_t *)self; Py_CLEAR(lifespan->state); PyObject_Del(self); } #endif /* NXT_HAVE_ASGI */