diff options
Diffstat (limited to 'src/python/nxt_python_asgi_http.c')
-rw-r--r-- | src/python/nxt_python_asgi_http.c | 591 |
1 files changed, 591 insertions, 0 deletions
diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c new file mode 100644 index 00000000..b07d61d6 --- /dev/null +++ b/src/python/nxt_python_asgi_http.c @@ -0,0 +1,591 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + + +#include <python/nxt_python.h> + +#if (NXT_HAVE_ASGI) + +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <python/nxt_python_asgi.h> +#include <python/nxt_python_asgi_str.h> + + +typedef struct { + PyObject_HEAD + nxt_unit_request_info_t *req; + nxt_queue_link_t link; + PyObject *receive_future; + PyObject *send_future; + uint64_t content_length; + uint64_t bytes_sent; + int complete; + PyObject *send_body; + Py_ssize_t send_body_off; +} nxt_py_asgi_http_t; + + +static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none); +static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http); +static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict); +static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, + PyObject *dict); +static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, + PyObject *dict); +static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future); + + +static PyMethodDef nxt_py_asgi_http_methods[] = { + { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 }, + { "send", nxt_py_asgi_http_send, METH_O, 0 }, + { "_done", nxt_py_asgi_http_done, METH_O, 0 }, + { NULL, NULL, 0, 0 } +}; + +static PyAsyncMethods nxt_py_asgi_async_methods = { + .am_await = nxt_py_asgi_await, +}; + +static PyTypeObject nxt_py_asgi_http_type = { + PyVarObject_HEAD_INIT(NULL, 0) + + .tp_name = "unit._asgi_http", + .tp_basicsize = sizeof(nxt_py_asgi_http_t), + .tp_dealloc = nxt_py_asgi_dealloc, + .tp_as_async = &nxt_py_asgi_async_methods, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "unit ASGI HTTP request object", + .tp_iter = nxt_py_asgi_iter, + .tp_iternext = nxt_py_asgi_next, + .tp_methods = nxt_py_asgi_http_methods, +}; + +static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; + + +nxt_int_t +nxt_py_asgi_http_init(nxt_task_t *task) +{ + if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { + nxt_alert(task, "Python failed to initialize the 'http' type object"); + return NXT_ERROR; + } + + return NXT_OK; +} + + +PyObject * +nxt_py_asgi_http_create(nxt_unit_request_info_t *req) +{ + nxt_py_asgi_http_t *http; + + http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type); + + if (nxt_fast_path(http != NULL)) { + http->req = req; + http->receive_future = NULL; + http->send_future = NULL; + http->content_length = -1; + http->bytes_sent = 0; + http->complete = 0; + http->send_body = NULL; + http->send_body_off = 0; + } + + return (PyObject *) http; +} + + +static PyObject * +nxt_py_asgi_http_receive(PyObject *self, PyObject *none) +{ + PyObject *msg, *future; + nxt_py_asgi_http_t *http; + nxt_unit_request_info_t *req; + + http = (nxt_py_asgi_http_t *) self; + req = http->req; + + nxt_unit_req_debug(req, "asgi_http_receive"); + + msg = nxt_py_asgi_http_read_msg(http); + if (nxt_slow_path(msg == NULL)) { + return NULL; + } + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(req, "Python failed to create Future object"); + nxt_python_print_exception(); + + Py_DECREF(msg); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + if (msg != Py_None) { + return nxt_py_asgi_set_result_soon(req, future, msg); + } + + http->receive_future = future; + Py_INCREF(http->receive_future); + + Py_DECREF(msg); + + return future; +} + + +static PyObject * +nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http) +{ + char *body_buf; + ssize_t read_res; + PyObject *msg, *body; + Py_ssize_t size; + nxt_unit_request_info_t *req; + + req = http->req; + + size = req->content_length; + + if (size > nxt_py_asgi_http_body_buf_size) { + size = nxt_py_asgi_http_body_buf_size; + } + + if (size > 0) { + body = PyBytes_FromStringAndSize(NULL, size); + if (nxt_slow_path(body == NULL)) { + nxt_unit_req_alert(req, "Python failed to create body byte string"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Bytes object"); + } + + body_buf = PyBytes_AS_STRING(body); + + read_res = nxt_unit_request_read(req, body_buf, size); + + } else { + body = NULL; + read_res = 0; + } + + if (read_res > 0 || read_res == size) { + msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str); + if (nxt_slow_path(msg == NULL)) { + Py_XDECREF(body); + + return NULL; + } + +#define SET_ITEM(dict, key, value) \ + if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \ + == -1)) \ + { \ + nxt_unit_req_alert(req, \ + "Python failed to set '" #dict "." #key "' item"); \ + PyErr_SetString(PyExc_RuntimeError, \ + "Python failed to set '" #dict "." #key "' item"); \ + goto fail; \ + } + + if (body != NULL) { + SET_ITEM(msg, body, body) + } + + if (req->content_length > 0) { + SET_ITEM(msg, more_body, Py_True) + } + +#undef SET_ITEM + + Py_XDECREF(body); + + return msg; + } + + Py_XDECREF(body); + + Py_RETURN_NONE; + +fail: + + Py_DECREF(msg); + Py_XDECREF(body); + + return NULL; +} + + +static PyObject * +nxt_py_asgi_http_send(PyObject *self, PyObject *dict) +{ + PyObject *type; + const char *type_str; + Py_ssize_t type_len; + nxt_py_asgi_http_t *http; + + static const nxt_str_t response_start = nxt_string("http.response.start"); + static const nxt_str_t response_body = nxt_string("http.response.body"); + + http = (nxt_py_asgi_http_t *) self; + + type = PyDict_GetItem(dict, nxt_py_type_str); + if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) { + nxt_unit_req_error(http->req, "asgi_http_send: " + "'type' is not a unicode string"); + return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string"); + } + + type_str = PyUnicode_AsUTF8AndSize(type, &type_len); + + nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", + (int) type_len, type_str); + + if (type_len == (Py_ssize_t) response_start.length + && memcmp(type_str, response_start.start, type_len) == 0) + { + return nxt_py_asgi_http_response_start(http, dict); + } + + if (type_len == (Py_ssize_t) response_body.length + && memcmp(type_str, response_body.start, type_len) == 0) + { + return nxt_py_asgi_http_response_body(http, dict); + } + + nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'", + (int) type_len, type_str); + + return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); +} + + +static PyObject * +nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) +{ + int rc; + PyObject *status, *headers, *res; + nxt_py_asgi_calc_size_ctx_t calc_size_ctx; + nxt_py_asgi_add_field_ctx_t add_field_ctx; + + status = PyDict_GetItem(dict, nxt_py_status_str); + if (nxt_slow_path(status == NULL || !PyLong_Check(status))) { + nxt_unit_req_error(http->req, "asgi_http_response_start: " + "'status' is not an integer"); + return PyErr_Format(PyExc_TypeError, "'status' is not an integer"); + } + + calc_size_ctx.fields_size = 0; + calc_size_ctx.fields_count = 0; + + headers = PyDict_GetItem(dict, nxt_py_headers_str); + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size, + &calc_size_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + Py_DECREF(res); + } + + rc = nxt_unit_response_init(http->req, PyLong_AsLong(status), + calc_size_ctx.fields_count, + calc_size_ctx.fields_size); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to allocate response object"); + } + + add_field_ctx.req = http->req; + add_field_ctx.content_length = -1; + + if (headers != NULL) { + res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field, + &add_field_ctx); + if (nxt_slow_path(res == NULL)) { + return NULL; + } + + Py_DECREF(res); + } + + http->content_length = add_field_ctx.content_length; + + Py_INCREF(http); + return (PyObject *) http; +} + + +static PyObject * +nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) +{ + int rc; + char *body_str; + ssize_t sent; + PyObject *body, *more_body, *future; + Py_ssize_t body_len, body_off; + + body = PyDict_GetItem(dict, nxt_py_body_str); + if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { + return PyErr_Format(PyExc_TypeError, "'body' is not a byte string"); + } + + more_body = PyDict_GetItem(dict, nxt_py_more_body_str); + if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) { + return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool"); + } + + if (nxt_slow_path(http->complete)) { + return PyErr_Format(PyExc_RuntimeError, + "Unexpected ASGI message 'http.response.body' " + "sent, after response already completed"); + } + + if (nxt_slow_path(http->send_future != NULL)) { + return PyErr_Format(PyExc_RuntimeError, "Concurrent send"); + } + + if (body != NULL) { + body_str = PyBytes_AS_STRING(body); + body_len = PyBytes_GET_SIZE(body); + + nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d", + (int) body_len, (more_body == Py_True) ); + + if (nxt_slow_path(http->bytes_sent + body_len + > http->content_length)) + { + return PyErr_Format(PyExc_RuntimeError, + "Response content longer than Content-Length"); + } + + body_off = 0; + + while (body_len > 0) { + sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); + if (nxt_slow_path(sent < 0)) { + return PyErr_Format(PyExc_RuntimeError, "failed to send body"); + } + + if (nxt_slow_path(sent == 0)) { + nxt_unit_req_debug(http->req, "asgi_http_response_body: " + "out of shared memory, %d", + (int) body_len); + + future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + if (nxt_slow_path(future == NULL)) { + nxt_unit_req_alert(http->req, + "Python failed to create Future object"); + nxt_python_print_exception(); + + return PyErr_Format(PyExc_RuntimeError, + "failed to create Future object"); + } + + http->send_body = body; + Py_INCREF(http->send_body); + http->send_body_off = body_off; + + nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link); + + http->send_future = future; + Py_INCREF(http->send_future); + + return future; + } + + body_str += sent; + body_len -= sent; + body_off += sent; + http->bytes_sent += sent; + } + + } else { + nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d", + (more_body == Py_True) ); + + if (!nxt_unit_response_is_sent(http->req)) { + rc = nxt_unit_response_send(http->req); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return PyErr_Format(PyExc_RuntimeError, + "failed to send response"); + } + } + } + + if (more_body == NULL || more_body == Py_False) { + http->complete = 1; + } + + Py_INCREF(http); + return (PyObject *) http; +} + + +void +nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req) +{ + PyObject *msg, *future, *res; + nxt_py_asgi_http_t *http; + + http = req->data; + + nxt_unit_req_debug(req, "asgi_http_data_handler"); + + if (http->receive_future == NULL) { + return; + } + + msg = nxt_py_asgi_http_read_msg(http); + if (nxt_slow_path(msg == NULL)) { + return; + } + + if (msg == Py_None) { + Py_DECREF(msg); + return; + } + + future = http->receive_future; + http->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(msg); +} + + +int +nxt_py_asgi_http_drain(nxt_queue_link_t *lnk) +{ + char *body_str; + ssize_t sent; + PyObject *future, *exc, *res; + Py_ssize_t body_len; + nxt_py_asgi_http_t *http; + + http = nxt_container_of(lnk, nxt_py_asgi_http_t, link); + + body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off; + body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off; + + nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len); + + while (body_len > 0) { + sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); + if (nxt_slow_path(sent < 0)) { + goto fail; + } + + if (nxt_slow_path(sent == 0)) { + return NXT_UNIT_AGAIN; + } + + body_str += sent; + body_len -= sent; + + http->send_body_off += sent; + http->bytes_sent += sent; + } + + Py_CLEAR(http->send_body); + + future = http->send_future; + http->send_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(http->req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + return NXT_UNIT_OK; + +fail: + + exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError, + nxt_py_failed_to_send_body_str, + NULL); + if (nxt_slow_path(exc == NULL)) { + nxt_unit_req_alert(http->req, "RuntimeError create failed"); + nxt_python_print_exception(); + + exc = Py_None; + Py_INCREF(exc); + } + + future = http->send_future; + http->send_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc, + NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(http->req, "'set_exception' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + Py_DECREF(exc); + + return NXT_UNIT_ERROR; +} + + +static PyObject * +nxt_py_asgi_http_done(PyObject *self, PyObject *future) +{ + int rc; + PyObject *res; + nxt_py_asgi_http_t *http; + + http = (nxt_py_asgi_http_t *) self; + + nxt_unit_req_debug(http->req, "asgi_http_done"); + + /* + * Get Future.result() and it raises an exception, if coroutine exited + * with exception. + */ + res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_error(http->req, + "Python failed to call 'future.result()'"); + nxt_python_print_exception(); + + rc = NXT_UNIT_ERROR; + + } else { + Py_DECREF(res); + + rc = NXT_UNIT_OK; + } + + nxt_unit_request_done(http->req, rc); + + Py_RETURN_NONE; +} + + +#endif /* NXT_HAVE_ASGI */ |