summaryrefslogtreecommitdiffhomepage
path: root/src/python/nxt_python_asgi_websocket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/nxt_python_asgi_websocket.c')
-rw-r--r--src/python/nxt_python_asgi_websocket.c1084
1 files changed, 1084 insertions, 0 deletions
diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c
new file mode 100644
index 00000000..5a27b588
--- /dev/null
+++ b/src/python/nxt_python_asgi_websocket.c
@@ -0,0 +1,1084 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+
+#include <python/nxt_python.h>
+
+#if (NXT_HAVE_ASGI)
+
+#include <nxt_main.h>
+#include <nxt_unit.h>
+#include <nxt_unit_request.h>
+#include <nxt_unit_websocket.h>
+#include <nxt_websocket_header.h>
+#include <python/nxt_python_asgi.h>
+#include <python/nxt_python_asgi_str.h>
+
+
+enum {
+ NXT_WS_INIT,
+ NXT_WS_CONNECT,
+ NXT_WS_ACCEPTED,
+ NXT_WS_DISCONNECTED,
+ NXT_WS_CLOSED,
+};
+
+
+typedef struct {
+ nxt_queue_link_t link;
+ nxt_unit_websocket_frame_t *frame;
+} nxt_py_asgi_penging_frame_t;
+
+
+typedef struct {
+ PyObject_HEAD
+ nxt_unit_request_info_t *req;
+ PyObject *receive_future;
+ PyObject *receive_exc_str;
+ int state;
+ nxt_queue_t pending_frames;
+ uint64_t pending_payload_len;
+ uint64_t pending_frame_len;
+ int pending_fins;
+} nxt_py_asgi_websocket_t;
+
+
+static PyObject *nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none);
+static PyObject *nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict);
+static PyObject *nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws,
+ PyObject *dict);
+static PyObject *nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws,
+ PyObject *dict);
+static void nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws,
+ PyObject *msg);
+static void nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws,
+ PyObject *exc);
+static void nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *f);
+static PyObject *nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
+ nxt_unit_websocket_frame_t *frame);
+static uint64_t nxt_py_asgi_websocket_pending_len(
+ nxt_py_asgi_websocket_t *ws);
+static nxt_unit_websocket_frame_t *nxt_py_asgi_websocket_pop_frame(
+ nxt_py_asgi_websocket_t *ws);
+static PyObject *nxt_py_asgi_websocket_disconnect_msg(
+ nxt_py_asgi_websocket_t *ws);
+static PyObject *nxt_py_asgi_websocket_done(PyObject *self, PyObject *future);
+
+
+static PyMethodDef nxt_py_asgi_websocket_methods[] = {
+ { "receive", nxt_py_asgi_websocket_receive, METH_NOARGS, 0 },
+ { "send", nxt_py_asgi_websocket_send, METH_O, 0 },
+ { "_done", nxt_py_asgi_websocket_done, METH_O, 0 },
+ { NULL, NULL, 0, 0 }
+};
+
+static PyAsyncMethods nxt_py_asgi_async_methods = {
+ .am_await = nxt_py_asgi_await,
+};
+
+static PyTypeObject nxt_py_asgi_websocket_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+
+ .tp_name = "unit._asgi_websocket",
+ .tp_basicsize = sizeof(nxt_py_asgi_websocket_t),
+ .tp_dealloc = nxt_py_asgi_dealloc,
+ .tp_as_async = &nxt_py_asgi_async_methods,
+ .tp_flags = Py_TPFLAGS_DEFAULT,
+ .tp_doc = "unit ASGI WebSocket connection object",
+ .tp_iter = nxt_py_asgi_iter,
+ .tp_iternext = nxt_py_asgi_next,
+ .tp_methods = nxt_py_asgi_websocket_methods,
+};
+
+static uint64_t nxt_py_asgi_ws_max_frame_size = 1024 * 1024;
+static uint64_t nxt_py_asgi_ws_max_buffer_size = 10 * 1024 * 1024;
+
+
+nxt_int_t
+nxt_py_asgi_websocket_init(nxt_task_t *task)
+{
+ if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_websocket_type) != 0)) {
+ nxt_alert(task,
+ "Python failed to initialize the \"asgi_websocket\" type object");
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
+}
+
+
+PyObject *
+nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req)
+{
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = PyObject_New(nxt_py_asgi_websocket_t, &nxt_py_asgi_websocket_type);
+
+ if (nxt_fast_path(ws != NULL)) {
+ ws->req = req;
+ ws->receive_future = NULL;
+ ws->receive_exc_str = NULL;
+ ws->state = NXT_WS_INIT;
+ nxt_queue_init(&ws->pending_frames);
+ ws->pending_payload_len = 0;
+ ws->pending_frame_len = 0;
+ ws->pending_fins = 0;
+ }
+
+ return (PyObject *) ws;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_receive(PyObject *self, PyObject *none)
+{
+ PyObject *future, *msg;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = (nxt_py_asgi_websocket_t *) self;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_receive");
+
+ /* If exception happened out of receive() call, raise it now. */
+ if (nxt_slow_path(ws->receive_exc_str != NULL)) {
+ PyErr_SetObject(PyExc_RuntimeError, ws->receive_exc_str);
+
+ ws->receive_exc_str = NULL;
+
+ return NULL;
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
+ nxt_unit_req_error(ws->req,
+ "receive() called for closed WebSocket");
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket already closed");
+ }
+
+ future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
+ if (nxt_slow_path(future == NULL)) {
+ nxt_unit_req_alert(ws->req, "Python failed to create Future object");
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to create Future object");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
+ ws->state = NXT_WS_CONNECT;
+
+ msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_connect_str);
+
+ return nxt_py_asgi_set_result_soon(ws->req, future, msg);
+ }
+
+ if (ws->pending_fins > 0) {
+ msg = nxt_py_asgi_websocket_pop_msg(ws, NULL);
+
+ return nxt_py_asgi_set_result_soon(ws->req, future, msg);
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
+ msg = nxt_py_asgi_websocket_disconnect_msg(ws);
+
+ return nxt_py_asgi_set_result_soon(ws->req, future, msg);
+ }
+
+ ws->receive_future = future;
+ Py_INCREF(ws->receive_future);
+
+ return future;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_send(PyObject *self, PyObject *dict)
+{
+ PyObject *type;
+ const char *type_str;
+ Py_ssize_t type_len;
+ nxt_py_asgi_websocket_t *ws;
+
+ static const nxt_str_t websocket_accept = nxt_string("websocket.accept");
+ static const nxt_str_t websocket_close = nxt_string("websocket.close");
+ static const nxt_str_t websocket_send = nxt_string("websocket.send");
+
+ ws = (nxt_py_asgi_websocket_t *) self;
+
+ type = PyDict_GetItem(dict, nxt_py_type_str);
+ if (nxt_slow_path(type == NULL || !PyUnicode_Check(type))) {
+ nxt_unit_req_error(ws->req, "asgi_websocket_send: "
+ "'type' is not a unicode string");
+ return PyErr_Format(PyExc_TypeError,
+ "'type' is not a unicode string");
+ }
+
+ type_str = PyUnicode_AsUTF8AndSize(type, &type_len);
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_send type is '%.*s'",
+ (int) type_len, type_str);
+
+ if (type_len == (Py_ssize_t) websocket_accept.length
+ && memcmp(type_str, websocket_accept.start, type_len) == 0)
+ {
+ return nxt_py_asgi_websocket_accept(ws, dict);
+ }
+
+ if (type_len == (Py_ssize_t) websocket_close.length
+ && memcmp(type_str, websocket_close.start, type_len) == 0)
+ {
+ return nxt_py_asgi_websocket_close(ws, dict);
+ }
+
+ if (type_len == (Py_ssize_t) websocket_send.length
+ && memcmp(type_str, websocket_send.start, type_len) == 0)
+ {
+ return nxt_py_asgi_websocket_send_frame(ws, dict);
+ }
+
+ nxt_unit_req_error(ws->req, "asgi_websocket_send: "
+ "unexpected 'type': '%.*s'", (int) type_len, type_str);
+ return PyErr_Format(PyExc_AssertionError, "unexpected 'type': '%U'", type);
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_accept(nxt_py_asgi_websocket_t *ws, PyObject *dict)
+{
+ int rc;
+ char *subprotocol_str;
+ PyObject *res, *headers, *subprotocol;
+ Py_ssize_t subprotocol_len;
+ nxt_py_asgi_calc_size_ctx_t calc_size_ctx;
+ nxt_py_asgi_add_field_ctx_t add_field_ctx;
+
+ static const nxt_str_t ws_protocol = nxt_string("sec-websocket-protocol");
+
+ switch(ws->state) {
+ case NXT_WS_INIT:
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket connect not received");
+ case NXT_WS_CONNECT:
+ break;
+
+ case NXT_WS_ACCEPTED:
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
+
+ case NXT_WS_DISCONNECTED:
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
+
+ case NXT_WS_CLOSED:
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
+ }
+
+ if (nxt_slow_path(nxt_unit_response_is_websocket(ws->req))) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already accepted");
+ }
+
+ if (nxt_slow_path(nxt_unit_response_is_sent(ws->req))) {
+ return PyErr_Format(PyExc_RuntimeError, "response already sent");
+ }
+
+ calc_size_ctx.fields_size = 0;
+ calc_size_ctx.fields_count = 0;
+
+ headers = PyDict_GetItem(dict, nxt_py_headers_str);
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_calc_size,
+ &calc_size_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+ }
+
+ subprotocol = PyDict_GetItem(dict, nxt_py_subprotocol_str);
+ if (subprotocol != NULL && PyUnicode_Check(subprotocol)) {
+ subprotocol_str = PyUnicode_DATA(subprotocol);
+ subprotocol_len = PyUnicode_GET_LENGTH(subprotocol);
+
+ calc_size_ctx.fields_size += ws_protocol.length + subprotocol_len;
+ calc_size_ctx.fields_count++;
+
+ } else {
+ subprotocol_str = NULL;
+ subprotocol_len = 0;
+ }
+
+ rc = nxt_unit_response_init(ws->req, 101,
+ calc_size_ctx.fields_count,
+ calc_size_ctx.fields_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to allocate response object");
+ }
+
+ add_field_ctx.req = ws->req;
+ add_field_ctx.content_length = -1;
+
+ if (headers != NULL) {
+ res = nxt_py_asgi_enum_headers(headers, nxt_py_asgi_add_field,
+ &add_field_ctx);
+ if (nxt_slow_path(res == NULL)) {
+ return NULL;
+ }
+ }
+
+ if (subprotocol_len > 0) {
+ rc = nxt_unit_response_add_field(ws->req,
+ (const char *) ws_protocol.start,
+ ws_protocol.length,
+ subprotocol_str, subprotocol_len);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to add header");
+ }
+ }
+
+ rc = nxt_unit_response_send(ws->req);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError, "failed to send response");
+ }
+
+ ws->state = NXT_WS_ACCEPTED;
+
+ Py_INCREF(ws);
+
+ return (PyObject *) ws;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_close(nxt_py_asgi_websocket_t *ws, PyObject *dict)
+{
+ int rc;
+ uint16_t status_code;
+ PyObject *code;
+
+ if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket connect not received");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
+ }
+
+ if (nxt_unit_response_is_websocket(ws->req)) {
+ code = PyDict_GetItem(dict, nxt_py_code_str);
+ if (nxt_slow_path(code != NULL && !PyLong_Check(code))) {
+ return PyErr_Format(PyExc_TypeError, "'code' is not integer");
+ }
+
+ status_code = (code != NULL) ? htons(PyLong_AsLong(code))
+ : htons(NXT_WEBSOCKET_CR_NORMAL);
+
+ rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
+ 1, &status_code, 2);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to send close frame");
+ }
+
+ } else {
+ rc = nxt_unit_response_init(ws->req, 403, 0, 0);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to allocate response object");
+ }
+
+ rc = nxt_unit_response_send(ws->req);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "failed to send response");
+ }
+ }
+
+ ws->state = NXT_WS_CLOSED;
+
+ Py_INCREF(ws);
+
+ return (PyObject *) ws;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_send_frame(nxt_py_asgi_websocket_t *ws, PyObject *dict)
+{
+ int rc;
+ uint8_t opcode;
+ PyObject *bytes, *text;
+ const void *buf;
+ Py_ssize_t buf_size;
+
+ if (nxt_slow_path(ws->state == NXT_WS_INIT)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket connect not received");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CONNECT)) {
+ return PyErr_Format(PyExc_RuntimeError,
+ "WebSocket not accepted yet");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_DISCONNECTED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket disconnected");
+ }
+
+ if (nxt_slow_path(ws->state == NXT_WS_CLOSED)) {
+ return PyErr_Format(PyExc_RuntimeError, "WebSocket already closed");
+ }
+
+ bytes = PyDict_GetItem(dict, nxt_py_bytes_str);
+ if (bytes == Py_None) {
+ bytes = NULL;
+ }
+
+ if (nxt_slow_path(bytes != NULL && !PyBytes_Check(bytes))) {
+ return PyErr_Format(PyExc_TypeError,
+ "'bytes' is not a byte string");
+ }
+
+ text = PyDict_GetItem(dict, nxt_py_text_str);
+ if (text == Py_None) {
+ text = NULL;
+ }
+
+ if (nxt_slow_path(text != NULL && !PyUnicode_Check(text))) {
+ return PyErr_Format(PyExc_TypeError,
+ "'text' is not a unicode string");
+ }
+
+ if (nxt_slow_path(((bytes != NULL) ^ (text != NULL)) == 0)) {
+ return PyErr_Format(PyExc_ValueError,
+ "Exactly one of 'bytes' or 'text' must be non-None");
+ }
+
+ if (bytes != NULL) {
+ buf = PyBytes_AS_STRING(bytes);
+ buf_size = PyBytes_GET_SIZE(bytes);
+ opcode = NXT_WEBSOCKET_OP_BINARY;
+
+ } else {
+ buf = PyUnicode_AsUTF8AndSize(text, &buf_size);
+ opcode = NXT_WEBSOCKET_OP_TEXT;
+ }
+
+ rc = nxt_unit_websocket_send(ws->req, opcode, 1, buf, buf_size);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return PyErr_Format(PyExc_RuntimeError, "failed to send close frame");
+ }
+
+ Py_INCREF(ws);
+ return (PyObject *) ws;
+}
+
+
+void
+nxt_py_asgi_websocket_handler(nxt_unit_websocket_frame_t *frame)
+{
+ uint8_t opcode;
+ uint16_t status_code;
+ uint64_t rest;
+ PyObject *msg, *exc;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = frame->req->data;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_handler");
+
+ opcode = frame->header->opcode;
+ if (nxt_slow_path(opcode != NXT_WEBSOCKET_OP_CONT
+ && opcode != NXT_WEBSOCKET_OP_TEXT
+ && opcode != NXT_WEBSOCKET_OP_BINARY
+ && opcode != NXT_WEBSOCKET_OP_CLOSE))
+ {
+ nxt_unit_websocket_done(frame);
+
+ nxt_unit_req_debug(ws->req,
+ "asgi_websocket_handler: ignore frame with opcode %d",
+ opcode);
+
+ return;
+ }
+
+ if (nxt_slow_path(ws->state != NXT_WS_ACCEPTED)) {
+ nxt_unit_websocket_done(frame);
+
+ goto bad_state;
+ }
+
+ rest = nxt_py_asgi_ws_max_frame_size - ws->pending_frame_len;
+
+ if (nxt_slow_path(frame->payload_len > rest)) {
+ nxt_unit_websocket_done(frame);
+
+ goto too_big;
+ }
+
+ rest = nxt_py_asgi_ws_max_buffer_size - ws->pending_payload_len;
+
+ if (nxt_slow_path(frame->payload_len > rest)) {
+ nxt_unit_websocket_done(frame);
+
+ goto too_big;
+ }
+
+ if (ws->receive_future == NULL || frame->header->fin == 0) {
+ nxt_py_asgi_websocket_suspend_frame(frame);
+
+ return;
+ }
+
+ if (!nxt_queue_is_empty(&ws->pending_frames)) {
+ if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_TEXT
+ || opcode == NXT_WEBSOCKET_OP_BINARY))
+ {
+ nxt_unit_req_alert(ws->req,
+ "Invalid state: pending frames with active receiver. "
+ "CONT frame expected. (%d)", opcode);
+
+ PyErr_SetString(PyExc_AssertionError,
+ "Invalid state: pending frames with active receiver. "
+ "CONT frame expected.");
+
+ nxt_unit_websocket_done(frame);
+
+ return;
+ }
+ }
+
+ msg = nxt_py_asgi_websocket_pop_msg(ws, frame);
+ if (nxt_slow_path(msg == NULL)) {
+ exc = PyErr_Occurred();
+ Py_INCREF(exc);
+
+ goto raise;
+ }
+
+ nxt_py_asgi_websocket_receive_done(ws, msg);
+
+ return;
+
+bad_state:
+
+ if (ws->receive_future == NULL) {
+ ws->receive_exc_str = nxt_py_bad_state_str;
+
+ return;
+ }
+
+ exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
+ nxt_py_bad_state_str,
+ NULL);
+ if (nxt_slow_path(exc == NULL)) {
+ nxt_unit_req_alert(ws->req, "RuntimeError create failed");
+ nxt_python_print_exception();
+
+ exc = Py_None;
+ Py_INCREF(exc);
+ }
+
+ goto raise;
+
+too_big:
+
+ status_code = htons(NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG);
+
+ (void) nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
+ 1, &status_code, 2);
+
+ ws->state = NXT_WS_CLOSED;
+
+ if (ws->receive_future == NULL) {
+ ws->receive_exc_str = nxt_py_message_too_big_str;
+
+ return;
+ }
+
+ exc = PyObject_CallFunctionObjArgs(PyExc_RuntimeError,
+ nxt_py_message_too_big_str,
+ NULL);
+ if (nxt_slow_path(exc == NULL)) {
+ nxt_unit_req_alert(ws->req, "RuntimeError create failed");
+ nxt_python_print_exception();
+
+ exc = Py_None;
+ Py_INCREF(exc);
+ }
+
+raise:
+
+ nxt_py_asgi_websocket_receive_fail(ws, exc);
+}
+
+
+static void
+nxt_py_asgi_websocket_receive_done(nxt_py_asgi_websocket_t *ws, PyObject *msg)
+{
+ PyObject *future, *res;
+
+ future = ws->receive_future;
+ ws->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(ws->req, "'set_result' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(msg);
+}
+
+
+static void
+nxt_py_asgi_websocket_receive_fail(nxt_py_asgi_websocket_t *ws, PyObject *exc)
+{
+ PyObject *future, *res;
+
+ future = ws->receive_future;
+ ws->receive_future = NULL;
+
+ res = PyObject_CallMethodObjArgs(future, nxt_py_set_exception_str, exc,
+ NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_alert(ws->req, "'set_exception' call failed");
+ nxt_python_print_exception();
+ }
+
+ Py_XDECREF(res);
+ Py_DECREF(future);
+
+ Py_DECREF(exc);
+}
+
+
+static void
+nxt_py_asgi_websocket_suspend_frame(nxt_unit_websocket_frame_t *frame)
+{
+ int rc;
+ nxt_py_asgi_websocket_t *ws;
+ nxt_py_asgi_penging_frame_t *p;
+
+ nxt_unit_req_debug(frame->req, "asgi_websocket_suspend_frame: "
+ "%d, %"PRIu64", %d",
+ frame->header->opcode, frame->payload_len,
+ frame->header->fin);
+
+ ws = frame->req->data;
+
+ rc = nxt_unit_websocket_retain(frame);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ nxt_unit_req_alert(ws->req, "Failed to retain frame for suspension.");
+
+ nxt_unit_websocket_done(frame);
+
+ PyErr_SetString(PyExc_RuntimeError,
+ "Failed to retain frame for suspension.");
+
+ return;
+ }
+
+ p = nxt_unit_malloc(frame->req->ctx, sizeof(nxt_py_asgi_penging_frame_t));
+ if (nxt_slow_path(p == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to allocate buffer to suspend frame.");
+
+ nxt_unit_websocket_done(frame);
+
+ PyErr_SetString(PyExc_RuntimeError,
+ "Failed to allocate buffer to suspend frame.");
+
+ return;
+ }
+
+ p->frame = frame;
+ nxt_queue_insert_tail(&ws->pending_frames, &p->link);
+
+ ws->pending_payload_len += frame->payload_len;
+ ws->pending_fins += frame->header->fin;
+
+ if (frame->header->fin) {
+ ws->pending_frame_len = 0;
+
+ } else {
+ if (frame->header->opcode == NXT_WEBSOCKET_OP_CONT) {
+ ws->pending_frame_len += frame->payload_len;
+
+ } else {
+ ws->pending_frame_len = frame->payload_len;
+ }
+ }
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_pop_msg(nxt_py_asgi_websocket_t *ws,
+ nxt_unit_websocket_frame_t *frame)
+{
+ int fin;
+ char *buf;
+ uint8_t code_buf[2], opcode;
+ uint16_t code;
+ PyObject *msg, *data, *type, *data_key;
+ uint64_t payload_len;
+ nxt_unit_websocket_frame_t *fin_frame;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_pop_msg");
+
+ fin_frame = NULL;
+
+ if (nxt_queue_is_empty(&ws->pending_frames)
+ || (frame != NULL
+ && frame->header->opcode == NXT_WEBSOCKET_OP_CLOSE))
+ {
+ payload_len = frame->payload_len;
+
+ } else {
+ if (frame != NULL) {
+ payload_len = ws->pending_payload_len + frame->payload_len;
+ fin_frame = frame;
+
+ } else {
+ payload_len = nxt_py_asgi_websocket_pending_len(ws);
+ }
+
+ frame = nxt_py_asgi_websocket_pop_frame(ws);
+ }
+
+ opcode = frame->header->opcode;
+
+ if (nxt_slow_path(opcode == NXT_WEBSOCKET_OP_CONT)) {
+ nxt_unit_req_alert(ws->req,
+ "Invalid state: attempt to process CONT frame.");
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_AssertionError,
+ "Invalid state: attempt to process CONT frame.");
+ }
+
+ type = nxt_py_websocket_receive_str;
+
+ switch (opcode) {
+ case NXT_WEBSOCKET_OP_TEXT:
+ buf = nxt_unit_malloc(frame->req->ctx, payload_len);
+ if (nxt_slow_path(buf == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to allocate buffer for payload (%d).",
+ (int) payload_len);
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to allocate buffer for payload (%d).",
+ (int) payload_len);
+ }
+
+ data = NULL;
+ data_key = nxt_py_text_str;
+
+ break;
+
+ case NXT_WEBSOCKET_OP_BINARY:
+ data = PyBytes_FromStringAndSize(NULL, payload_len);
+ if (nxt_slow_path(data == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to create Bytes for payload (%d).",
+ (int) payload_len);
+ nxt_python_print_exception();
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to create Bytes for payload.");
+ }
+
+ buf = (char *) PyBytes_AS_STRING(data);
+ data_key = nxt_py_bytes_str;
+
+ break;
+
+ case NXT_WEBSOCKET_OP_CLOSE:
+ if (frame->payload_len >= 2) {
+ nxt_unit_websocket_read(frame, code_buf, 2);
+ code = ((uint16_t) code_buf[0]) << 8 | code_buf[1];
+
+ } else {
+ code = NXT_WEBSOCKET_CR_NORMAL;
+ }
+
+ nxt_unit_websocket_done(frame);
+
+ data = PyLong_FromLong(code);
+ if (nxt_slow_path(data == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to create Long from code %d.",
+ (int) code);
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to create Long from code %d.",
+ (int) code);
+ }
+
+ buf = NULL;
+ type = nxt_py_websocket_disconnect_str;
+ data_key = nxt_py_code_str;
+
+ break;
+
+ default:
+ nxt_unit_req_alert(ws->req, "Unexpected opcode %d", opcode);
+
+ nxt_unit_websocket_done(frame);
+
+ return PyErr_Format(PyExc_AssertionError, "Unexpected opcode %d",
+ opcode);
+ }
+
+ if (buf != NULL) {
+ fin = frame->header->fin;
+ buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
+
+ nxt_unit_websocket_done(frame);
+
+ if (!fin) {
+ while (!nxt_queue_is_empty(&ws->pending_frames)) {
+ frame = nxt_py_asgi_websocket_pop_frame(ws);
+ fin = frame->header->fin;
+
+ buf += nxt_unit_websocket_read(frame, buf, frame->payload_len);
+
+ nxt_unit_websocket_done(frame);
+
+ if (fin) {
+ break;
+ }
+ }
+
+ if (fin_frame != NULL) {
+ buf += nxt_unit_websocket_read(fin_frame, buf,
+ fin_frame->payload_len);
+ nxt_unit_websocket_done(fin_frame);
+ }
+ }
+
+ if (opcode == NXT_WEBSOCKET_OP_TEXT) {
+ buf -= payload_len;
+
+ data = PyUnicode_DecodeUTF8(buf, payload_len, NULL);
+
+ nxt_unit_free(ws->req->ctx, buf);
+
+ if (nxt_slow_path(data == NULL)) {
+ nxt_unit_req_alert(ws->req,
+ "Failed to create Unicode for payload (%d).",
+ (int) payload_len);
+ nxt_python_print_exception();
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Failed to create Unicode.");
+ }
+ }
+ }
+
+ msg = nxt_py_asgi_new_msg(ws->req, type);
+ if (nxt_slow_path(msg == NULL)) {
+ Py_DECREF(data);
+ return NULL;
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(msg, data_key, data) == -1)) {
+ nxt_unit_req_alert(ws->req, "Python failed to set 'msg.data' item");
+
+ Py_DECREF(msg);
+ Py_DECREF(data);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Python failed to set 'msg.data' item");
+ }
+
+ Py_DECREF(data);
+
+ return msg;
+}
+
+
+static uint64_t
+nxt_py_asgi_websocket_pending_len(nxt_py_asgi_websocket_t *ws)
+{
+ uint64_t res;
+ nxt_py_asgi_penging_frame_t *p;
+
+ res = 0;
+
+ nxt_queue_each(p, &ws->pending_frames, nxt_py_asgi_penging_frame_t, link) {
+ res += p->frame->payload_len;
+
+ if (p->frame->header->fin) {
+ nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d",
+ (int) res);
+ return res;
+ }
+ } nxt_queue_loop;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_pending_len: %d (all)",
+ (int) res);
+ return res;
+}
+
+
+static nxt_unit_websocket_frame_t *
+nxt_py_asgi_websocket_pop_frame(nxt_py_asgi_websocket_t *ws)
+{
+ nxt_queue_link_t *lnk;
+ nxt_unit_websocket_frame_t *frame;
+ nxt_py_asgi_penging_frame_t *p;
+
+ lnk = nxt_queue_first(&ws->pending_frames);
+ nxt_queue_remove(lnk);
+
+ p = nxt_queue_link_data(lnk, nxt_py_asgi_penging_frame_t, link);
+
+ frame = p->frame;
+ ws->pending_payload_len -= frame->payload_len;
+ ws->pending_fins -= frame->header->fin;
+
+ nxt_unit_free(frame->req->ctx, p);
+
+ nxt_unit_req_debug(frame->req, "asgi_websocket_pop_frame: "
+ "%d, %"PRIu64", %d",
+ frame->header->opcode, frame->payload_len,
+ frame->header->fin);
+
+ return frame;
+}
+
+
+void
+nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req)
+{
+ PyObject *msg, *exc;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = req->data;
+
+ nxt_unit_req_debug(req, "asgi_websocket_close_handler");
+
+ if (ws->receive_future == NULL) {
+ ws->state = NXT_WS_DISCONNECTED;
+
+ return;
+ }
+
+ msg = nxt_py_asgi_websocket_disconnect_msg(ws);
+ if (nxt_slow_path(msg == NULL)) {
+ exc = PyErr_Occurred();
+ Py_INCREF(exc);
+
+ nxt_py_asgi_websocket_receive_fail(ws, exc);
+
+ } else {
+ nxt_py_asgi_websocket_receive_done(ws, msg);
+ }
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_disconnect_msg(nxt_py_asgi_websocket_t *ws)
+{
+ PyObject *msg, *code;
+
+ msg = nxt_py_asgi_new_msg(ws->req, nxt_py_websocket_disconnect_str);
+ if (nxt_slow_path(msg == NULL)) {
+ return NULL;
+ }
+
+ code = PyLong_FromLong(NXT_WEBSOCKET_CR_GOING_AWAY);
+ if (nxt_slow_path(code == NULL)) {
+ nxt_unit_req_alert(ws->req, "Python failed to create long");
+ nxt_python_print_exception();
+
+ Py_DECREF(msg);
+
+ return PyErr_Format(PyExc_RuntimeError, "failed to create long");
+ }
+
+ if (nxt_slow_path(PyDict_SetItem(msg, nxt_py_code_str, code) == -1)) {
+ nxt_unit_req_alert(ws->req, "Python failed to set 'msg.code' item");
+
+ Py_DECREF(msg);
+ Py_DECREF(code);
+
+ return PyErr_Format(PyExc_RuntimeError,
+ "Python failed to set 'msg.code' item");
+ }
+
+ Py_DECREF(code);
+
+ return msg;
+}
+
+
+static PyObject *
+nxt_py_asgi_websocket_done(PyObject *self, PyObject *future)
+{
+ int rc;
+ uint16_t status_code;
+ PyObject *res;
+ nxt_py_asgi_websocket_t *ws;
+
+ ws = (nxt_py_asgi_websocket_t *) self;
+
+ nxt_unit_req_debug(ws->req, "asgi_websocket_done: %p", self);
+
+ /*
+ * Get Future.result() and it raises an exception, if coroutine exited
+ * with exception.
+ */
+ res = PyObject_CallMethodObjArgs(future, nxt_py_result_str, NULL);
+ if (nxt_slow_path(res == NULL)) {
+ nxt_unit_req_error(ws->req,
+ "Python failed to call 'future.result()'");
+ nxt_python_print_exception();
+
+ rc = NXT_UNIT_ERROR;
+
+ } else {
+ Py_DECREF(res);
+
+ rc = NXT_UNIT_OK;
+ }
+
+ if (ws->state == NXT_WS_ACCEPTED) {
+ status_code = (rc == NXT_UNIT_OK)
+ ? htons(NXT_WEBSOCKET_CR_NORMAL)
+ : htons(NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR);
+
+ rc = nxt_unit_websocket_send(ws->req, NXT_WEBSOCKET_OP_CLOSE,
+ 1, &status_code, 2);
+ }
+
+ while (!nxt_queue_is_empty(&ws->pending_frames)) {
+ nxt_unit_websocket_done(nxt_py_asgi_websocket_pop_frame(ws));
+ }
+
+ nxt_unit_request_done(ws->req, rc);
+
+ Py_RETURN_NONE;
+}
+
+
+#endif /* NXT_HAVE_ASGI */