summaryrefslogtreecommitdiffhomepage
path: root/test/test_asgi_websockets.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--test/test_asgi_websockets.py208
1 files changed, 108 insertions, 100 deletions
diff --git a/test/test_asgi_websockets.py b/test/test_asgi_websockets.py
index 7218d526..140bcb9a 100644
--- a/test/test_asgi_websockets.py
+++ b/test/test_asgi_websockets.py
@@ -3,14 +3,16 @@ import time
from distutils.version import LooseVersion
import pytest
+
from unit.applications.lang.python import TestApplicationPython
from unit.applications.websockets import TestApplicationWebsocket
from unit.option import option
class TestASGIWebsockets(TestApplicationPython):
- prerequisites = {'modules': {'python':
- lambda v: LooseVersion(v) >= LooseVersion('3.5')}}
+ prerequisites = {
+ 'modules': {'python': lambda v: LooseVersion(v) >= LooseVersion('3.5')}
+ }
load_module = 'asgi'
ws = TestApplicationWebsocket()
@@ -74,7 +76,9 @@ class TestASGIWebsockets(TestApplicationPython):
sock.close()
assert resp['status'] == 101, 'status'
- assert resp['headers']['x-subprotocols'] == "('chat', 'phone', 'video')", 'subprotocols'
+ assert (
+ resp['headers']['x-subprotocols'] == "('chat', 'phone', 'video')"
+ ), 'subprotocols'
assert resp['headers']['sec-websocket-protocol'] == 'chat', 'key'
def test_asgi_websockets_mirror(self):
@@ -159,7 +163,7 @@ class TestASGIWebsockets(TestApplicationPython):
self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False)
self.ws.frame_write(
- sock, self.ws.OP_CONT, 'fragment2', length=2**64 - 1
+ sock, self.ws.OP_CONT, 'fragment2', length=2 ** 64 - 1
)
self.check_close(sock, 1009) # 1009 - CLOSE_TOO_LARGE
@@ -231,7 +235,7 @@ class TestASGIWebsockets(TestApplicationPython):
@pytest.mark.skip('not yet')
def test_asgi_websockets_handshake_upgrade_absent(
- self
+ self,
): # FAIL https://tools.ietf.org/html/rfc6455#section-4.2.1
self.load('websockets/mirror')
@@ -324,7 +328,9 @@ class TestASGIWebsockets(TestApplicationPython):
},
)
- assert resp['status'] == 400, 'key double' # FAIL https://tools.ietf.org/html/rfc6455#section-11.3.1
+ assert (
+ resp['status'] == 400
+ ), 'key double' # FAIL https://tools.ietf.org/html/rfc6455#section-11.3.1
def test_asgi_websockets_handshake_method_invalid(self):
self.load('websockets/mirror')
@@ -419,14 +425,14 @@ class TestASGIWebsockets(TestApplicationPython):
frame = self.ws.frame_read(sock)
self.check_frame(frame, True, opcode, payload)
- check_length(0) # 1_1_1
- check_length(125) # 1_1_2
- check_length(126) # 1_1_3
- check_length(127) # 1_1_4
- check_length(128) # 1_1_5
- check_length(65535) # 1_1_6
- check_length(65536) # 1_1_7
- check_length(65536, chopsize = 997) # 1_1_8
+ check_length(0) # 1_1_1
+ check_length(125) # 1_1_2
+ check_length(126) # 1_1_3
+ check_length(127) # 1_1_4
+ check_length(128) # 1_1_5
+ check_length(65535) # 1_1_6
+ check_length(65536) # 1_1_7
+ check_length(65536, chopsize=997) # 1_1_8
self.close_connection(sock)
@@ -445,14 +451,14 @@ class TestASGIWebsockets(TestApplicationPython):
self.check_frame(frame, True, opcode, payload)
- check_length(0) # 1_2_1
- check_length(125) # 1_2_2
- check_length(126) # 1_2_3
- check_length(127) # 1_2_4
- check_length(128) # 1_2_5
- check_length(65535) # 1_2_6
- check_length(65536) # 1_2_7
- check_length(65536, chopsize = 997) # 1_2_8
+ check_length(0) # 1_2_1
+ check_length(125) # 1_2_2
+ check_length(126) # 1_2_3
+ check_length(127) # 1_2_4
+ check_length(128) # 1_2_5
+ check_length(65535) # 1_2_6
+ check_length(65536) # 1_2_7
+ check_length(65536, chopsize=997) # 1_2_8
self.close_connection(sock)
@@ -470,11 +476,11 @@ class TestASGIWebsockets(TestApplicationPython):
self.check_frame(frame, True, op_pong, payload, decode=decode)
- check_ping('') # 2_1
- check_ping('Hello, world!') # 2_2
+ check_ping('') # 2_1
+ check_ping('Hello, world!') # 2_2
check_ping(b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', decode=False) # 2_3
- check_ping(b'\xfe' * 125, decode=False) # 2_4
- check_ping(b'\xfe' * 125, chopsize=1, decode=False) # 2_6
+ check_ping(b'\xfe' * 125, decode=False) # 2_4
+ check_ping(b'\xfe' * 125, chopsize=1, decode=False) # 2_6
self.close_connection(sock)
@@ -935,7 +941,9 @@ class TestASGIWebsockets(TestApplicationPython):
frame = self.ws.frame_read(sock)
if frame['opcode'] == self.ws.OP_TEXT:
- self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2')
+ self.check_frame(
+ frame, True, self.ws.OP_TEXT, 'fragment1fragment2'
+ )
frame = None
self.check_close(sock, 1002, frame=frame)
@@ -1092,27 +1100,27 @@ class TestASGIWebsockets(TestApplicationPython):
self.close_connection(sock)
-# Unit does not support UTF-8 validation
-#
-# # 6_3_1 FAIL
-#
-# payload_1 = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5'
-# payload_2 = '\xed\xa0\x80'
-# payload_3 = '\x65\x64\x69\x74\x65\x64'
-#
-# payload = payload_1 + payload_2 + payload_3
-#
-# self.ws.message(sock, self.ws.OP_TEXT, payload)
-# self.check_close(sock, 1007)
-#
-# # 6_3_2 FAIL
-#
-# _, sock, _ = self.ws.upgrade()
-#
-# self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1)
-# self.check_close(sock, 1007)
-#
-# # 6_4_1 ... 6_4_4 FAIL
+ # Unit does not support UTF-8 validation
+ #
+ # # 6_3_1 FAIL
+ #
+ # payload_1 = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5'
+ # payload_2 = '\xed\xa0\x80'
+ # payload_3 = '\x65\x64\x69\x74\x65\x64'
+ #
+ # payload = payload_1 + payload_2 + payload_3
+ #
+ # self.ws.message(sock, self.ws.OP_TEXT, payload)
+ # self.check_close(sock, 1007)
+ #
+ # # 6_3_2 FAIL
+ #
+ # _, sock, _ = self.ws.upgrade()
+ #
+ # self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1)
+ # self.check_close(sock, 1007)
+ #
+ # # 6_4_1 ... 6_4_4 FAIL
def test_asgi_websockets_7_1_1__7_5_1(self):
self.load('websockets/mirror')
@@ -1239,15 +1247,15 @@ class TestASGIWebsockets(TestApplicationPython):
self.ws.frame_write(sock, self.ws.OP_CLOSE, payload)
self.check_close(sock, 1002)
-# # 7_5_1 FAIL Unit does not support UTF-8 validation
-#
-# _, sock, _ = self.ws.upgrade()
-#
-# payload = self.ws.serialize_close(reason = '\xce\xba\xe1\xbd\xb9\xcf' \
-# '\x83\xce\xbc\xce\xb5\xed\xa0\x80\x65\x64\x69\x74\x65\x64')
-#
-# self.ws.frame_write(sock, self.ws.OP_CLOSE, payload)
-# self.check_close(sock, 1007)
+ # # 7_5_1 FAIL Unit does not support UTF-8 validation
+ #
+ # _, sock, _ = self.ws.upgrade()
+ #
+ # payload = self.ws.serialize_close(reason = '\xce\xba\xe1\xbd\xb9\xcf' \
+ # '\x83\xce\xbc\xce\xb5\xed\xa0\x80\x65\x64\x69\x74\x65\x64')
+ #
+ # self.ws.frame_write(sock, self.ws.OP_CLOSE, payload)
+ # self.check_close(sock, 1007)
def test_asgi_websockets_7_7_X__7_9_X(self):
self.load('websockets/mirror')
@@ -1350,52 +1358,52 @@ class TestASGIWebsockets(TestApplicationPython):
frame = self.ws.frame_read(sock, read_timeout=5)
self.check_frame(frame, True, opcode, payload)
- check_payload(op_text, 64 * 2 ** 10) # 9_1_1
- check_payload(op_text, 256 * 2 ** 10) # 9_1_2
- check_payload(op_text, 2 ** 20) # 9_1_3
- check_payload(op_text, 4 * 2 ** 20) # 9_1_4
- check_payload(op_text, 8 * 2 ** 20) # 9_1_5
- check_payload(op_text, 16 * 2 ** 20) # 9_1_6
+ check_payload(op_text, 64 * 2 ** 10) # 9_1_1
+ check_payload(op_text, 256 * 2 ** 10) # 9_1_2
+ check_payload(op_text, 2 ** 20) # 9_1_3
+ check_payload(op_text, 4 * 2 ** 20) # 9_1_4
+ check_payload(op_text, 8 * 2 ** 20) # 9_1_5
+ check_payload(op_text, 16 * 2 ** 20) # 9_1_6
- check_payload(op_binary, 64 * 2 ** 10) # 9_2_1
- check_payload(op_binary, 256 * 2 ** 10) # 9_2_2
- check_payload(op_binary, 2 ** 20) # 9_2_3
- check_payload(op_binary, 4 * 2 ** 20) # 9_2_4
- check_payload(op_binary, 8 * 2 ** 20) # 9_2_5
- check_payload(op_binary, 16 * 2 ** 20) # 9_2_6
+ check_payload(op_binary, 64 * 2 ** 10) # 9_2_1
+ check_payload(op_binary, 256 * 2 ** 10) # 9_2_2
+ check_payload(op_binary, 2 ** 20) # 9_2_3
+ check_payload(op_binary, 4 * 2 ** 20) # 9_2_4
+ check_payload(op_binary, 8 * 2 ** 20) # 9_2_5
+ check_payload(op_binary, 16 * 2 ** 20) # 9_2_6
if option.system != 'Darwin' and option.system != 'FreeBSD':
- check_message(op_text, 64) # 9_3_1
- check_message(op_text, 256) # 9_3_2
- check_message(op_text, 2 ** 10) # 9_3_3
- check_message(op_text, 4 * 2 ** 10) # 9_3_4
- check_message(op_text, 16 * 2 ** 10) # 9_3_5
- check_message(op_text, 64 * 2 ** 10) # 9_3_6
- check_message(op_text, 256 * 2 ** 10) # 9_3_7
- check_message(op_text, 2 ** 20) # 9_3_8
- check_message(op_text, 4 * 2 ** 20) # 9_3_9
-
- check_message(op_binary, 64) # 9_4_1
- check_message(op_binary, 256) # 9_4_2
- check_message(op_binary, 2 ** 10) # 9_4_3
- check_message(op_binary, 4 * 2 ** 10) # 9_4_4
- check_message(op_binary, 16 * 2 ** 10) # 9_4_5
- check_message(op_binary, 64 * 2 ** 10) # 9_4_6
- check_message(op_binary, 256 * 2 ** 10) # 9_4_7
- check_message(op_binary, 2 ** 20) # 9_4_8
- check_message(op_binary, 4 * 2 ** 20) # 9_4_9
-
- check_payload(op_text, 2 ** 20, chopsize=64) # 9_5_1
- check_payload(op_text, 2 ** 20, chopsize=128) # 9_5_2
- check_payload(op_text, 2 ** 20, chopsize=256) # 9_5_3
- check_payload(op_text, 2 ** 20, chopsize=512) # 9_5_4
- check_payload(op_text, 2 ** 20, chopsize=1024) # 9_5_5
- check_payload(op_text, 2 ** 20, chopsize=2048) # 9_5_6
-
- check_payload(op_binary, 2 ** 20, chopsize=64) # 9_6_1
- check_payload(op_binary, 2 ** 20, chopsize=128) # 9_6_2
- check_payload(op_binary, 2 ** 20, chopsize=256) # 9_6_3
- check_payload(op_binary, 2 ** 20, chopsize=512) # 9_6_4
+ check_message(op_text, 64) # 9_3_1
+ check_message(op_text, 256) # 9_3_2
+ check_message(op_text, 2 ** 10) # 9_3_3
+ check_message(op_text, 4 * 2 ** 10) # 9_3_4
+ check_message(op_text, 16 * 2 ** 10) # 9_3_5
+ check_message(op_text, 64 * 2 ** 10) # 9_3_6
+ check_message(op_text, 256 * 2 ** 10) # 9_3_7
+ check_message(op_text, 2 ** 20) # 9_3_8
+ check_message(op_text, 4 * 2 ** 20) # 9_3_9
+
+ check_message(op_binary, 64) # 9_4_1
+ check_message(op_binary, 256) # 9_4_2
+ check_message(op_binary, 2 ** 10) # 9_4_3
+ check_message(op_binary, 4 * 2 ** 10) # 9_4_4
+ check_message(op_binary, 16 * 2 ** 10) # 9_4_5
+ check_message(op_binary, 64 * 2 ** 10) # 9_4_6
+ check_message(op_binary, 256 * 2 ** 10) # 9_4_7
+ check_message(op_binary, 2 ** 20) # 9_4_8
+ check_message(op_binary, 4 * 2 ** 20) # 9_4_9
+
+ check_payload(op_text, 2 ** 20, chopsize=64) # 9_5_1
+ check_payload(op_text, 2 ** 20, chopsize=128) # 9_5_2
+ check_payload(op_text, 2 ** 20, chopsize=256) # 9_5_3
+ check_payload(op_text, 2 ** 20, chopsize=512) # 9_5_4
+ check_payload(op_text, 2 ** 20, chopsize=1024) # 9_5_5
+ check_payload(op_text, 2 ** 20, chopsize=2048) # 9_5_6
+
+ check_payload(op_binary, 2 ** 20, chopsize=64) # 9_6_1
+ check_payload(op_binary, 2 ** 20, chopsize=128) # 9_6_2
+ check_payload(op_binary, 2 ** 20, chopsize=256) # 9_6_3
+ check_payload(op_binary, 2 ** 20, chopsize=512) # 9_6_4
check_payload(op_binary, 2 ** 20, chopsize=1024) # 9_6_5
check_payload(op_binary, 2 ** 20, chopsize=2048) # 9_6_6