diff options
author | Andrei Zeliankou <zelenkov@nginx.com> | 2023-06-14 18:20:09 +0100 |
---|---|---|
committer | Andrei Zeliankou <zelenkov@nginx.com> | 2023-06-14 18:20:09 +0100 |
commit | c183bd8749a19477390f8cb77efe5f6d223f0905 (patch) | |
tree | 4e821e9cb07be9a86bf2d442acb3ea6740ba5a99 /test/test_asgi_websockets.py | |
parent | c6d05191a069ac150cc8eb2bece75cf79c0a465a (diff) | |
download | unit-c183bd8749a19477390f8cb77efe5f6d223f0905.tar.gz unit-c183bd8749a19477390f8cb77efe5f6d223f0905.tar.bz2 |
Tests: get rid of classes in test files.
Class usage came from the unittest framework and it was always redundant
after migration to the pytest. This commit removes classes from files
containing tests to make them more readable and understandable.
Diffstat (limited to '')
-rw-r--r-- | test/test_asgi_websockets.py | 2085 |
1 files changed, 1046 insertions, 1039 deletions
diff --git a/test/test_asgi_websockets.py b/test/test_asgi_websockets.py index fa71274b..eb7a20e7 100644 --- a/test/test_asgi_websockets.py +++ b/test/test_asgi_websockets.py @@ -3,1495 +3,1502 @@ import time import pytest from packaging import version -from unit.applications.lang.python import TestApplicationPython -from unit.applications.websockets import TestApplicationWebsocket +from unit.applications.lang.python import ApplicationPython +from unit.applications.websockets import ApplicationWebsocket prerequisites = { 'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')} } +client = ApplicationPython(load_module='asgi') +ws = ApplicationWebsocket() -class TestASGIWebsockets(TestApplicationPython): - load_module = 'asgi' - ws = TestApplicationWebsocket() +@pytest.fixture(autouse=True) +def setup_method_fixture(skip_alert): + assert 'success' in client.conf( + {'http': {'websocket': {'keepalive_interval': 0}}}, 'settings' + ), 'clear keepalive_interval' - @pytest.fixture(autouse=True) - def setup_method_fixture(self, skip_alert): - assert 'success' in self.conf( - {'http': {'websocket': {'keepalive_interval': 0}}}, 'settings' - ), 'clear keepalive_interval' + skip_alert(r'socket close\(\d+\) failed') - skip_alert(r'socket close\(\d+\) failed') - def close_connection(self, sock): - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' +def close_connection(sock): + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) - self.check_close(sock) + check_close(sock) - def check_close(self, sock, code=1000, no_close=False, frame=None): - if frame is None: - frame = self.ws.frame_read(sock) - assert frame['fin'], 'close fin' - assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode' - assert frame['code'] == code, 'close code' +def check_close(sock, code=1000, no_close=False, frame=None): + if frame is None: + frame = ws.frame_read(sock) - if not no_close: - sock.close() + assert frame['fin'], 'close fin' + assert frame['opcode'] == ws.OP_CLOSE, 'close opcode' + assert frame['code'] == code, 'close code' - def check_frame(self, frame, fin, opcode, payload, decode=True): - if opcode == self.ws.OP_BINARY or not decode: - data = frame['data'] - else: - data = frame['data'].decode('utf-8') + if not no_close: + sock.close() - assert frame['fin'] == fin, 'fin' - assert frame['opcode'] == opcode, 'opcode' - assert data == payload, 'payload' - def test_asgi_websockets_handshake(self): - self.load('websockets/mirror') +def check_frame(frame, fin, opcode, payload, decode=True): + if opcode == ws.OP_BINARY or not decode: + data = frame['data'] + else: + data = frame['data'].decode('utf-8') - resp, sock, key = self.ws.upgrade() - sock.close() + assert frame['fin'] == fin, 'fin' + assert frame['opcode'] == opcode, 'opcode' + assert data == payload, 'payload' - assert resp['status'] == 101, 'status' - assert resp['headers']['Upgrade'] == 'websocket', 'upgrade' - assert resp['headers']['Connection'] == 'Upgrade', 'connection' - assert resp['headers']['Sec-WebSocket-Accept'] == self.ws.accept( - key - ), 'key' - # remove "mirror" application - self.load('websockets/subprotocol') +def test_asgi_websockets_handshake(): + client.load('websockets/mirror') - def test_asgi_websockets_subprotocol(self): - self.load('websockets/subprotocol') + resp, sock, key = ws.upgrade() + sock.close() - resp, sock, _ = self.ws.upgrade() - sock.close() + assert resp['status'] == 101, 'status' + assert resp['headers']['Upgrade'] == 'websocket', 'upgrade' + assert resp['headers']['Connection'] == 'Upgrade', 'connection' + assert resp['headers']['Sec-WebSocket-Accept'] == ws.accept(key), 'key' - assert resp['status'] == 101, 'status' - assert ( - resp['headers']['x-subprotocols'] == "('chat', 'phone', 'video')" - ), 'subprotocols' - assert resp['headers']['sec-websocket-protocol'] == 'chat', 'key' + # remove "mirror" application + client.load('websockets/subprotocol') - def test_asgi_websockets_mirror(self): - self.load('websockets/mirror') - message = 'blah' +def test_asgi_websockets_subprotocol(): + client.load('websockets/subprotocol') - _, sock, _ = self.ws.upgrade() + resp, sock, _ = ws.upgrade() + sock.close() - self.ws.frame_write(sock, self.ws.OP_TEXT, message) - frame = self.ws.frame_read(sock) + assert resp['status'] == 101, 'status' + assert ( + resp['headers']['x-subprotocols'] == "('chat', 'phone', 'video')" + ), 'subprotocols' + assert resp['headers']['sec-websocket-protocol'] == 'chat', 'key' - assert message == frame['data'].decode('utf-8'), 'mirror' - self.ws.frame_write(sock, self.ws.OP_TEXT, message) - frame = self.ws.frame_read(sock) +def test_asgi_websockets_mirror(): + client.load('websockets/mirror') - assert message == frame['data'].decode('utf-8'), 'mirror 2' + message = 'blah' - sock.close() + _, sock, _ = ws.upgrade() - def test_asgi_websockets_mirror_app_change(self): - self.load('websockets/mirror') + ws.frame_write(sock, ws.OP_TEXT, message) + frame = ws.frame_read(sock) - message = 'blah' + assert message == frame['data'].decode('utf-8'), 'mirror' - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, message) + frame = ws.frame_read(sock) - self.ws.frame_write(sock, self.ws.OP_TEXT, message) - frame = self.ws.frame_read(sock) + assert message == frame['data'].decode('utf-8'), 'mirror 2' - assert message == frame['data'].decode('utf-8'), 'mirror' + sock.close() - self.load('websockets/subprotocol') - self.ws.frame_write(sock, self.ws.OP_TEXT, message) - frame = self.ws.frame_read(sock) +def test_asgi_websockets_mirror_app_change(): + client.load('websockets/mirror') - assert message == frame['data'].decode('utf-8'), 'mirror 2' + message = 'blah' - sock.close() + _, sock, _ = ws.upgrade() - def test_asgi_websockets_no_mask(self): - self.load('websockets/mirror') + ws.frame_write(sock, ws.OP_TEXT, message) + frame = ws.frame_read(sock) - message = 'blah' + assert message == frame['data'].decode('utf-8'), 'mirror' - _, sock, _ = self.ws.upgrade() + client.load('websockets/subprotocol') - self.ws.frame_write(sock, self.ws.OP_TEXT, message, mask=False) + ws.frame_write(sock, ws.OP_TEXT, message) + frame = ws.frame_read(sock) - frame = self.ws.frame_read(sock) + assert message == frame['data'].decode('utf-8'), 'mirror 2' - assert frame['opcode'] == self.ws.OP_CLOSE, 'no mask opcode' - assert frame['code'] == 1002, 'no mask close code' + sock.close() - sock.close() - def test_asgi_websockets_fragmentation(self): - self.load('websockets/mirror') +def test_asgi_websockets_no_mask(): + client.load('websockets/mirror') - message = 'blah' + message = 'blah' - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, message, fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, ' ', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, message) + ws.frame_write(sock, ws.OP_TEXT, message, mask=False) - frame = self.ws.frame_read(sock) + frame = ws.frame_read(sock) - assert f'{message} {message}' == frame['data'].decode( - 'utf-8' - ), 'mirror framing' + assert frame['opcode'] == ws.OP_CLOSE, 'no mask opcode' + assert frame['code'] == 1002, 'no mask close code' - sock.close() + sock.close() - def test_asgi_websockets_length_long(self): - self.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() +def test_asgi_websockets_fragmentation(): + client.load('websockets/mirror') - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write( - sock, self.ws.OP_CONT, 'fragment2', length=2**64 - 1 - ) + message = 'blah' - self.check_close(sock, 1009) # 1009 - CLOSE_TOO_LARGE + _, sock, _ = ws.upgrade() - def test_asgi_websockets_frame_fragmentation_invalid(self): - self.load('websockets/mirror') + ws.frame_write(sock, ws.OP_TEXT, message, fin=False) + ws.frame_write(sock, ws.OP_CONT, ' ', fin=False) + ws.frame_write(sock, ws.OP_CONT, message) - message = 'blah' + frame = ws.frame_read(sock) - _, sock, _ = self.ws.upgrade() + assert f'{message} {message}' == frame['data'].decode( + 'utf-8' + ), 'mirror framing' - self.ws.frame_write(sock, self.ws.OP_PING, message, fin=False) + sock.close() - frame = self.ws.frame_read(sock) - frame.pop('data') - assert frame == { - 'fin': True, - 'rsv1': False, - 'rsv2': False, - 'rsv3': False, - 'opcode': self.ws.OP_CLOSE, - 'mask': 0, - 'code': 1002, - 'reason': 'Fragmented control frame', - }, 'close frame' +def test_asgi_websockets_length_long(): + client.load('websockets/mirror') - sock.close() + _, sock, _ = ws.upgrade() - def test_asgi_websockets_large(self): - self.load('websockets/mirror') + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', length=2**64 - 1) - message = '0123456789' * 300 + check_close(sock, 1009) # 1009 - CLOSE_TOO_LARGE - _, sock, _ = self.ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, message) +def test_asgi_websockets_frame_fragmentation_invalid(): + client.load('websockets/mirror') - frame = self.ws.frame_read(sock) - data = frame['data'].decode('utf-8') + message = 'blah' - frame = self.ws.frame_read(sock) - data += frame['data'].decode('utf-8') + _, sock, _ = ws.upgrade() - assert message == data, 'large' + ws.frame_write(sock, ws.OP_PING, message, fin=False) - sock.close() + frame = ws.frame_read(sock) - def test_asgi_websockets_two_clients(self): - self.load('websockets/mirror') + frame.pop('data') + assert frame == { + 'fin': True, + 'rsv1': False, + 'rsv2': False, + 'rsv3': False, + 'opcode': ws.OP_CLOSE, + 'mask': 0, + 'code': 1002, + 'reason': 'Fragmented control frame', + }, 'close frame' - message1 = 'blah1' - message2 = 'blah2' + sock.close() - _, sock1, _ = self.ws.upgrade() - _, sock2, _ = self.ws.upgrade() - self.ws.frame_write(sock1, self.ws.OP_TEXT, message1) - self.ws.frame_write(sock2, self.ws.OP_TEXT, message2) +def test_asgi_websockets_large(): + client.load('websockets/mirror') - frame1 = self.ws.frame_read(sock1) - frame2 = self.ws.frame_read(sock2) + message = '0123456789' * 300 - assert message1 == frame1['data'].decode('utf-8'), 'client 1' - assert message2 == frame2['data'].decode('utf-8'), 'client 2' + _, sock, _ = ws.upgrade() - sock1.close() - sock2.close() + ws.frame_write(sock, ws.OP_TEXT, message) - @pytest.mark.skip('not yet') - def test_asgi_websockets_handshake_upgrade_absent( - self, - ): # FAIL https://tools.ietf.org/html/rfc6455#section-4.2.1 - self.load('websockets/mirror') + frame = ws.frame_read(sock) + data = frame['data'].decode('utf-8') - resp = self.get( - headers={ - 'Host': 'localhost', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) + frame = ws.frame_read(sock) + data += frame['data'].decode('utf-8') - assert resp['status'] == 400, 'upgrade absent' + assert message == data, 'large' - def test_asgi_websockets_handshake_case_insensitive(self): - self.load('websockets/mirror') + sock.close() - resp, sock, _ = self.ws.upgrade( - headers={ - 'Host': 'localhost', - 'Upgrade': 'WEBSOCKET', - 'Connection': 'UPGRADE', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - } - ) - sock.close() - assert resp['status'] == 101, 'status' - - @pytest.mark.skip('not yet') - def test_asgi_websockets_handshake_connection_absent(self): # FAIL - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) - - assert resp['status'] == 400, 'status' - - def test_asgi_websockets_handshake_version_absent(self): - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - }, - ) - - assert resp['status'] == 426, 'status' - - @pytest.mark.skip('not yet') - def test_asgi_websockets_handshake_key_invalid(self): - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': '!', - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) - - assert resp['status'] == 400, 'key length' - - key = self.ws.key() - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': [key, key], - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) - - assert ( - resp['status'] == 400 - ), 'key double' # FAIL https://tools.ietf.org/html/rfc6455#section-11.3.1 - - def test_asgi_websockets_handshake_method_invalid(self): - self.load('websockets/mirror') - - resp = self.post( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) - - assert resp['status'] == 400, 'status' - - def test_asgi_websockets_handshake_http_10(self): - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - http_10=True, - ) - - assert resp['status'] == 400, 'status' - - def test_asgi_websockets_handshake_uri_invalid(self): - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - url='!', - ) - - assert resp['status'] == 400, 'status' - - def test_asgi_websockets_protocol_absent(self): - self.load('websockets/mirror') - - key = self.ws.key() - resp, sock, _ = self.ws.upgrade( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, - 'Sec-WebSocket-Version': 13, - } - ) - sock.close() +def test_asgi_websockets_two_clients(): + client.load('websockets/mirror') - assert resp['status'] == 101, 'status' - assert resp['headers']['Upgrade'] == 'websocket', 'upgrade' - assert resp['headers']['Connection'] == 'Upgrade', 'connection' - assert resp['headers']['Sec-WebSocket-Accept'] == self.ws.accept( - key - ), 'key' + message1 = 'blah1' + message2 = 'blah2' - # autobahn-testsuite - # - # Some following tests fail because of Unit does not support UTF-8 - # validation for websocket frames. It should be implemented - # by application, if necessary. + _, sock1, _ = ws.upgrade() + _, sock2, _ = ws.upgrade() - def test_asgi_websockets_1_1_1__1_1_8(self): - self.load('websockets/mirror') + ws.frame_write(sock1, ws.OP_TEXT, message1) + ws.frame_write(sock2, ws.OP_TEXT, message2) - opcode = self.ws.OP_TEXT + frame1 = ws.frame_read(sock1) + frame2 = ws.frame_read(sock2) - _, sock, _ = self.ws.upgrade() + assert message1 == frame1['data'].decode('utf-8'), 'client 1' + assert message2 == frame2['data'].decode('utf-8'), 'client 2' - def check_length(length, chopsize=None): - payload = '*' * length + sock1.close() + sock2.close() - self.ws.frame_write(sock, opcode, payload, chopsize=chopsize) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, opcode, payload) +# FAIL https://tools.ietf.org/html/rfc6455#section-4.2.1 +@pytest.mark.skip('not yet') +def test_asgi_websockets_handshake_upgrade_absent(): + client.load('websockets/mirror') - check_length(0) # 1_1_1 - check_length(125) # 1_1_2 - check_length(126) # 1_1_3 - check_length(127) # 1_1_4 - check_length(128) # 1_1_5 - check_length(65535) # 1_1_6 - check_length(65536) # 1_1_7 - check_length(65536, chopsize=997) # 1_1_8 + resp = client.get( + headers={ + 'Host': 'localhost', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) - self.close_connection(sock) + assert resp['status'] == 400, 'upgrade absent' - def test_asgi_websockets_1_2_1__1_2_8(self): - self.load('websockets/mirror') - opcode = self.ws.OP_BINARY +def test_asgi_websockets_handshake_case_insensitive(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + resp, sock, _ = ws.upgrade( + headers={ + 'Host': 'localhost', + 'Upgrade': 'WEBSOCKET', + 'Connection': 'UPGRADE', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + } + ) + sock.close() - def check_length(length, chopsize=None): - payload = b'\xfe' * length + assert resp['status'] == 101, 'status' - self.ws.frame_write(sock, opcode, payload, chopsize=chopsize) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, opcode, payload) +@pytest.mark.skip('not yet') +def test_asgi_websockets_handshake_connection_absent(): # FAIL + client.load('websockets/mirror') - check_length(0) # 1_2_1 - check_length(125) # 1_2_2 - check_length(126) # 1_2_3 - check_length(127) # 1_2_4 - check_length(128) # 1_2_5 - check_length(65535) # 1_2_6 - check_length(65536) # 1_2_7 - check_length(65536, chopsize=997) # 1_2_8 + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) - self.close_connection(sock) + assert resp['status'] == 400, 'status' - def test_asgi_websockets_2_1__2_6(self): - self.load('websockets/mirror') - op_ping = self.ws.OP_PING - op_pong = self.ws.OP_PONG +def test_asgi_websockets_handshake_version_absent(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + }, + ) + + assert resp['status'] == 426, 'status' + + +@pytest.mark.skip('not yet') +def test_asgi_websockets_handshake_key_invalid(): + client.load('websockets/mirror') + + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': '!', + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) + + assert resp['status'] == 400, 'key length' + + key = ws.key() + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': [key, key], + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) + + assert ( + resp['status'] == 400 + ), 'key double' # FAIL https://tools.ietf.org/html/rfc6455#section-11.3.1 + + +def test_asgi_websockets_handshake_method_invalid(): + client.load('websockets/mirror') + + resp = client.post( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) + + assert resp['status'] == 400, 'status' + + +def test_asgi_websockets_handshake_http_10(): + client.load('websockets/mirror') + + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + http_10=True, + ) + + assert resp['status'] == 400, 'status' - def check_ping(payload, chopsize=None, decode=True): - self.ws.frame_write(sock, op_ping, payload, chopsize=chopsize) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, op_pong, payload, decode=decode) +def test_asgi_websockets_handshake_uri_invalid(): + client.load('websockets/mirror') - check_ping('') # 2_1 - check_ping('Hello, world!') # 2_2 - check_ping(b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', decode=False) # 2_3 - check_ping(b'\xfe' * 125, decode=False) # 2_4 - check_ping(b'\xfe' * 125, chopsize=1, decode=False) # 2_6 + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + url='!', + ) - self.close_connection(sock) + assert resp['status'] == 400, 'status' - # 2_5 - _, sock, _ = self.ws.upgrade() +def test_asgi_websockets_protocol_absent(): + client.load('websockets/mirror') - self.ws.frame_write(sock, self.ws.OP_PING, b'\xfe' * 126) - self.check_close(sock, 1002) + key = ws.key() + resp, sock, _ = ws.upgrade( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Version': 13, + } + ) + sock.close() - def test_asgi_websockets_2_7__2_9(self): - self.load('websockets/mirror') + assert resp['status'] == 101, 'status' + assert resp['headers']['Upgrade'] == 'websocket', 'upgrade' + assert resp['headers']['Connection'] == 'Upgrade', 'connection' + assert resp['headers']['Sec-WebSocket-Accept'] == ws.accept(key), 'key' - # 2_7 - _, sock, _ = self.ws.upgrade() +# autobahn-testsuite +# +# Some following tests fail because of Unit does not support UTF-8 +# validation for websocket frames. It should be implemented +# by application, if necessary. - self.ws.frame_write(sock, self.ws.OP_PONG, '') - assert self.recvall(sock, read_timeout=0.1) == b'', '2_7' - # 2_8 +def test_asgi_websockets_1_1_1__1_1_8(): + client.load('websockets/mirror') - self.ws.frame_write(sock, self.ws.OP_PONG, 'unsolicited pong payload') - assert self.recvall(sock, read_timeout=0.1) == b'', '2_8' + opcode = ws.OP_TEXT - # 2_9 + _, sock, _ = ws.upgrade() - payload = 'ping payload' + def check_length(length, chopsize=None): + payload = '*' * length - self.ws.frame_write(sock, self.ws.OP_PONG, 'unsolicited pong payload') - self.ws.frame_write(sock, self.ws.OP_PING, payload) + ws.frame_write(sock, opcode, payload, chopsize=chopsize) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, payload) + frame = ws.frame_read(sock) + check_frame(frame, True, opcode, payload) - self.close_connection(sock) + check_length(0) # 1_1_1 + check_length(125) # 1_1_2 + check_length(126) # 1_1_3 + check_length(127) # 1_1_4 + check_length(128) # 1_1_5 + check_length(65535) # 1_1_6 + check_length(65536) # 1_1_7 + check_length(65536, chopsize=997) # 1_1_8 - def test_asgi_websockets_2_10__2_11(self): - self.load('websockets/mirror') + close_connection(sock) - # 2_10 - _, sock, _ = self.ws.upgrade() +def test_asgi_websockets_1_2_1__1_2_8(): + client.load('websockets/mirror') - for i in range(0, 10): - self.ws.frame_write(sock, self.ws.OP_PING, f'payload-{i}') + opcode = ws.OP_BINARY - for i in range(0, 10): - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, f'payload-{i}') + _, sock, _ = ws.upgrade() - # 2_11 + def check_length(length, chopsize=None): + payload = b'\xfe' * length - for i in range(0, 10): - opcode = self.ws.OP_PING - self.ws.frame_write(sock, opcode, f'payload-{i}', chopsize=1) + ws.frame_write(sock, opcode, payload, chopsize=chopsize) + frame = ws.frame_read(sock) - for i in range(0, 10): - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, f'payload-{i}') + check_frame(frame, True, opcode, payload) - self.close_connection(sock) + check_length(0) # 1_2_1 + check_length(125) # 1_2_2 + check_length(126) # 1_2_3 + check_length(127) # 1_2_4 + check_length(128) # 1_2_5 + check_length(65535) # 1_2_6 + check_length(65536) # 1_2_7 + check_length(65536, chopsize=997) # 1_2_8 - @pytest.mark.skip('not yet') - def test_asgi_websockets_3_1__3_7(self): - self.load('websockets/mirror') + close_connection(sock) - payload = 'Hello, world!' - # 3_1 +def test_asgi_websockets_2_1__2_6(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + op_ping = ws.OP_PING + op_pong = ws.OP_PONG - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, rsv1=True) - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 3_2 + def check_ping(payload, chopsize=None, decode=True): + ws.frame_write(sock, op_ping, payload, chopsize=chopsize) + frame = ws.frame_read(sock) - _, sock, _ = self.ws.upgrade() + check_frame(frame, True, op_pong, payload, decode=decode) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, rsv2=True) - self.ws.frame_write(sock, self.ws.OP_PING, '') + check_ping('') # 2_1 + check_ping('Hello, world!') # 2_2 + check_ping(b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', decode=False) # 2_3 + check_ping(b'\xfe' * 125, decode=False) # 2_4 + check_ping(b'\xfe' * 125, chopsize=1, decode=False) # 2_6 - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + close_connection(sock) - self.check_close(sock, 1002, no_close=True) + # 2_5 - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty 3_2' - sock.close() + _, sock, _ = ws.upgrade() - # 3_3 + ws.frame_write(sock, ws.OP_PING, b'\xfe' * 126) + check_close(sock, 1002) - _, sock, _ = self.ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) +def test_asgi_websockets_2_7__2_9(): + client.load('websockets/mirror') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + # 2_7 - self.ws.frame_write( - sock, self.ws.OP_TEXT, payload, rsv1=True, rsv2=True - ) + _, sock, _ = ws.upgrade() - self.check_close(sock, 1002, no_close=True) + ws.frame_write(sock, ws.OP_PONG, '') + assert client.recvall(sock, read_timeout=0.1) == b'', '2_7' - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty 3_3' - sock.close() + # 2_8 - # 3_4 + ws.frame_write(sock, ws.OP_PONG, 'unsolicited pong payload') + assert client.recvall(sock, read_timeout=0.1) == b'', '2_8' - _, sock, _ = self.ws.upgrade() + # 2_9 - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, chopsize=1) - self.ws.frame_write( - sock, self.ws.OP_TEXT, payload, rsv3=True, chopsize=1 - ) - self.ws.frame_write(sock, self.ws.OP_PING, '') + payload = 'ping payload' - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + ws.frame_write(sock, ws.OP_PONG, 'unsolicited pong payload') + ws.frame_write(sock, ws.OP_PING, payload) - self.check_close(sock, 1002, no_close=True) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, payload) - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty 3_4' - sock.close() + close_connection(sock) - # 3_5 - _, sock, _ = self.ws.upgrade() +def test_asgi_websockets_2_10__2_11(): + client.load('websockets/mirror') - self.ws.frame_write( - sock, - self.ws.OP_BINARY, - b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', - rsv1=True, - rsv3=True, - ) + # 2_10 - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 3_6 + for i in range(0, 10): + ws.frame_write(sock, ws.OP_PING, f'payload-{i}') - _, sock, _ = self.ws.upgrade() + for i in range(0, 10): + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, f'payload-{i}') - self.ws.frame_write( - sock, self.ws.OP_PING, payload, rsv2=True, rsv3=True - ) + # 2_11 - self.check_close(sock, 1002) + for i in range(0, 10): + opcode = ws.OP_PING + ws.frame_write(sock, opcode, f'payload-{i}', chopsize=1) - # 3_7 + for i in range(0, 10): + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, f'payload-{i}') - _, sock, _ = self.ws.upgrade() + close_connection(sock) - self.ws.frame_write( - sock, self.ws.OP_CLOSE, payload, rsv1=True, rsv2=True, rsv3=True - ) - self.check_close(sock, 1002) +@pytest.mark.skip('not yet') +def test_asgi_websockets_3_1__3_7(): + client.load('websockets/mirror') - def test_asgi_websockets_4_1_1__4_2_5(self): - self.load('websockets/mirror') + payload = 'Hello, world!' - payload = 'Hello, world!' + # 3_1 - # 4_1_1 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, payload, rsv1=True) + check_close(sock, 1002) - self.ws.frame_write(sock, 0x03, '') - self.check_close(sock, 1002) + # 3_2 - # 4_1_2 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, payload) + ws.frame_write(sock, ws.OP_TEXT, payload, rsv2=True) + ws.frame_write(sock, ws.OP_PING, '') - self.ws.frame_write(sock, 0x04, 'reserved opcode payload') - self.check_close(sock, 1002) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - # 4_1_3 + check_close(sock, 1002, no_close=True) - _, sock, _ = self.ws.upgrade() + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty 3_2' + sock.close() - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + # 3_3 - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, 0x05, '') - self.ws.frame_write(sock, self.ws.OP_PING, '') + ws.frame_write(sock, ws.OP_TEXT, payload) - self.check_close(sock, 1002) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - # 4_1_4 + ws.frame_write(sock, ws.OP_TEXT, payload, rsv1=True, rsv2=True) - _, sock, _ = self.ws.upgrade() + check_close(sock, 1002, no_close=True) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty 3_3' + sock.close() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + # 3_4 - self.ws.frame_write(sock, 0x06, payload) - self.ws.frame_write(sock, self.ws.OP_PING, '') + _, sock, _ = ws.upgrade() - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_TEXT, payload, chopsize=1) + ws.frame_write(sock, ws.OP_TEXT, payload, rsv3=True, chopsize=1) + ws.frame_write(sock, ws.OP_PING, '') - # 4_1_5 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + check_close(sock, 1002, no_close=True) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, chopsize=1) + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty 3_4' + sock.close() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + # 3_5 - self.ws.frame_write(sock, 0x07, payload, chopsize=1) - self.ws.frame_write(sock, self.ws.OP_PING, '') + _, sock, _ = ws.upgrade() - self.check_close(sock, 1002) + ws.frame_write( + sock, + ws.OP_BINARY, + b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', + rsv1=True, + rsv3=True, + ) - # 4_2_1 + check_close(sock, 1002) - _, sock, _ = self.ws.upgrade() + # 3_6 - self.ws.frame_write(sock, 0x0B, '') - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 4_2_2 + ws.frame_write(sock, ws.OP_PING, payload, rsv2=True, rsv3=True) - _, sock, _ = self.ws.upgrade() + check_close(sock, 1002) - self.ws.frame_write(sock, 0x0C, 'reserved opcode payload') - self.check_close(sock, 1002) + # 3_7 - # 4_2_3 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_CLOSE, payload, rsv1=True, rsv2=True, rsv3=True) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) - self.ws.frame_write(sock, 0x0D, '') - self.ws.frame_write(sock, self.ws.OP_PING, '') +def test_asgi_websockets_4_1_1__4_2_5(): + client.load('websockets/mirror') - self.check_close(sock, 1002) + payload = 'Hello, world!' - # 4_2_4 + # 4_1_1 - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + ws.frame_write(sock, 0x03, '') + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + # 4_1_2 - self.ws.frame_write(sock, 0x0E, payload) - self.ws.frame_write(sock, self.ws.OP_PING, '') + _, sock, _ = ws.upgrade() - self.check_close(sock, 1002) + ws.frame_write(sock, 0x04, 'reserved opcode payload') + check_close(sock, 1002) - # 4_2_5 + # 4_1_3 - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, chopsize=1) + ws.frame_write(sock, ws.OP_TEXT, payload) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - self.ws.frame_write(sock, 0x0F, payload, chopsize=1) - self.ws.frame_write(sock, self.ws.OP_PING, '') + ws.frame_write(sock, 0x05, '') + ws.frame_write(sock, ws.OP_PING, '') - self.check_close(sock, 1002) + check_close(sock, 1002) - def test_asgi_websockets_5_1__5_20(self): - self.load('websockets/mirror') + # 4_1_4 - # 5_1 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_PING, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) - self.check_close(sock, 1002) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - # 5_2 + ws.frame_write(sock, 0x06, payload) + ws.frame_write(sock, ws.OP_PING, '') - _, sock, _ = self.ws.upgrade() + check_close(sock, 1002) - self.ws.frame_write(sock, self.ws.OP_PONG, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) - self.check_close(sock, 1002) + # 4_1_5 - # 5_3 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, payload, chopsize=1) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + ws.frame_write(sock, 0x07, payload, chopsize=1) + ws.frame_write(sock, ws.OP_PING, '') - # 5_4 + check_close(sock, 1002) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - assert self.recvall(sock, read_timeout=0.1) == b'', '5_4' - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) + # 4_2_1 - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + _, sock, _ = ws.upgrade() - # 5_5 + ws.frame_write(sock, 0x0B, '') + check_close(sock, 1002) - self.ws.frame_write( - sock, self.ws.OP_TEXT, 'fragment1', fin=False, chopsize=1 - ) - self.ws.frame_write( - sock, self.ws.OP_CONT, 'fragment2', fin=True, chopsize=1 - ) + # 4_2_2 - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + _, sock, _ = ws.upgrade() - # 5_6 + ws.frame_write(sock, 0x0C, 'reserved opcode payload') + check_close(sock, 1002) - ping_payload = 'ping payload' + # 4_2_3 - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, ping_payload) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) + _, sock, _ = ws.upgrade() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, ping_payload) + ws.frame_write(sock, ws.OP_TEXT, payload) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - # 5_7 + ws.frame_write(sock, 0x0D, '') + ws.frame_write(sock, ws.OP_PING, '') - ping_payload = 'ping payload' + check_close(sock, 1002) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - assert self.recvall(sock, read_timeout=0.1) == b'', '5_7' + # 4_2_4 - self.ws.frame_write(sock, self.ws.OP_PING, ping_payload) + _, sock, _ = ws.upgrade() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, ping_payload) + ws.frame_write(sock, ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + ws.frame_write(sock, 0x0E, payload) + ws.frame_write(sock, ws.OP_PING, '') - # 5_8 + check_close(sock, 1002) - ping_payload = 'ping payload' + # 4_2_5 - self.ws.frame_write( - sock, self.ws.OP_TEXT, 'fragment1', fin=False, chopsize=1 - ) - self.ws.frame_write(sock, self.ws.OP_PING, ping_payload, chopsize=1) - self.ws.frame_write( - sock, self.ws.OP_CONT, 'fragment2', fin=True, chopsize=1 - ) + _, sock, _ = ws.upgrade() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, ping_payload) + ws.frame_write(sock, ws.OP_TEXT, payload, chopsize=1) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - # 5_9 + ws.frame_write(sock, 0x0F, payload, chopsize=1) + ws.frame_write(sock, ws.OP_PING, '') - self.ws.frame_write( - sock, self.ws.OP_CONT, 'non-continuation payload', fin=True - ) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'Hello, world!', fin=True) - self.check_close(sock, 1002) + check_close(sock, 1002) - # 5_10 - _, sock, _ = self.ws.upgrade() +def test_asgi_websockets_5_1__5_20(): + client.load('websockets/mirror') - self.ws.frame_write( - sock, self.ws.OP_CONT, 'non-continuation payload', fin=True - ) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'Hello, world!', fin=True) - self.check_close(sock, 1002) + # 5_1 - # 5_11 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_PING, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) + check_close(sock, 1002) - self.ws.frame_write( - sock, - self.ws.OP_CONT, - 'non-continuation payload', - fin=True, - chopsize=1, - ) - self.ws.frame_write( - sock, self.ws.OP_TEXT, 'Hello, world!', fin=True, chopsize=1 - ) - self.check_close(sock, 1002) + # 5_2 - # 5_12 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_PONG, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) + check_close(sock, 1002) - self.ws.frame_write( - sock, self.ws.OP_CONT, 'non-continuation payload', fin=False - ) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'Hello, world!', fin=True) - self.check_close(sock, 1002) + # 5_3 - # 5_13 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) - self.ws.frame_write( - sock, self.ws.OP_CONT, 'non-continuation payload', fin=False - ) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'Hello, world!', fin=True) - self.check_close(sock, 1002) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - # 5_14 + # 5_4 - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + assert client.recvall(sock, read_timeout=0.1) == b'', '5_4' + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) - self.ws.frame_write( - sock, - self.ws.OP_CONT, - 'non-continuation payload', - fin=False, - chopsize=1, - ) - self.ws.frame_write( - sock, self.ws.OP_TEXT, 'Hello, world!', fin=True, chopsize=1 - ) - self.check_close(sock, 1002) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - # 5_15 + # 5_5 - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False, chopsize=1) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True, chopsize=1) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment4', fin=True) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - frame = self.ws.frame_read(sock) + # 5_6 - if frame['opcode'] == self.ws.OP_TEXT: - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') - frame = None + ping_payload = 'ping payload' - self.check_close(sock, 1002, frame=frame) + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_PING, ping_payload) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) - # 5_16 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, ping_payload) - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - for _ in range(0, 2): - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=True) - self.check_close(sock, 1002) + # 5_7 - # 5_17 + ping_payload = 'ping payload' - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + assert client.recvall(sock, read_timeout=0.1) == b'', '5_7' - for _ in range(0, 2): - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment1', fin=True) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=True) - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_PING, ping_payload) - # 5_18 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, ping_payload) - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2') - self.check_close(sock, 1002) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - # 5_19 + # 5_8 - _, sock, _ = self.ws.upgrade() + ping_payload = 'ping payload' - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, 'pongme 1!') + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False, chopsize=1) + ws.frame_write(sock, ws.OP_PING, ping_payload, chopsize=1) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True, chopsize=1) - time.sleep(1) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, ping_payload) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment4', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, 'pongme 2!') - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment5') + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, 'pongme 1!') + # 5_9 - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, 'pongme 2!') + ws.frame_write(sock, ws.OP_CONT, 'non-continuation payload', fin=True) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True) + check_close(sock, 1002) - self.check_frame( - self.ws.frame_read(sock), - True, - self.ws.OP_TEXT, - 'fragment1fragment2fragment3fragment4fragment5', - ) + # 5_10 - # 5_20 + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, 'pongme 1!') + ws.frame_write(sock, ws.OP_CONT, 'non-continuation payload', fin=True) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True) + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, 'pongme 1!') + # 5_11 - time.sleep(1) + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment4', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, 'pongme 2!') + ws.frame_write( + sock, + ws.OP_CONT, + 'non-continuation payload', + fin=True, + chopsize=1, + ) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True, chopsize=1) + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, 'pongme 2!') + # 5_12 - assert self.recvall(sock, read_timeout=0.1) == b'', '5_20' - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment5') + _, sock, _ = ws.upgrade() - self.check_frame( - self.ws.frame_read(sock), - True, - self.ws.OP_TEXT, - 'fragment1fragment2fragment3fragment4fragment5', - ) + ws.frame_write(sock, ws.OP_CONT, 'non-continuation payload', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True) + check_close(sock, 1002) - self.close_connection(sock) + # 5_13 - def test_asgi_websockets_6_1_1__6_4_4(self): - self.load('websockets/mirror') + _, sock, _ = ws.upgrade() - # 6_1_1 + ws.frame_write(sock, ws.OP_CONT, 'non-continuation payload', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True) + check_close(sock, 1002) - _, sock, _ = self.ws.upgrade() + # 5_14 - self.ws.frame_write(sock, self.ws.OP_TEXT, '') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, '') + _, sock, _ = ws.upgrade() - # 6_1_2 + ws.frame_write( + sock, + ws.OP_CONT, + 'non-continuation payload', + fin=False, + chopsize=1, + ) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True, chopsize=1) + check_close(sock, 1002) - self.ws.frame_write(sock, self.ws.OP_TEXT, '', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, '', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, '') + # 5_15 - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, '') + _, sock, _ = ws.upgrade() - # 6_1_3 + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'fragment4', fin=True) - payload = 'middle frame payload' + frame = ws.frame_read(sock) - self.ws.frame_write(sock, self.ws.OP_TEXT, '', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, payload, fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, '') + if frame['opcode'] == ws.OP_TEXT: + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') + frame = None - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + check_close(sock, 1002, frame=frame) - # 6_2_1 + # 5_16 - payload = 'Hello-µ@ßöäüàá-UTF-8!!' + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + for _ in range(0, 2): + ws.frame_write(sock, ws.OP_CONT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'fragment2', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=True) + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + # 5_17 - # 6_2_2 + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, payload[:12], fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, payload[12:]) + for _ in range(0, 2): + ws.frame_write(sock, ws.OP_CONT, 'fragment1', fin=True) + ws.frame_write(sock, ws.OP_TEXT, 'fragment2', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=True) + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + # 5_18 - # 6_2_3 + _, sock, _ = ws.upgrade() - self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1) + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'fragment2') + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + # 5_19 - # 6_2_4 + _, sock, _ = ws.upgrade() - payload = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5' + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=False) + ws.frame_write(sock, ws.OP_PING, 'pongme 1!') - self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1) + time.sleep(1) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment4', fin=False) + ws.frame_write(sock, ws.OP_PING, 'pongme 2!') + ws.frame_write(sock, ws.OP_CONT, 'fragment5') - self.close_connection(sock) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, 'pongme 1!') - # Unit does not support UTF-8 validation - # - # # 6_3_1 FAIL - # - # payload_1 = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5' - # payload_2 = '\xed\xa0\x80' - # payload_3 = '\x65\x64\x69\x74\x65\x64' - # - # payload = payload_1 + payload_2 + payload_3 - # - # self.ws.message(sock, self.ws.OP_TEXT, payload) - # self.check_close(sock, 1007) - # - # # 6_3_2 FAIL - # - # _, sock, _ = self.ws.upgrade() - # - # self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1) - # self.check_close(sock, 1007) - # - # # 6_4_1 ... 6_4_4 FAIL + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, 'pongme 2!') - def test_asgi_websockets_7_1_1__7_5_1(self): - self.load('websockets/mirror') + check_frame( + ws.frame_read(sock), + True, + ws.OP_TEXT, + 'fragment1fragment2fragment3fragment4fragment5', + ) - # 7_1_1 + # 5_20 - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=False) + ws.frame_write(sock, ws.OP_PING, 'pongme 1!') - payload = "Hello World!" + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, 'pongme 1!') - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + time.sleep(1) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment4', fin=False) + ws.frame_write(sock, ws.OP_PING, 'pongme 2!') - self.close_connection(sock) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, 'pongme 2!') - # 7_1_2 + assert client.recvall(sock, read_timeout=0.1) == b'', '5_20' + ws.frame_write(sock, ws.OP_CONT, 'fragment5') - _, sock, _ = self.ws.upgrade() + check_frame( + ws.frame_read(sock), + True, + ws.OP_TEXT, + 'fragment1fragment2fragment3fragment4fragment5', + ) - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) + close_connection(sock) - self.check_close(sock) - # 7_1_3 +def test_asgi_websockets_6_1_1__6_4_4(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + # 6_1_1 - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.check_close(sock, no_close=True) + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_PING, '') - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' + ws.frame_write(sock, ws.OP_TEXT, '') + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, '') - sock.close() + # 6_1_2 - # 7_1_4 + ws.frame_write(sock, ws.OP_TEXT, '', fin=False) + ws.frame_write(sock, ws.OP_CONT, '', fin=False) + ws.frame_write(sock, ws.OP_CONT, '') - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, '') - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.check_close(sock, no_close=True) + # 6_1_3 - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' + payload = 'middle frame payload' - sock.close() + ws.frame_write(sock, ws.OP_TEXT, '', fin=False) + ws.frame_write(sock, ws.OP_CONT, payload, fin=False) + ws.frame_write(sock, ws.OP_CONT, '') - # 7_1_5 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + # 6_2_1 - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.check_close(sock, no_close=True) + payload = 'Hello-µ@ßöäüàá-UTF-8!!' - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2') - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' + ws.frame_write(sock, ws.OP_TEXT, payload) - sock.close() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - # 7_1_6 + # 6_2_2 - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, payload[:12], fin=False) + ws.frame_write(sock, ws.OP_CONT, payload[12:]) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'BAsd7&jh23' * 26 * 2**10) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - self.recvall(sock, read_timeout=1) + # 6_2_3 - self.ws.frame_write(sock, self.ws.OP_PING, '') - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' + ws.message(sock, ws.OP_TEXT, payload, fragmention_size=1) - sock.close() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) + + # 6_2_4 + + payload = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5' + + ws.message(sock, ws.OP_TEXT, payload, fragmention_size=1) + + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) + + close_connection(sock) + + +# Unit does not support UTF-8 validation +# +# # 6_3_1 FAIL +# +# payload_1 = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5' +# payload_2 = '\xed\xa0\x80' +# payload_3 = '\x65\x64\x69\x74\x65\x64' +# +# payload = payload_1 + payload_2 + payload_3 +# +# ws.message(sock, ws.OP_TEXT, payload) +# check_close(sock, 1007) +# +# # 6_3_2 FAIL +# +# _, sock, _ = ws.upgrade() +# +# ws.message(sock, ws.OP_TEXT, payload, fragmention_size=1) +# check_close(sock, 1007) +# +# # 6_4_1 ... 6_4_4 FAIL + + +def test_asgi_websockets_7_1_1__7_5_1(): + client.load('websockets/mirror') + + # 7_1_1 + + _, sock, _ = ws.upgrade() + + payload = "Hello World!" + + ws.frame_write(sock, ws.OP_TEXT, payload) + + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) + + close_connection(sock) + + # 7_1_2 + + _, sock, _ = ws.upgrade() + + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) - # 7_3_1 + check_close(sock) - _, sock, _ = self.ws.upgrade() + # 7_1_3 - self.ws.frame_write(sock, self.ws.OP_CLOSE, '') - self.check_close(sock) + _, sock, _ = ws.upgrade() - # 7_3_2 + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + check_close(sock, no_close=True) - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_PING, '') + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - self.ws.frame_write(sock, self.ws.OP_CLOSE, 'a') - self.check_close(sock, 1002) + sock.close() - # 7_3_3 + # 7_1_4 - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.check_close(sock) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + check_close(sock, no_close=True) - # 7_3_4 + ws.frame_write(sock, ws.OP_TEXT, payload) + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - _, sock, _ = self.ws.upgrade() + sock.close() - payload = self.ws.serialize_close(reason='Hello World!') + # 7_1_5 - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock) + _, sock, _ = ws.upgrade() - # 7_3_5 + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + check_close(sock, no_close=True) - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_CONT, 'fragment2') + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - payload = self.ws.serialize_close(reason='*' * 123) + sock.close() - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock) + # 7_1_6 - # 7_3_6 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, 'BAsd7&jh23' * 26 * 2**10) + ws.frame_write(sock, ws.OP_TEXT, payload) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) - payload = self.ws.serialize_close(reason='*' * 124) + client.recvall(sock, read_timeout=1) - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_PING, '') + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - # # 7_5_1 FAIL Unit does not support UTF-8 validation - # - # _, sock, _ = self.ws.upgrade() - # - # payload = self.ws.serialize_close(reason = '\xce\xba\xe1\xbd\xb9\xcf' \ - # '\x83\xce\xbc\xce\xb5\xed\xa0\x80\x65\x64\x69\x74\x65\x64') - # - # self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - # self.check_close(sock, 1007) + sock.close() - def test_asgi_websockets_7_7_X__7_9_X(self): - self.load('websockets/mirror') + # 7_3_1 - valid_codes = [ - 1000, - 1001, - 1002, - 1003, - 1007, - 1008, - 1009, - 1010, - 1011, - 3000, - 3999, - 4000, - 4999, - ] + _, sock, _ = ws.upgrade() - invalid_codes = [0, 999, 1004, 1005, 1006, 1016, 1100, 2000, 2999] + ws.frame_write(sock, ws.OP_CLOSE, '') + check_close(sock) - for code in valid_codes: - _, sock, _ = self.ws.upgrade() + # 7_3_2 - payload = self.ws.serialize_close(code=code) + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock) + ws.frame_write(sock, ws.OP_CLOSE, 'a') + check_close(sock, 1002) - for code in invalid_codes: - _, sock, _ = self.ws.upgrade() + # 7_3_3 - payload = self.ws.serialize_close(code=code) + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + check_close(sock) - def test_asgi_websockets_7_13_1__7_13_2(self): - self.load('websockets/mirror') + # 7_3_4 - # 7_13_1 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + payload = ws.serialize_close(reason='Hello World!') - payload = self.ws.serialize_close(code=5000) + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock) - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock, 1002) + # 7_3_5 - # 7_13_2 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + payload = ws.serialize_close(reason='*' * 123) - payload = struct.pack('!I', 65536) + ''.encode('utf-8') + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock) - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock, 1002) + # 7_3_6 - def test_asgi_websockets_9_1_1__9_6_6(self, is_unsafe, system): - if not is_unsafe: - pytest.skip('unsafe, long run') + _, sock, _ = ws.upgrade() - self.load('websockets/mirror') + payload = ws.serialize_close(reason='*' * 124) - assert 'success' in self.conf( - { - 'http': { - 'websocket': { - 'max_frame_size': 33554432, - 'keepalive_interval': 0, - } + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock, 1002) + + +# # 7_5_1 FAIL Unit does not support UTF-8 validation +# +# _, sock, _ = ws.upgrade() +# +# payload = ws.serialize_close(reason = '\xce\xba\xe1\xbd\xb9\xcf' \ +# '\x83\xce\xbc\xce\xb5\xed\xa0\x80\x65\x64\x69\x74\x65\x64') +# +# ws.frame_write(sock, ws.OP_CLOSE, payload) +# check_close(sock, 1007) + + +def test_asgi_websockets_7_7_X__7_9_X(): + client.load('websockets/mirror') + + valid_codes = [ + 1000, + 1001, + 1002, + 1003, + 1007, + 1008, + 1009, + 1010, + 1011, + 3000, + 3999, + 4000, + 4999, + ] + + invalid_codes = [0, 999, 1004, 1005, 1006, 1016, 1100, 2000, 2999] + + for code in valid_codes: + _, sock, _ = ws.upgrade() + + payload = ws.serialize_close(code=code) + + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock) + + for code in invalid_codes: + _, sock, _ = ws.upgrade() + + payload = ws.serialize_close(code=code) + + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock, 1002) + + +def test_asgi_websockets_7_13_1__7_13_2(): + client.load('websockets/mirror') + + # 7_13_1 + + _, sock, _ = ws.upgrade() + + payload = ws.serialize_close(code=5000) + + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock, 1002) + + # 7_13_2 + + _, sock, _ = ws.upgrade() + + payload = struct.pack('!I', 65536) + ''.encode('utf-8') + + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock, 1002) + + +def test_asgi_websockets_9_1_1__9_6_6(is_unsafe, system): + if not is_unsafe: + pytest.skip('unsafe, long run') + + client.load('websockets/mirror') + + assert 'success' in client.conf( + { + 'http': { + 'websocket': { + 'max_frame_size': 33554432, + 'keepalive_interval': 0, } - }, - 'settings', - ), 'increase max_frame_size and keepalive_interval' - - _, sock, _ = self.ws.upgrade() - - op_text = self.ws.OP_TEXT - op_binary = self.ws.OP_BINARY - - def check_payload(opcode, length, chopsize=None): - if opcode == self.ws.OP_TEXT: - payload = '*' * length - else: - payload = b'*' * length + } + }, + 'settings', + ), 'increase max_frame_size and keepalive_interval' - self.ws.frame_write(sock, opcode, payload, chopsize=chopsize) - frame = self.ws.frame_read(sock, read_timeout=5) - self.check_frame(frame, True, opcode, payload) + _, sock, _ = ws.upgrade() - def check_message(opcode, f_size): - if opcode == self.ws.OP_TEXT: - payload = '*' * 4 * 2**20 - else: - payload = b'*' * 4 * 2**20 + op_text = ws.OP_TEXT + op_binary = ws.OP_BINARY - self.ws.message(sock, opcode, payload, fragmention_size=f_size) - frame = self.ws.frame_read(sock, read_timeout=5) - self.check_frame(frame, True, opcode, payload) + def check_payload(opcode, length, chopsize=None): + if opcode == ws.OP_TEXT: + payload = '*' * length + else: + payload = b'*' * length - check_payload(op_text, 64 * 2**10) # 9_1_1 - check_payload(op_text, 256 * 2**10) # 9_1_2 - check_payload(op_text, 2**20) # 9_1_3 - check_payload(op_text, 4 * 2**20) # 9_1_4 - check_payload(op_text, 8 * 2**20) # 9_1_5 - check_payload(op_text, 16 * 2**20) # 9_1_6 + ws.frame_write(sock, opcode, payload, chopsize=chopsize) + frame = ws.frame_read(sock, read_timeout=5) + check_frame(frame, True, opcode, payload) - check_payload(op_binary, 64 * 2**10) # 9_2_1 - check_payload(op_binary, 256 * 2**10) # 9_2_2 - check_payload(op_binary, 2**20) # 9_2_3 - check_payload(op_binary, 4 * 2**20) # 9_2_4 - check_payload(op_binary, 8 * 2**20) # 9_2_5 - check_payload(op_binary, 16 * 2**20) # 9_2_6 + def check_message(opcode, f_size): + if opcode == ws.OP_TEXT: + payload = '*' * 4 * 2**20 + else: + payload = b'*' * 4 * 2**20 - if system not in ['Darwin', 'FreeBSD']: - check_message(op_text, 64) # 9_3_1 - check_message(op_text, 256) # 9_3_2 - check_message(op_text, 2**10) # 9_3_3 - check_message(op_text, 4 * 2**10) # 9_3_4 - check_message(op_text, 16 * 2**10) # 9_3_5 - check_message(op_text, 64 * 2**10) # 9_3_6 - check_message(op_text, 256 * 2**10) # 9_3_7 - check_message(op_text, 2**20) # 9_3_8 - check_message(op_text, 4 * 2**20) # 9_3_9 + ws.message(sock, opcode, payload, fragmention_size=f_size) + frame = ws.frame_read(sock, read_timeout=5) + check_frame(frame, True, opcode, payload) - check_message(op_binary, 64) # 9_4_1 - check_message(op_binary, 256) # 9_4_2 - check_message(op_binary, 2**10) # 9_4_3 - check_message(op_binary, 4 * 2**10) # 9_4_4 - check_message(op_binary, 16 * 2**10) # 9_4_5 - check_message(op_binary, 64 * 2**10) # 9_4_6 - check_message(op_binary, 256 * 2**10) # 9_4_7 - check_message(op_binary, 2**20) # 9_4_8 - check_message(op_binary, 4 * 2**20) # 9_4_9 + check_payload(op_text, 64 * 2**10) # 9_1_1 + check_payload(op_text, 256 * 2**10) # 9_1_2 + check_payload(op_text, 2**20) # 9_1_3 + check_payload(op_text, 4 * 2**20) # 9_1_4 + check_payload(op_text, 8 * 2**20) # 9_1_5 + check_payload(op_text, 16 * 2**20) # 9_1_6 - check_payload(op_text, 2**20, chopsize=64) # 9_5_1 - check_payload(op_text, 2**20, chopsize=128) # 9_5_2 - check_payload(op_text, 2**20, chopsize=256) # 9_5_3 - check_payload(op_text, 2**20, chopsize=512) # 9_5_4 - check_payload(op_text, 2**20, chopsize=1024) # 9_5_5 - check_payload(op_text, 2**20, chopsize=2048) # 9_5_6 + check_payload(op_binary, 64 * 2**10) # 9_2_1 + check_payload(op_binary, 256 * 2**10) # 9_2_2 + check_payload(op_binary, 2**20) # 9_2_3 + check_payload(op_binary, 4 * 2**20) # 9_2_4 + check_payload(op_binary, 8 * 2**20) # 9_2_5 + check_payload(op_binary, 16 * 2**20) # 9_2_6 - check_payload(op_binary, 2**20, chopsize=64) # 9_6_1 - check_payload(op_binary, 2**20, chopsize=128) # 9_6_2 - check_payload(op_binary, 2**20, chopsize=256) # 9_6_3 - check_payload(op_binary, 2**20, chopsize=512) # 9_6_4 - check_payload(op_binary, 2**20, chopsize=1024) # 9_6_5 - check_payload(op_binary, 2**20, chopsize=2048) # 9_6_6 + if system not in ['Darwin', 'FreeBSD']: + check_message(op_text, 64) # 9_3_1 + check_message(op_text, 256) # 9_3_2 + check_message(op_text, 2**10) # 9_3_3 + check_message(op_text, 4 * 2**10) # 9_3_4 + check_message(op_text, 16 * 2**10) # 9_3_5 + check_message(op_text, 64 * 2**10) # 9_3_6 + check_message(op_text, 256 * 2**10) # 9_3_7 + check_message(op_text, 2**20) # 9_3_8 + check_message(op_text, 4 * 2**20) # 9_3_9 - self.close_connection(sock) + check_message(op_binary, 64) # 9_4_1 + check_message(op_binary, 256) # 9_4_2 + check_message(op_binary, 2**10) # 9_4_3 + check_message(op_binary, 4 * 2**10) # 9_4_4 + check_message(op_binary, 16 * 2**10) # 9_4_5 + check_message(op_binary, 64 * 2**10) # 9_4_6 + check_message(op_binary, 256 * 2**10) # 9_4_7 + check_message(op_binary, 2**20) # 9_4_8 + check_message(op_binary, 4 * 2**20) # 9_4_9 - def test_asgi_websockets_10_1_1(self): - self.load('websockets/mirror') + check_payload(op_text, 2**20, chopsize=64) # 9_5_1 + check_payload(op_text, 2**20, chopsize=128) # 9_5_2 + check_payload(op_text, 2**20, chopsize=256) # 9_5_3 + check_payload(op_text, 2**20, chopsize=512) # 9_5_4 + check_payload(op_text, 2**20, chopsize=1024) # 9_5_5 + check_payload(op_text, 2**20, chopsize=2048) # 9_5_6 - _, sock, _ = self.ws.upgrade() + check_payload(op_binary, 2**20, chopsize=64) # 9_6_1 + check_payload(op_binary, 2**20, chopsize=128) # 9_6_2 + check_payload(op_binary, 2**20, chopsize=256) # 9_6_3 + check_payload(op_binary, 2**20, chopsize=512) # 9_6_4 + check_payload(op_binary, 2**20, chopsize=1024) # 9_6_5 + check_payload(op_binary, 2**20, chopsize=2048) # 9_6_6 - payload = '*' * 65536 + close_connection(sock) - self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1300) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) +def test_asgi_websockets_10_1_1(): + client.load('websockets/mirror') - self.close_connection(sock) + _, sock, _ = ws.upgrade() - # settings + payload = '*' * 65536 - def test_asgi_websockets_max_frame_size(self): - self.load('websockets/mirror') + ws.message(sock, ws.OP_TEXT, payload, fragmention_size=1300) - assert 'success' in self.conf( - {'http': {'websocket': {'max_frame_size': 100}}}, 'settings' - ), 'configure max_frame_size' + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + close_connection(sock) - payload = '*' * 94 - opcode = self.ws.OP_TEXT - self.ws.frame_write(sock, opcode, payload) # frame length is 100 +# settings - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, opcode, payload) - payload = '*' * 95 +def test_asgi_websockets_max_frame_size(): + client.load('websockets/mirror') - self.ws.frame_write(sock, opcode, payload) # frame length is 101 - self.check_close(sock, 1009) # 1009 - CLOSE_TOO_LARGE + assert 'success' in client.conf( + {'http': {'websocket': {'max_frame_size': 100}}}, 'settings' + ), 'configure max_frame_size' - def test_asgi_websockets_read_timeout(self): - self.load('websockets/mirror') + _, sock, _ = ws.upgrade() - assert 'success' in self.conf( - {'http': {'websocket': {'read_timeout': 5}}}, 'settings' - ), 'configure read_timeout' + payload = '*' * 94 + opcode = ws.OP_TEXT - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, opcode, payload) # frame length is 100 - frame = self.ws.frame_to_send(self.ws.OP_TEXT, 'blah') - sock.sendall(frame[:2]) + frame = ws.frame_read(sock) + check_frame(frame, True, opcode, payload) - time.sleep(2) + payload = '*' * 95 - self.check_close(sock, 1001) # 1001 - CLOSE_GOING_AWAY + ws.frame_write(sock, opcode, payload) # frame length is 101 + check_close(sock, 1009) # 1009 - CLOSE_TOO_LARGE - def test_asgi_websockets_keepalive_interval(self): - self.load('websockets/mirror') - assert 'success' in self.conf( - {'http': {'websocket': {'keepalive_interval': 5}}}, 'settings' - ), 'configure keepalive_interval' +def test_asgi_websockets_read_timeout(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + assert 'success' in client.conf( + {'http': {'websocket': {'read_timeout': 5}}}, 'settings' + ), 'configure read_timeout' - frame = self.ws.frame_to_send(self.ws.OP_TEXT, 'blah') - sock.sendall(frame[:2]) + _, sock, _ = ws.upgrade() - time.sleep(2) + frame = ws.frame_to_send(ws.OP_TEXT, 'blah') + sock.sendall(frame[:2]) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PING, '') # PING frame + time.sleep(2) - sock.close() + check_close(sock, 1001) # 1001 - CLOSE_GOING_AWAY - def test_asgi_websockets_client_locks_app(self): - self.load('websockets/mirror') - message = 'blah' +def test_asgi_websockets_keepalive_interval(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + assert 'success' in client.conf( + {'http': {'websocket': {'keepalive_interval': 5}}}, 'settings' + ), 'configure keepalive_interval' - assert 'success' in self.conf({}), 'remove app' + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, message) + frame = ws.frame_to_send(ws.OP_TEXT, 'blah') + sock.sendall(frame[:2]) - frame = self.ws.frame_read(sock) + time.sleep(2) - assert message == frame['data'].decode('utf-8'), 'client' + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PING, '') # PING frame - sock.close() + sock.close() + + +def test_asgi_websockets_client_locks_app(): + client.load('websockets/mirror') + + message = 'blah' + + _, sock, _ = ws.upgrade() + + assert 'success' in client.conf({}), 'remove app' + + ws.frame_write(sock, ws.OP_TEXT, message) + + frame = ws.frame_read(sock) + + assert message == frame['data'].decode('utf-8'), 'client' + + sock.close() |