summaryrefslogtreecommitdiffhomepage
path: root/src/python/nxt_python_asgi_http.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/nxt_python_asgi_http.c')
-rw-r--r--src/python/nxt_python_asgi_http.c113
1 files changed, 86 insertions, 27 deletions
diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c
index b07d61d6..d88c4b00 100644
--- a/src/python/nxt_python_asgi_http.c
+++ b/src/python/nxt_python_asgi_http.c
@@ -24,6 +24,7 @@ typedef struct {
uint64_t content_length;
uint64_t bytes_sent;
int complete;
+ int closed;
PyObject *send_body;
Py_ssize_t send_body_off;
} nxt_py_asgi_http_t;
@@ -67,15 +68,16 @@ static PyTypeObject nxt_py_asgi_http_type = {
static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
-nxt_int_t
-nxt_py_asgi_http_init(nxt_task_t *task)
+int
+nxt_py_asgi_http_init(void)
{
if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
- nxt_alert(task, "Python failed to initialize the 'http' type object");
- return NXT_ERROR;
+ nxt_unit_alert(NULL,
+ "Python failed to initialize the 'http' type object");
+ return NXT_UNIT_ERROR;
}
- return NXT_OK;
+ return NXT_UNIT_OK;
}
@@ -93,6 +95,7 @@ nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
http->content_length = -1;
http->bytes_sent = 0;
http->complete = 0;
+ http->closed = 0;
http->send_body = NULL;
http->send_body_off = 0;
}
@@ -106,6 +109,7 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
{
PyObject *msg, *future;
nxt_py_asgi_http_t *http;
+ nxt_py_asgi_ctx_data_t *ctx_data;
nxt_unit_request_info_t *req;
http = (nxt_py_asgi_http_t *) self;
@@ -113,12 +117,20 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
nxt_unit_req_debug(req, "asgi_http_receive");
- msg = nxt_py_asgi_http_read_msg(http);
+ if (nxt_slow_path(http->closed || nxt_unit_response_is_sent(req))) {
+ msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
+
+ } else {
+ msg = nxt_py_asgi_http_read_msg(http);
+ }
+
if (nxt_slow_path(msg == NULL)) {
return NULL;
}
- future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ ctx_data = req->ctx->data;
+
+ future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
if (nxt_slow_path(future == NULL)) {
nxt_unit_req_alert(req, "Python failed to create Future object");
nxt_python_print_exception();
@@ -130,7 +142,7 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
}
if (msg != Py_None) {
- return nxt_py_asgi_set_result_soon(req, future, msg);
+ return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg);
}
http->receive_future = future;
@@ -250,22 +262,23 @@ nxt_py_asgi_http_send(PyObject *self, PyObject *dict)
nxt_unit_req_debug(http->req, "asgi_http_send type is '%.*s'",
(int) type_len, type_str);
- if (type_len == (Py_ssize_t) response_start.length
- && memcmp(type_str, response_start.start, type_len) == 0)
- {
- return nxt_py_asgi_http_response_start(http, dict);
- }
+ if (nxt_unit_response_is_init(http->req)) {
+ if (nxt_str_eq(&response_body, type_str, (size_t) type_len)) {
+ return nxt_py_asgi_http_response_body(http, dict);
+ }
- if (type_len == (Py_ssize_t) response_body.length
- && memcmp(type_str, response_body.start, type_len) == 0)
- {
- return nxt_py_asgi_http_response_body(http, dict);
+ return PyErr_Format(PyExc_RuntimeError,
+ "Expected ASGI message 'http.response.body', "
+ "but got '%U'", type);
}
- nxt_unit_req_error(http->req, "asgi_http_send: unexpected 'type': '%.*s'",
- (int) type_len, type_str);
+ if (nxt_str_eq(&response_start, type_str, (size_t) type_len)) {
+ return nxt_py_asgi_http_response_start(http, dict);
+ }
- return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
+ return PyErr_Format(PyExc_RuntimeError,
+ "Expected ASGI message 'http.response.start', "
+ "but got '%U'", type);
}
@@ -329,11 +342,12 @@ nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
static PyObject *
nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
{
- int rc;
- char *body_str;
- ssize_t sent;
- PyObject *body, *more_body, *future;
- Py_ssize_t body_len, body_off;
+ int rc;
+ char *body_str;
+ ssize_t sent;
+ PyObject *body, *more_body, *future;
+ Py_ssize_t body_len, body_off;
+ nxt_py_asgi_ctx_data_t *ctx_data;
body = PyDict_GetItem(dict, nxt_py_body_str);
if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
@@ -371,6 +385,8 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
body_off = 0;
+ ctx_data = http->req->ctx->data;
+
while (body_len > 0) {
sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
if (nxt_slow_path(sent < 0)) {
@@ -382,7 +398,8 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
"out of shared memory, %d",
(int) body_len);
- future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ future = PyObject_CallObject(ctx_data->loop_create_future,
+ NULL);
if (nxt_slow_path(future == NULL)) {
nxt_unit_req_alert(http->req,
"Python failed to create Future object");
@@ -396,7 +413,7 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
Py_INCREF(http->send_body);
http->send_body_off = body_off;
- nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link);
+ nxt_py_asgi_drain_wait(http->req, &http->link);
http->send_future = future;
Py_INCREF(http->send_future);
@@ -553,6 +570,48 @@ fail:
}
+void
+nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
+{
+ PyObject *msg, *future, *res;
+ nxt_py_asgi_http_t *http;
+
+ http = req->data;
+
+ nxt_unit_req_debug(req, "asgi_http_close_handler");
+
+ http->closed = 1;
+
+ if (http->receive_future == NULL) {
+ return;
+ }
+
+ msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
+ if (nxt_slow_path(msg == NULL)) {
+ return;
+ }
+
+ if (msg == Py_None) {
+ Py_DECREF(msg);
+ return;
+ }
+
+ future = http->receive_future;
+ http->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(msg);
+}
+
+
static PyObject *
nxt_py_asgi_http_done(PyObject *self, PyObject *future)
{