/* * Copyright (C) NGINX, Inc. */ #include #if (NXT_HAVE_ASGI) #include #include #include #include #include typedef struct { PyObject_HEAD nxt_unit_request_info_t *req; nxt_queue_link_t link; PyObject *receive_future; PyObject *send_future; uint64_t content_length; uint64_t bytes_sent; int complete; PyObject *send_body; Py_ssize_t send_body_off; } nxt_py_asgi_http_t; static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none); static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http); static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict); static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict); static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict); static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future); static PyMethodDef nxt_py_asgi_http_methods[] = { { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 }, { "send", nxt_py_asgi_http_send, METH_O, 0 }, { "_done", nxt_py_asgi_http_done, METH_O, 0 }, { NULL, NULL, 0, 0 } }; static PyAsyncMethods nxt_py_asgi_async_methods = { .am_await = nxt_py_asgi_await, }; static PyTypeObject nxt_py_asgi_http_type = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "unit._asgi_http", .tp_basicsize = sizeof(nxt_py_asgi_http_t), .tp_dealloc = nxt_py_asgi_dealloc, .tp_as_async = &nxt_py_asgi_async_methods, .tp_flags = Py_TPFLAGS_DEFAULT, .tp_doc = "unit ASGI HTTP request object", .tp_iter = nxt_py_asgi_iter, .tp_iternext = nxt_py_asgi_next, .tp_methods = nxt_py_asgi_http_methods, }; static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; int nxt_py_asgi_http_init(void) { if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { nxt_unit_alert(NULL, "Python failed to initialize the 'http' type object"); return NXT_UNIT_ERROR; } return NXT_UNIT_OK; } PyObject * nxt_py_asgi_http_create(nxt_unit_request_info_t *req) { nxt_py_asgi_http_t *http; http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type); if (nxt_fast_path(http != NULL)) { http->req = req; http->receive_future = NULL; http->send_future = NULL; http->content_length = -1; http->bytes_sent = 0; http->complete = 0; http->send_body = NULL; http->send_body_off = 0; } return (PyObject *) http; } static PyObject * nxt_py_asgi_http_receive(PyObject *self, PyObject *none) { PyObject *msg, *future; nxt_py_asgi_http_t *http; nxt_py_asgi_ctx_data_t *ctx_data; nxt_unit_request_info_t *req; http = (nxt_py_asgi_http_t *) self; req = http->req; nxt_unit_req_debug(req, "asgi_http_receive"); msg = nxt_py_asgi_http_read_msg(http); if (nxt_slow_path(msg == NULL)) { return NULL; } ctx_data = req->ctx->data; future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(future == NULL)) { nxt_unit_req_alert(req, "Python failed to create Future object"); nxt_python_print_exception(); Py_DECREF(msg); return PyErr_Format(PyExc_RuntimeError, "failed to create Future object"); } if (msg != Py_None) { return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg); } http->receive_future = future; Py_INCREF(http->receive_future); Py_DECREF(msg); return future; } static PyObject * nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http) { char *body_buf; ssize_t read_res; PyObject *msg, *body; Py_ssize_t size; nxt_unit_request_info_t *req; req = http->req; size = req->content_length; if (size > nxt_py_asgi_http_body_buf_size) { size = nxt_py_asgi_http_body_buf_size; } if (size > 0) { body = PyBytes_FromStringAndSize(NULL, size); if (nxt_slow_path(body == NULL)) { nxt_unit_req_alert(req, "Python failed to create body byte string"); nxt_python_print_exception(); return PyErr_Format(PyExc_RuntimeError, "failed to create Bytes object"); } body_buf = PyBytes_AS_STRING(body); read_res = nxt_unit_request_read(req, body_buf, size); } else { body = NULL; read_res = 0; } if (read_res > 0 || read_res == size) { msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str); if (nxt_slow_path(msg == NULL)) { Py_XDECREF(body); return NULL; } #define SET_ITEM(dict, key, value) \ if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ == -1)) \ { \ nxt_unit_req_alert(req, \ "Python failed to set '" #dict "." #key "' item"); \ PyErr_SetString(PyExc_RuntimeError, \ "Python failed to set '" #dict "." #key "' item"); \ goto fail; \ } if (body != NULL) { SET_ITEM(msg, body, body) } if (req->content_length > 0) { SET_ITEM(msg, more_body, Py_True) } #undef SET_ITEM Py_XDECREF(body); return msg; } Py_XDECREF(body); Py_RETURN_NONE; fail: Py_DECREF(msg); Py_XDECREF(body); return NULL; } static PyObject * nxt_py_asgi_http_send(PyObject *self, PyObject *dict) { PyObject *type; const char *type_str; Py_ssize_t type_len; nxt_py_asgi_http_t *http; static const nxt_str_t response_start = nxt_string("http.response.start"); static const nxt_str_t response_body = nxt_string("http.response.body"); http = (nxt_py_asgi_http_t *) self; type = PyDict_GetItem(dict, nxt_py_type_str); if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { nxt_unit_req_error(http->req, "asgi_http_send: " "'type' is not a unicode string"); return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); } type_str = PyUnicode_AsUTF8AndSize(type, &type_len); nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", (int) type_len, type_str); if (type_len == (Py_ssize_t) response_start.length && memcmp(type_str, response_start.start, type_len) == 0) { return nxt_py_asgi_http_response_start(http, dict); } if (type_len == (Py_ssize_t) response_body.length && memcmp(type_str, response_body.start, type_len) == 0) { return nxt_py_asgi_http_response_body(http, dict); } nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'", (int) type_len, type_str); return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); } static PyObject * nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) { int rc; PyObject *status, *headers, *res; nxt_py_asgi_calc_size_ctx_t calc_size_ctx; nxt_py_asgi_add_field_ctx_t add_field_ctx; status = PyDict_GetItem(dict, nxt_py_status_str); if (nxt_slow_path(status == NULL || !PyLong_Check(status))) { nxt_unit_req_error(http->req, "asgi_http_response_start: " "'status' is not an integer"); return PyErr_Format(PyExc_TypeError, "'status' is not an integer"); } calc_size_ctx.fields_size = 0; calc_size_ctx.fields_count = 0; headers = PyDict_GetItem(dict, nxt_py_headers_str); if (headers != NULL) { res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, &calc_size_ctx); if (nxt_slow_path(res == NULL)) { return NULL; } Py_DECREF(res); } rc = nxt_unit_response_init(http->req, PyLong_AsLong(status), calc_size_ctx.fields_count, calc_size_ctx.fields_size); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return PyErr_Format(PyExc_RuntimeError, "failed to allocate response object"); } add_field_ctx.req = http->req; add_field_ctx.content_length = -1; if (headers != NULL) { res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, &add_field_ctx); if (nxt_slow_path(res == NULL)) { return NULL; } Py_DECREF(res); } http->content_length = add_field_ctx.content_length; Py_INCREF(http); return (PyObject *) http; } static PyObject * nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) { int rc; char *body_str; ssize_t sent; PyObject *body, *more_body, *future; Py_ssize_t body_len, body_off; nxt_py_asgi_ctx_data_t *ctx_data; body = PyDict_GetItem(dict, nxt_py_body_str); if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { return PyErr_Format(PyExc_TypeError, "'body' is not a byte string"); } more_body = PyDict_GetItem(dict, nxt_py_more_body_str); if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) { return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool"); } if (nxt_slow_path(http->complete)) { return PyErr_Format(PyExc_RuntimeError, "Unexpected ASGI message 'http.response.body' " "sent, after response already completed"); } if (nxt_slow_path(http->send_future != NULL)) { return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); } if (body != NULL) { body_str = PyBytes_AS_STRING(body); body_len = PyBytes_GET_SIZE(body); nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d", (int) body_len, (more_body == Py_True) ); if (nxt_slow_path(http->bytes_sent + body_len > http->content_length)) { return PyErr_Format(PyExc_RuntimeError, "Response content longer than Content-Length"); } body_off = 0; ctx_data = http->req->ctx->data; while (body_len > 0) { sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); if (nxt_slow_path(sent < 0)) { return PyErr_Format(PyExc_RuntimeError, "failed to send body"); } if (nxt_slow_path(sent == 0)) { nxt_unit_req_debug(http->req, "asgi_http_response_body: " "out of shared memory, %d", (int) body_len); future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(future == NULL)) { nxt_unit_req_alert(http->req, "Python failed to create Future object"); nxt_python_print_exception(); return PyErr_Format(PyExc_RuntimeError, "failed to create Future object"); } http->send_body = body; Py_INCREF(http->send_body); http->send_body_off = body_off; nxt_py_asgi_drain_wait(http->req, &http->link); http->send_future = future; Py_INCREF(http->send_future); return future; } body_str += sent; body_len -= sent; body_off += sent; http->bytes_sent += sent; } } else { nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d", (more_body == Py_True) ); if (!nxt_unit_response_is_sent(http->req)) { rc = nxt_unit_response_send(http->req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return PyErr_Format(PyExc_RuntimeError, "failed to send response"); } } } if (more_body == NULL || more_body == Py_False) { http->complete = 1; } Py_INCREF(http); return (PyObject *) http; } void nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req) { PyObject *msg, *future, *res; nxt_py_asgi_http_t *http; http = req->data; nxt_unit_req_debug(req, "asgi_http_data_handler"); if (http->receive_future == NULL) { return; } msg = nxt_py_asgi_http_read_msg(http); if (nxt_slow_path(msg == NULL)) { return; } if (msg == Py_None) { Py_DECREF(msg); return; } future = http->receive_future; http->receive_future = NULL; res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_req_alert(req, "'set_result' call failed"); nxt_python_print_exception(); } Py_XDECREF(res); Py_DECREF(future); Py_DECREF(msg); } int nxt_py_asgi_http_drain(nxt_queue_link_t *lnk) { char *body_str; ssize_t sent; PyObject *future, *exc, *res; Py_ssize_t body_len; nxt_py_asgi_http_t *http; http = nxt_container_of(lnk, nxt_py_asgi_http_t, link); body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off; body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off; nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len); while (body_len > 0) { sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); if (nxt_slow_path(sent < 0)) { goto fail; } if (nxt_slow_path(sent == 0)) { return NXT_UNIT_AGAIN; } body_str += sent; body_len -= sent; http->send_body_off += sent; http->bytes_sent += sent; } Py_CLEAR(http->send_body); future = http->send_future; http->send_future = NULL; res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_req_alert(http->req, "'set_result' call failed"); nxt_python_print_exception(); } Py_XDECREF(res); Py_DECREF(future); return NXT_UNIT_OK; fail: exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, nxt_py_failed_to_send_body_str, NULL); if (nxt_slow_path(exc == NULL)) { nxt_unit_req_alert(http->req, "RuntimeError create failed"); nxt_python_print_exception(); exc = Py_None; Py_INCREF(exc); } future = http->send_future; http->send_future = NULL; res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_req_alert(http->req, "'set_exception' call failed"); nxt_python_print_exception(); } Py_XDECREF(res); Py_DECREF(future); Py_DECREF(exc); return NXT_UNIT_ERROR; } static PyObject * nxt_py_asgi_http_done(PyObject *self, PyObject *future) { int rc; PyObject *res; nxt_py_asgi_http_t *http; http = (nxt_py_asgi_http_t *) self; nxt_unit_req_debug(http->req, "asgi_http_done"); /* * Get Future.result() and it raises an exception, if coroutine exited * with exception. */ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); if (nxt_slow_path(res == NULL)) { nxt_unit_req_error(http->req, "Python failed to call 'future.result()'"); nxt_python_print_exception(); rc = NXT_UNIT_ERROR; } else { Py_DECREF(res); rc = NXT_UNIT_OK; } nxt_unit_request_done(http->req, rc); Py_RETURN_NONE; } #endif /* NXT_HAVE_ASGI */