summaryrefslogtreecommitdiffhomepage
path: root/src/python/nxt_python_asgi_http.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/python/nxt_python_asgi_http.c591
1 files changed, 591 insertions, 0 deletions
diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c
new file mode 100644
index 00000000..b07d61d6
--- /dev/null
+++ b/src/python/nxt_python_asgi_http.c
@@ -0,0 +1,591 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+typedef struct {
+ PyObject_HEAD
+ nxt_unit_request_info_t *req;
+ nxt_queue_link_t link;
+ PyObject *receive_future;
+ PyObject *send_future;
+ uint64_t content_length;
+ uint64_t bytes_sent;
+ int complete;
+ PyObject *send_body;
+ Py_ssize_t send_body_off;
+} nxt_py_asgi_http_t;
+
+
+static PyObject *nxt_py_asgi_http_receive(PyObject *self, PyObject *none);
+static PyObject *nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http);
+static PyObject *nxt_py_asgi_http_send(PyObject *self, PyObject *dict);
+static PyObject *nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_http_done(PyObject *self, PyObject *future);
+
+
+static PyMethodDef nxt_py_asgi_http_methods[] = {
+ { "receive", nxt_py_asgi_http_receive, METH_NOARGS, 0 },
+ { "send", nxt_py_asgi_http_send, METH_O, 0 },
+ { "_done", nxt_py_asgi_http_done, METH_O, 0 },
+ { NULL, NULL, 0, 0 }
+};
+
+static PyAsyncMethods nxt_py_asgi_async_methods = {
+ .am_await = nxt_py_asgi_await,
+};
+
+static PyTypeObject nxt_py_asgi_http_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+
+ .tp_name = "unit._asgi_http",
+ .tp_basicsize = sizeof(nxt_py_asgi_http_t),
+ .tp_dealloc = nxt_py_asgi_dealloc,
+ .tp_as_async = &nxt_py_asgi_async_methods,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit ASGI HTTP request object",
+ .tp_iter = nxt_py_asgi_iter,
+ .tp_iternext = nxt_py_asgi_next,
+ .tp_methods = nxt_py_asgi_http_methods,
+};
+
+static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
+
+
+nxt_int_t
+nxt_py_asgi_http_init(nxt_task_t *task)
+{
+ if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
+ nxt_alert(task, "Python failed to initialize the 'http' type object");
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+}
+
+
+PyObject *
+nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
+{
+ nxt_py_asgi_http_t *http;
+
+ http = PyObject_New(nxt_py_asgi_http_t, &nxt_py_asgi_http_type);
+
+ if (nxt_fast_path(http != NULL)) {
+ http->req = req;
+ http->receive_future = NULL;
+ http->send_future = NULL;
+ http->content_length = -1;
+ http->bytes_sent = 0;
+ http->complete = 0;
+ http->send_body = NULL;
+ http->send_body_off = 0;
+ }
+
+ return (PyObject *) http;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
+{
+ PyObject *msg, *future;
+ nxt_py_asgi_http_t *http;
+ nxt_unit_request_info_t *req;
+
+ http = (nxt_py_asgi_http_t *) self;
+ req = http->req;
+
+ nxt_unit_req_debug(req, "asgi_http_receive");
+
+ msg = nxt_py_asgi_http_read_msg(http);
+ if (nxt_slow_path(msg == NULL)) {
+ return NULL;
+ }
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ Py_DECREF(msg);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ if (msg != Py_None) {
+ return nxt_py_asgi_set_result_soon(req, future, msg);
+ }
+
+ http->receive_future = future;
+ Py_INCREF(http->receive_future);
+
+ Py_DECREF(msg);
+
+ return future;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_read_msg(nxt_py_asgi_http_t *http)
+{
+ char *body_buf;
+ ssize_t read_res;
+ PyObject *msg, *body;
+ Py_ssize_t size;
+ nxt_unit_request_info_t *req;
+
+ req = http->req;
+
+ size = req->content_length;
+
+ if (size > nxt_py_asgi_http_body_buf_size) {
+ size = nxt_py_asgi_http_body_buf_size;
+ }
+
+ if (size > 0) {
+ body = PyBytes_FromStringAndSize(NULL, size);
+ if (nxt_slow_path(body == NULL)) {
+ nxt_unit_req_alert(req, "Python failed to create body byte string");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Bytes object");
+ }
+
+ body_buf = PyBytes_AS_STRING(body);
+
+ read_res = nxt_unit_request_read(req, body_buf, size);
+
+ } else {
+ body = NULL;
+ read_res = 0;
+ }
+
+ if (read_res > 0 || read_res == size) {
+ msg = nxt_py_asgi_new_msg(req, nxt_py_http_request_str);
+ if (nxt_slow_path(msg == NULL)) {
+ Py_XDECREF(body);
+
+ return NULL;
+ }
+
+#define SET_ITEM(dict, key, value) \
+ if (nxt_slow_path(PyDict_SetItem(dict, nxt_py_ ## key ## _str, value) \
+ == -1)) \
+ { \
+ nxt_unit_req_alert(req, \
+ "Python failed to set '" #dict "." #key "' item"); \
+ PyErr_SetString(PyExc_RuntimeError, \
+ "Python failed to set '" #dict "." #key "' item"); \
+ goto fail; \
+ }
+
+ if (body != NULL) {
+ SET_ITEM(msg, body, body)
+ }
+
+ if (req->content_length > 0) {
+ SET_ITEM(msg, more_body, Py_True)
+ }
+
+#undef SET_ITEM
+
+ Py_XDECREF(body);
+
+ return msg;
+ }
+
+ Py_XDECREF(body);
+
+ Py_RETURN_NONE;
+
+fail:
+
+ Py_DECREF(msg);
+ Py_XDECREF(body);
+
+ return NULL;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
+{
+ PyObject *type;
+ const char *type_str;
+ Py_ssize_t type_len;
+ nxt_py_asgi_http_t *http;
+
+ static const nxt_str_t response_start = nxt_string("http.response.start");
+ static const nxt_str_t response_body = nxt_string("http.response.body");
+
+ http = (nxt_py_asgi_http_t *) self;
+
+ type = PyDict_GetItem(dict, nxt_py_type_str);
+ if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
+ nxt_unit_req_error(http->req, "asgi_http_send: "
+ "'type' is not a unicode string");
+ return PyErr_Format(PyExc_TypeError, "'type' is not a unicode string");
+ }
+
+ type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
+
+ nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
+ (int) type_len, type_str);
+
+ if (type_len == (Py_ssize_t) response_start.length
+ && memcmp(type_str, response_start.start, type_len) == 0)
+ {
+ return nxt_py_asgi_http_response_start(http, dict);
+ }
+
+ if (type_len == (Py_ssize_t) response_body.length
+ && memcmp(type_str, response_body.start, type_len) == 0)
+ {
+ return nxt_py_asgi_http_response_body(http, dict);
+ }
+
+ nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'",
+ (int) type_len, type_str);
+
+ return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
+}
+
+
+static PyObject *
+nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
+{
+ int rc;
+ PyObject *status, *headers, *res;
+ nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
+ nxt_py_asgi_add_field_ctx_t add_field_ctx;
+
+ status = PyDict_GetItem(dict, nxt_py_status_str);
+ if (nxt_slow_path(status == NULL || !PyLong_Check(status))) {
+ nxt_unit_req_error(http->req, "asgi_http_response_start: "
+ "'status' is not an integer");
+ return PyErr_Format(PyExc_TypeError, "'status' is not an integer");
+ }
+
+ calc_size_ctx.fields_size = 0;
+ calc_size_ctx.fields_count = 0;
+
+ headers = PyDict_GetItem(dict, nxt_py_headers_str);
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
+ &calc_size_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ Py_DECREF(res);
+ }
+
+ rc = nxt_unit_response_init(http->req, PyLong_AsLong(status),
+ calc_size_ctx.fields_count,
+ calc_size_ctx.fields_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to allocate response object");
+ }
+
+ add_field_ctx.req = http->req;
+ add_field_ctx.content_length = -1;
+
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
+ &add_field_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+
+ Py_DECREF(res);
+ }
+
+ http->content_length = add_field_ctx.content_length;
+
+ Py_INCREF(http);
+ return (PyObject *) http;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
+{
+ int rc;
+ char *body_str;
+ ssize_t sent;
+ PyObject *body, *more_body, *future;
+ Py_ssize_t body_len, body_off;
+
+ body = PyDict_GetItem(dict, nxt_py_body_str);
+ if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
+ return PyErr_Format(PyExc_TypeError, "'body' is not a byte string");
+ }
+
+ more_body = PyDict_GetItem(dict, nxt_py_more_body_str);
+ if (nxt_slow_path(more_body != NULL && !PyBool_Check(more_body))) {
+ return PyErr_Format(PyExc_TypeError, "'more_body' is not a bool");
+ }
+
+ if (nxt_slow_path(http->complete)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "Unexpected ASGI message 'http.response.body' "
+ "sent, after response already completed");
+ }
+
+ if (nxt_slow_path(http->send_future != NULL)) {
+ return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
+ }
+
+ if (body != NULL) {
+ body_str = PyBytes_AS_STRING(body);
+ body_len = PyBytes_GET_SIZE(body);
+
+ nxt_unit_req_debug(http->req, "asgi_http_response_body: %d, %d",
+ (int) body_len, (more_body == Py_True) );
+
+ if (nxt_slow_path(http->bytes_sent + body_len
+ > http->content_length))
+ {
+ return PyErr_Format(PyExc_RuntimeError,
+ "Response content longer than Content-Length");
+ }
+
+ body_off = 0;
+
+ while (body_len > 0) {
+ sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
+ if (nxt_slow_path(sent < 0)) {
+ return PyErr_Format(PyExc_RuntimeError, "failed to send body");
+ }
+
+ if (nxt_slow_path(sent == 0)) {
+ nxt_unit_req_debug(http->req, "asgi_http_response_body: "
+ "out of shared memory, %d",
+ (int) body_len);
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_req_alert(http->req,
+ "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ http->send_body = body;
+ Py_INCREF(http->send_body);
+ http->send_body_off = body_off;
+
+ nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link);
+
+ http->send_future = future;
+ Py_INCREF(http->send_future);
+
+ return future;
+ }
+
+ body_str += sent;
+ body_len -= sent;
+ body_off += sent;
+ http->bytes_sent += sent;
+ }
+
+ } else {
+ nxt_unit_req_debug(http->req, "asgi_http_response_body: 0, %d",
+ (more_body == Py_True) );
+
+ if (!nxt_unit_response_is_sent(http->req)) {
+ rc = nxt_unit_response_send(http->req);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to send response");
+ }
+ }
+ }
+
+ if (more_body == NULL || more_body == Py_False) {
+ http->complete = 1;
+ }
+
+ Py_INCREF(http);
+ return (PyObject *) http;
+}
+
+
+void
+nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req)
+{
+ PyObject *msg, *future, *res;
+ nxt_py_asgi_http_t *http;
+
+ http = req->data;
+
+ nxt_unit_req_debug(req, "asgi_http_data_handler");
+
+ if (http->receive_future == NULL) {
+ return;
+ }
+
+ msg = nxt_py_asgi_http_read_msg(http);
+ if (nxt_slow_path(msg == NULL)) {
+ return;
+ }
+
+ if (msg == Py_None) {
+ Py_DECREF(msg);
+ return;
+ }
+
+ future = http->receive_future;
+ http->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(msg);
+}
+
+
+int
+nxt_py_asgi_http_drain(nxt_queue_link_t *lnk)
+{
+ char *body_str;
+ ssize_t sent;
+ PyObject *future, *exc, *res;
+ Py_ssize_t body_len;
+ nxt_py_asgi_http_t *http;
+
+ http = nxt_container_of(lnk, nxt_py_asgi_http_t, link);
+
+ body_str = PyBytes_AS_STRING(http->send_body) + http->send_body_off;
+ body_len = PyBytes_GET_SIZE(http->send_body) - http->send_body_off;
+
+ nxt_unit_req_debug(http->req, "asgi_http_drain: %d", (int) body_len);
+
+ while (body_len > 0) {
+ sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
+ if (nxt_slow_path(sent < 0)) {
+ goto fail;
+ }
+
+ if (nxt_slow_path(sent == 0)) {
+ return NXT_UNIT_AGAIN;
+ }
+
+ body_str += sent;
+ body_len -= sent;
+
+ http->send_body_off += sent;
+ http->bytes_sent += sent;
+ }
+
+ Py_CLEAR(http->send_body);
+
+ future = http->send_future;
+ http->send_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, Py_None,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(http->req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ return NXT_UNIT_OK;
+
+fail:
+
+ exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
+ nxt_py_failed_to_send_body_str,
+ NULL);
+ if (nxt_slow_path(exc == NULL)) {
+ nxt_unit_req_alert(http->req, "RuntimeError create failed");
+ nxt_python_print_exception();
+
+ exc = Py_None;
+ Py_INCREF(exc);
+ }
+
+ future = http->send_future;
+ http->send_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(http->req, "'set_exception' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+ Py_DECREF(exc);
+
+ return NXT_UNIT_ERROR;
+}
+
+
+static PyObject *
+nxt_py_asgi_http_done(PyObject *self, PyObject *future)
+{
+ int rc;
+ PyObject *res;
+ nxt_py_asgi_http_t *http;
+
+ http = (nxt_py_asgi_http_t *) self;
+
+ nxt_unit_req_debug(http->req, "asgi_http_done");
+
+ /*
+ * Get Future.result() and it raises an exception, if coroutine exited
+ * with exception.
+ */
+ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(http->req,
+ "Python failed to call 'future.result()'");
+ nxt_python_print_exception();
+
+ rc = NXT_UNIT_ERROR;
+
+ } else {
+ Py_DECREF(res);
+
+ rc = NXT_UNIT_OK;
+ }
+
+ nxt_unit_request_done(http->req, rc);
+
+ Py_RETURN_NONE;
+}
+
+
+#endif /* NXT_HAVE_ASGI */