diff options
Diffstat (limited to 'src/python/nxt_python_asgi_http.c')
-rw-r--r-- | src/python/nxt_python_asgi_http.c | 113 |
1 files changed, 86 insertions, 27 deletions
diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c index b07d61d6..d88c4b00 100644 --- a/src/python/nxt_python_asgi_http.c +++ b/src/python/nxt_python_asgi_http.c @@ -24,6 +24,7 @@ typedef struct { uint64_t content_length; uint64_t bytes_sent; int complete; + int closed; PyObject *send_body; Py_ssize_t send_body_off; } nxt_py_asgi_http_t; @@ -67,15 +68,16 @@ static PyTypeObject nxt_py_asgi_http_type = { static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024; -nxt_int_t -nxt_py_asgi_http_init(nxt_task_t *task) +int +nxt_py_asgi_http_init(void) { if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) { - nxt_alert(task, "Python failed to initialize the 'http' type object"); - return NXT_ERROR; + nxt_unit_alert(NULL, + "Python failed to initialize the 'http' type object"); + return NXT_UNIT_ERROR; } - return NXT_OK; + return NXT_UNIT_OK; } @@ -93,6 +95,7 @@ nxt_py_asgi_http_create(nxt_unit_request_info_t *req) http->content_length = -1; http->bytes_sent = 0; http->complete = 0; + http->closed = 0; http->send_body = NULL; http->send_body_off = 0; } @@ -106,6 +109,7 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none) { PyObject *msg, *future; nxt_py_asgi_http_t *http; + nxt_py_asgi_ctx_data_t *ctx_data; nxt_unit_request_info_t *req; http = (nxt_py_asgi_http_t *) self; @@ -113,12 +117,20 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none) nxt_unit_req_debug(req, "asgi_http_receive"); - msg = nxt_py_asgi_http_read_msg(http); + if (nxt_slow_path(http->closed || nxt_unit_response_is_sent(req))) { + msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str); + + } else { + msg = nxt_py_asgi_http_read_msg(http); + } + if (nxt_slow_path(msg == NULL)) { return NULL; } - future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + ctx_data = req->ctx->data; + + future = PyObject_CallObject(ctx_data->loop_create_future, NULL); if (nxt_slow_path(future == NULL)) { nxt_unit_req_alert(req, "Python failed to create Future object"); nxt_python_print_exception(); @@ -130,7 +142,7 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none) } if (msg != Py_None) { - return nxt_py_asgi_set_result_soon(req, future, msg); + return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg); } http->receive_future = future; @@ -250,22 +262,23 @@ nxt_py_asgi_http_send(PyObject *self, PyObject *dict) nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'", (int) type_len, type_str); - if (type_len == (Py_ssize_t) response_start.length - && memcmp(type_str, response_start.start, type_len) == 0) - { - return nxt_py_asgi_http_response_start(http, dict); - } + if (nxt_unit_response_is_init(http->req)) { + if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) { + return nxt_py_asgi_http_response_body(http, dict); + } - if (type_len == (Py_ssize_t) response_body.length - && memcmp(type_str, response_body.start, type_len) == 0) - { - return nxt_py_asgi_http_response_body(http, dict); + return PyErr_Format(PyExc_RuntimeError, + "Expected ASGI message 'http.response.body', " + "but got '%U'", type); } - nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'", - (int) type_len, type_str); + if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) { + return nxt_py_asgi_http_response_start(http, dict); + } - return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type); + return PyErr_Format(PyExc_RuntimeError, + "Expected ASGI message 'http.response.start', " + "but got '%U'", type); } @@ -329,11 +342,12 @@ nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict) static PyObject * nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) { - int rc; - char *body_str; - ssize_t sent; - PyObject *body, *more_body, *future; - Py_ssize_t body_len, body_off; + int rc; + char *body_str; + ssize_t sent; + PyObject *body, *more_body, *future; + Py_ssize_t body_len, body_off; + nxt_py_asgi_ctx_data_t *ctx_data; body = PyDict_GetItem(dict, nxt_py_body_str); if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) { @@ -371,6 +385,8 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) body_off = 0; + ctx_data = http->req->ctx->data; + while (body_len > 0) { sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0); if (nxt_slow_path(sent < 0)) { @@ -382,7 +398,8 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) "out of shared memory, %d", (int) body_len); - future = PyObject_CallObject(nxt_py_loop_create_future, NULL); + future = PyObject_CallObject(ctx_data->loop_create_future, + NULL); if (nxt_slow_path(future == NULL)) { nxt_unit_req_alert(http->req, "Python failed to create Future object"); @@ -396,7 +413,7 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict) Py_INCREF(http->send_body); http->send_body_off = body_off; - nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link); + nxt_py_asgi_drain_wait(http->req, &http->link); http->send_future = future; Py_INCREF(http->send_future); @@ -553,6 +570,48 @@ fail: } +void +nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req) +{ + PyObject *msg, *future, *res; + nxt_py_asgi_http_t *http; + + http = req->data; + + nxt_unit_req_debug(req, "asgi_http_close_handler"); + + http->closed = 1; + + if (http->receive_future == NULL) { + return; + } + + msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str); + if (nxt_slow_path(msg == NULL)) { + return; + } + + if (msg == Py_None) { + Py_DECREF(msg); + return; + } + + future = http->receive_future; + http->receive_future = NULL; + + res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL); + if (nxt_slow_path(res == NULL)) { + nxt_unit_req_alert(req, "'set_result' call failed"); + nxt_python_print_exception(); + } + + Py_XDECREF(res); + Py_DECREF(future); + + Py_DECREF(msg); +} + + static PyObject * nxt_py_asgi_http_done(PyObject *self, PyObject *future) { |