diff options
author | Konstantin Pavlov <thresh@nginx.com> | 2023-08-31 09:41:46 -0700 |
---|---|---|
committer | Konstantin Pavlov <thresh@nginx.com> | 2023-08-31 09:41:46 -0700 |
commit | c45c8919c7232eb20023484f6d1fc9f1f50395d8 (patch) | |
tree | cc12eb307c1611494948645e4b487fa06495c3d2 /test/test_asgi_websockets.py | |
parent | 88c90e1c351ab8c5bd487a5cd4b735014b08e271 (diff) | |
parent | 9b22b6957bc87b3df002d0bc691fdae6a20abdac (diff) | |
download | unit-1.31.0-1.tar.gz unit-1.31.0-1.tar.bz2 |
Merged with the default branch.1.31.0-1
Diffstat (limited to 'test/test_asgi_websockets.py')
-rw-r--r-- | test/test_asgi_websockets.py | 2095 |
1 files changed, 1050 insertions, 1045 deletions
diff --git a/test/test_asgi_websockets.py b/test/test_asgi_websockets.py index b15bee43..eb7a20e7 100644 --- a/test/test_asgi_websockets.py +++ b/test/test_asgi_websockets.py @@ -3,1497 +3,1502 @@ import time import pytest from packaging import version -from unit.applications.lang.python import TestApplicationPython -from unit.applications.websockets import TestApplicationWebsocket -from unit.option import option +from unit.applications.lang.python import ApplicationPython +from unit.applications.websockets import ApplicationWebsocket +prerequisites = { + 'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')} +} -class TestASGIWebsockets(TestApplicationPython): - prerequisites = { - 'modules': { - 'python': lambda v: version.parse(v) >= version.parse('3.5') - } - } - load_module = 'asgi' +client = ApplicationPython(load_module='asgi') +ws = ApplicationWebsocket() - ws = TestApplicationWebsocket() - @pytest.fixture(autouse=True) - def setup_method_fixture(self, request, skip_alert): - assert 'success' in self.conf( - {'http': {'websocket': {'keepalive_interval': 0}}}, 'settings' - ), 'clear keepalive_interval' +@pytest.fixture(autouse=True) +def setup_method_fixture(skip_alert): + assert 'success' in client.conf( + {'http': {'websocket': {'keepalive_interval': 0}}}, 'settings' + ), 'clear keepalive_interval' - skip_alert(r'socket close\(\d+\) failed') + skip_alert(r'socket close\(\d+\) failed') - def close_connection(self, sock): - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) +def close_connection(sock): + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - self.check_close(sock) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) - def check_close(self, sock, code=1000, no_close=False, frame=None): - if frame == None: - frame = self.ws.frame_read(sock) + check_close(sock) - assert frame['fin'] == True, 'close fin' - assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode' - assert frame['code'] == code, 'close code' - if not no_close: - sock.close() +def check_close(sock, code=1000, no_close=False, frame=None): + if frame is None: + frame = ws.frame_read(sock) - def check_frame(self, frame, fin, opcode, payload, decode=True): - if opcode == self.ws.OP_BINARY or not decode: - data = frame['data'] - else: - data = frame['data'].decode('utf-8') + assert frame['fin'], 'close fin' + assert frame['opcode'] == ws.OP_CLOSE, 'close opcode' + assert frame['code'] == code, 'close code' - assert frame['fin'] == fin, 'fin' - assert frame['opcode'] == opcode, 'opcode' - assert data == payload, 'payload' + if not no_close: + sock.close() - def test_asgi_websockets_handshake(self): - self.load('websockets/mirror') - resp, sock, key = self.ws.upgrade() - sock.close() +def check_frame(frame, fin, opcode, payload, decode=True): + if opcode == ws.OP_BINARY or not decode: + data = frame['data'] + else: + data = frame['data'].decode('utf-8') - assert resp['status'] == 101, 'status' - assert resp['headers']['Upgrade'] == 'websocket', 'upgrade' - assert resp['headers']['Connection'] == 'Upgrade', 'connection' - assert resp['headers']['Sec-WebSocket-Accept'] == self.ws.accept( - key - ), 'key' + assert frame['fin'] == fin, 'fin' + assert frame['opcode'] == opcode, 'opcode' + assert data == payload, 'payload' - # remove "mirror" application - self.load('websockets/subprotocol') - def test_asgi_websockets_subprotocol(self): - self.load('websockets/subprotocol') +def test_asgi_websockets_handshake(): + client.load('websockets/mirror') - resp, sock, key = self.ws.upgrade() - sock.close() + resp, sock, key = ws.upgrade() + sock.close() - assert resp['status'] == 101, 'status' - assert ( - resp['headers']['x-subprotocols'] == "('chat', 'phone', 'video')" - ), 'subprotocols' - assert resp['headers']['sec-websocket-protocol'] == 'chat', 'key' + assert resp['status'] == 101, 'status' + assert resp['headers']['Upgrade'] == 'websocket', 'upgrade' + assert resp['headers']['Connection'] == 'Upgrade', 'connection' + assert resp['headers']['Sec-WebSocket-Accept'] == ws.accept(key), 'key' - def test_asgi_websockets_mirror(self): - self.load('websockets/mirror') + # remove "mirror" application + client.load('websockets/subprotocol') - message = 'blah' - _, sock, _ = self.ws.upgrade() +def test_asgi_websockets_subprotocol(): + client.load('websockets/subprotocol') - self.ws.frame_write(sock, self.ws.OP_TEXT, message) - frame = self.ws.frame_read(sock) + resp, sock, _ = ws.upgrade() + sock.close() - assert message == frame['data'].decode('utf-8'), 'mirror' + assert resp['status'] == 101, 'status' + assert ( + resp['headers']['x-subprotocols'] == "('chat', 'phone', 'video')" + ), 'subprotocols' + assert resp['headers']['sec-websocket-protocol'] == 'chat', 'key' - self.ws.frame_write(sock, self.ws.OP_TEXT, message) - frame = self.ws.frame_read(sock) - assert message == frame['data'].decode('utf-8'), 'mirror 2' +def test_asgi_websockets_mirror(): + client.load('websockets/mirror') - sock.close() + message = 'blah' - def test_asgi_websockets_mirror_app_change(self): - self.load('websockets/mirror') + _, sock, _ = ws.upgrade() - message = 'blah' + ws.frame_write(sock, ws.OP_TEXT, message) + frame = ws.frame_read(sock) - _, sock, _ = self.ws.upgrade() + assert message == frame['data'].decode('utf-8'), 'mirror' - self.ws.frame_write(sock, self.ws.OP_TEXT, message) - frame = self.ws.frame_read(sock) + ws.frame_write(sock, ws.OP_TEXT, message) + frame = ws.frame_read(sock) - assert message == frame['data'].decode('utf-8'), 'mirror' + assert message == frame['data'].decode('utf-8'), 'mirror 2' - self.load('websockets/subprotocol') + sock.close() - self.ws.frame_write(sock, self.ws.OP_TEXT, message) - frame = self.ws.frame_read(sock) - assert message == frame['data'].decode('utf-8'), 'mirror 2' +def test_asgi_websockets_mirror_app_change(): + client.load('websockets/mirror') - sock.close() + message = 'blah' - def test_asgi_websockets_no_mask(self): - self.load('websockets/mirror') + _, sock, _ = ws.upgrade() - message = 'blah' + ws.frame_write(sock, ws.OP_TEXT, message) + frame = ws.frame_read(sock) - _, sock, _ = self.ws.upgrade() + assert message == frame['data'].decode('utf-8'), 'mirror' - self.ws.frame_write(sock, self.ws.OP_TEXT, message, mask=False) + client.load('websockets/subprotocol') - frame = self.ws.frame_read(sock) + ws.frame_write(sock, ws.OP_TEXT, message) + frame = ws.frame_read(sock) - assert frame['opcode'] == self.ws.OP_CLOSE, 'no mask opcode' - assert frame['code'] == 1002, 'no mask close code' + assert message == frame['data'].decode('utf-8'), 'mirror 2' - sock.close() + sock.close() - def test_asgi_websockets_fragmentation(self): - self.load('websockets/mirror') - message = 'blah' +def test_asgi_websockets_no_mask(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + message = 'blah' - self.ws.frame_write(sock, self.ws.OP_TEXT, message, fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, ' ', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, message) + _, sock, _ = ws.upgrade() - frame = self.ws.frame_read(sock) + ws.frame_write(sock, ws.OP_TEXT, message, mask=False) - assert f'{message} {message}' == frame['data'].decode( - 'utf-8' - ), 'mirror framing' + frame = ws.frame_read(sock) - sock.close() + assert frame['opcode'] == ws.OP_CLOSE, 'no mask opcode' + assert frame['code'] == 1002, 'no mask close code' - def test_asgi_websockets_length_long(self): - self.load('websockets/mirror') + sock.close() - _, sock, _ = self.ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write( - sock, self.ws.OP_CONT, 'fragment2', length=2**64 - 1 - ) +def test_asgi_websockets_fragmentation(): + client.load('websockets/mirror') - self.check_close(sock, 1009) # 1009 - CLOSE_TOO_LARGE + message = 'blah' - def test_asgi_websockets_frame_fragmentation_invalid(self): - self.load('websockets/mirror') + _, sock, _ = ws.upgrade() - message = 'blah' + ws.frame_write(sock, ws.OP_TEXT, message, fin=False) + ws.frame_write(sock, ws.OP_CONT, ' ', fin=False) + ws.frame_write(sock, ws.OP_CONT, message) - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) - self.ws.frame_write(sock, self.ws.OP_PING, message, fin=False) + assert f'{message} {message}' == frame['data'].decode( + 'utf-8' + ), 'mirror framing' - frame = self.ws.frame_read(sock) + sock.close() - frame.pop('data') - assert frame == { - 'fin': True, - 'rsv1': False, - 'rsv2': False, - 'rsv3': False, - 'opcode': self.ws.OP_CLOSE, - 'mask': 0, - 'code': 1002, - 'reason': 'Fragmented control frame', - }, 'close frame' - sock.close() +def test_asgi_websockets_length_long(): + client.load('websockets/mirror') - def test_asgi_websockets_large(self): - self.load('websockets/mirror') + _, sock, _ = ws.upgrade() - message = '0123456789' * 300 + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', length=2**64 - 1) - _, sock, _ = self.ws.upgrade() + check_close(sock, 1009) # 1009 - CLOSE_TOO_LARGE - self.ws.frame_write(sock, self.ws.OP_TEXT, message) - frame = self.ws.frame_read(sock) - data = frame['data'].decode('utf-8') +def test_asgi_websockets_frame_fragmentation_invalid(): + client.load('websockets/mirror') - frame = self.ws.frame_read(sock) - data += frame['data'].decode('utf-8') + message = 'blah' - assert message == data, 'large' + _, sock, _ = ws.upgrade() - sock.close() + ws.frame_write(sock, ws.OP_PING, message, fin=False) - def test_asgi_websockets_two_clients(self): - self.load('websockets/mirror') + frame = ws.frame_read(sock) - message1 = 'blah1' - message2 = 'blah2' + frame.pop('data') + assert frame == { + 'fin': True, + 'rsv1': False, + 'rsv2': False, + 'rsv3': False, + 'opcode': ws.OP_CLOSE, + 'mask': 0, + 'code': 1002, + 'reason': 'Fragmented control frame', + }, 'close frame' - _, sock1, _ = self.ws.upgrade() - _, sock2, _ = self.ws.upgrade() + sock.close() - self.ws.frame_write(sock1, self.ws.OP_TEXT, message1) - self.ws.frame_write(sock2, self.ws.OP_TEXT, message2) - frame1 = self.ws.frame_read(sock1) - frame2 = self.ws.frame_read(sock2) +def test_asgi_websockets_large(): + client.load('websockets/mirror') - assert message1 == frame1['data'].decode('utf-8'), 'client 1' - assert message2 == frame2['data'].decode('utf-8'), 'client 2' + message = '0123456789' * 300 - sock1.close() - sock2.close() + _, sock, _ = ws.upgrade() - @pytest.mark.skip('not yet') - def test_asgi_websockets_handshake_upgrade_absent( - self, - ): # FAIL https://tools.ietf.org/html/rfc6455#section-4.2.1 - self.load('websockets/mirror') + ws.frame_write(sock, ws.OP_TEXT, message) - resp = self.get( - headers={ - 'Host': 'localhost', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) + frame = ws.frame_read(sock) + data = frame['data'].decode('utf-8') - assert resp['status'] == 400, 'upgrade absent' + frame = ws.frame_read(sock) + data += frame['data'].decode('utf-8') - def test_asgi_websockets_handshake_case_insensitive(self): - self.load('websockets/mirror') + assert message == data, 'large' - resp, sock, _ = self.ws.upgrade( - headers={ - 'Host': 'localhost', - 'Upgrade': 'WEBSOCKET', - 'Connection': 'UPGRADE', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - } - ) - sock.close() + sock.close() - assert resp['status'] == 101, 'status' - - @pytest.mark.skip('not yet') - def test_asgi_websockets_handshake_connection_absent(self): # FAIL - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) - - assert resp['status'] == 400, 'status' - - def test_asgi_websockets_handshake_version_absent(self): - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - }, - ) - - assert resp['status'] == 426, 'status' - - @pytest.mark.skip('not yet') - def test_asgi_websockets_handshake_key_invalid(self): - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': '!', - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) - - assert resp['status'] == 400, 'key length' - - key = self.ws.key() - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': [key, key], - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) - - assert ( - resp['status'] == 400 - ), 'key double' # FAIL https://tools.ietf.org/html/rfc6455#section-11.3.1 - - def test_asgi_websockets_handshake_method_invalid(self): - self.load('websockets/mirror') - - resp = self.post( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - ) - - assert resp['status'] == 400, 'status' - - def test_asgi_websockets_handshake_http_10(self): - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - http_10=True, - ) - - assert resp['status'] == 400, 'status' - - def test_asgi_websockets_handshake_uri_invalid(self): - self.load('websockets/mirror') - - resp = self.get( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': self.ws.key(), - 'Sec-WebSocket-Protocol': 'chat', - 'Sec-WebSocket-Version': 13, - }, - url='!', - ) - - assert resp['status'] == 400, 'status' - - def test_asgi_websockets_protocol_absent(self): - self.load('websockets/mirror') - - key = self.ws.key() - resp, sock, _ = self.ws.upgrade( - headers={ - 'Host': 'localhost', - 'Upgrade': 'websocket', - 'Connection': 'Upgrade', - 'Sec-WebSocket-Key': key, - 'Sec-WebSocket-Version': 13, - } - ) - sock.close() - assert resp['status'] == 101, 'status' - assert resp['headers']['Upgrade'] == 'websocket', 'upgrade' - assert resp['headers']['Connection'] == 'Upgrade', 'connection' - assert resp['headers']['Sec-WebSocket-Accept'] == self.ws.accept( - key - ), 'key' +def test_asgi_websockets_two_clients(): + client.load('websockets/mirror') - # autobahn-testsuite - # - # Some following tests fail because of Unit does not support UTF-8 - # validation for websocket frames. It should be implemented - # by application, if necessary. + message1 = 'blah1' + message2 = 'blah2' - def test_asgi_websockets_1_1_1__1_1_8(self): - self.load('websockets/mirror') + _, sock1, _ = ws.upgrade() + _, sock2, _ = ws.upgrade() - opcode = self.ws.OP_TEXT + ws.frame_write(sock1, ws.OP_TEXT, message1) + ws.frame_write(sock2, ws.OP_TEXT, message2) - _, sock, _ = self.ws.upgrade() + frame1 = ws.frame_read(sock1) + frame2 = ws.frame_read(sock2) - def check_length(length, chopsize=None): - payload = '*' * length + assert message1 == frame1['data'].decode('utf-8'), 'client 1' + assert message2 == frame2['data'].decode('utf-8'), 'client 2' - self.ws.frame_write(sock, opcode, payload, chopsize=chopsize) + sock1.close() + sock2.close() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, opcode, payload) - check_length(0) # 1_1_1 - check_length(125) # 1_1_2 - check_length(126) # 1_1_3 - check_length(127) # 1_1_4 - check_length(128) # 1_1_5 - check_length(65535) # 1_1_6 - check_length(65536) # 1_1_7 - check_length(65536, chopsize=997) # 1_1_8 +# FAIL https://tools.ietf.org/html/rfc6455#section-4.2.1 +@pytest.mark.skip('not yet') +def test_asgi_websockets_handshake_upgrade_absent(): + client.load('websockets/mirror') - self.close_connection(sock) + resp = client.get( + headers={ + 'Host': 'localhost', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) - def test_asgi_websockets_1_2_1__1_2_8(self): - self.load('websockets/mirror') + assert resp['status'] == 400, 'upgrade absent' - opcode = self.ws.OP_BINARY - _, sock, _ = self.ws.upgrade() +def test_asgi_websockets_handshake_case_insensitive(): + client.load('websockets/mirror') - def check_length(length, chopsize=None): - payload = b'\xfe' * length + resp, sock, _ = ws.upgrade( + headers={ + 'Host': 'localhost', + 'Upgrade': 'WEBSOCKET', + 'Connection': 'UPGRADE', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + } + ) + sock.close() + + assert resp['status'] == 101, 'status' + + +@pytest.mark.skip('not yet') +def test_asgi_websockets_handshake_connection_absent(): # FAIL + client.load('websockets/mirror') + + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) + + assert resp['status'] == 400, 'status' + + +def test_asgi_websockets_handshake_version_absent(): + client.load('websockets/mirror') + + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + }, + ) + + assert resp['status'] == 426, 'status' + + +@pytest.mark.skip('not yet') +def test_asgi_websockets_handshake_key_invalid(): + client.load('websockets/mirror') + + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': '!', + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) + + assert resp['status'] == 400, 'key length' + + key = ws.key() + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': [key, key], + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) + + assert ( + resp['status'] == 400 + ), 'key double' # FAIL https://tools.ietf.org/html/rfc6455#section-11.3.1 + + +def test_asgi_websockets_handshake_method_invalid(): + client.load('websockets/mirror') + + resp = client.post( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + ) + + assert resp['status'] == 400, 'status' + + +def test_asgi_websockets_handshake_http_10(): + client.load('websockets/mirror') + + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + http_10=True, + ) + + assert resp['status'] == 400, 'status' + + +def test_asgi_websockets_handshake_uri_invalid(): + client.load('websockets/mirror') + + resp = client.get( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': ws.key(), + 'Sec-WebSocket-Protocol': 'chat', + 'Sec-WebSocket-Version': 13, + }, + url='!', + ) + + assert resp['status'] == 400, 'status' + + +def test_asgi_websockets_protocol_absent(): + client.load('websockets/mirror') + + key = ws.key() + resp, sock, _ = ws.upgrade( + headers={ + 'Host': 'localhost', + 'Upgrade': 'websocket', + 'Connection': 'Upgrade', + 'Sec-WebSocket-Key': key, + 'Sec-WebSocket-Version': 13, + } + ) + sock.close() - self.ws.frame_write(sock, opcode, payload, chopsize=chopsize) - frame = self.ws.frame_read(sock) + assert resp['status'] == 101, 'status' + assert resp['headers']['Upgrade'] == 'websocket', 'upgrade' + assert resp['headers']['Connection'] == 'Upgrade', 'connection' + assert resp['headers']['Sec-WebSocket-Accept'] == ws.accept(key), 'key' - self.check_frame(frame, True, opcode, payload) - check_length(0) # 1_2_1 - check_length(125) # 1_2_2 - check_length(126) # 1_2_3 - check_length(127) # 1_2_4 - check_length(128) # 1_2_5 - check_length(65535) # 1_2_6 - check_length(65536) # 1_2_7 - check_length(65536, chopsize=997) # 1_2_8 +# autobahn-testsuite +# +# Some following tests fail because of Unit does not support UTF-8 +# validation for websocket frames. It should be implemented +# by application, if necessary. - self.close_connection(sock) - def test_asgi_websockets_2_1__2_6(self): - self.load('websockets/mirror') +def test_asgi_websockets_1_1_1__1_1_8(): + client.load('websockets/mirror') - op_ping = self.ws.OP_PING - op_pong = self.ws.OP_PONG + opcode = ws.OP_TEXT - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - def check_ping(payload, chopsize=None, decode=True): - self.ws.frame_write(sock, op_ping, payload, chopsize=chopsize) - frame = self.ws.frame_read(sock) + def check_length(length, chopsize=None): + payload = '*' * length - self.check_frame(frame, True, op_pong, payload, decode=decode) + ws.frame_write(sock, opcode, payload, chopsize=chopsize) - check_ping('') # 2_1 - check_ping('Hello, world!') # 2_2 - check_ping(b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', decode=False) # 2_3 - check_ping(b'\xfe' * 125, decode=False) # 2_4 - check_ping(b'\xfe' * 125, chopsize=1, decode=False) # 2_6 + frame = ws.frame_read(sock) + check_frame(frame, True, opcode, payload) - self.close_connection(sock) + check_length(0) # 1_1_1 + check_length(125) # 1_1_2 + check_length(126) # 1_1_3 + check_length(127) # 1_1_4 + check_length(128) # 1_1_5 + check_length(65535) # 1_1_6 + check_length(65536) # 1_1_7 + check_length(65536, chopsize=997) # 1_1_8 - # 2_5 + close_connection(sock) - _, sock, _ = self.ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_PING, b'\xfe' * 126) - self.check_close(sock, 1002) +def test_asgi_websockets_1_2_1__1_2_8(): + client.load('websockets/mirror') - def test_asgi_websockets_2_7__2_9(self): - self.load('websockets/mirror') + opcode = ws.OP_BINARY - # 2_7 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + def check_length(length, chopsize=None): + payload = b'\xfe' * length - self.ws.frame_write(sock, self.ws.OP_PONG, '') - assert self.recvall(sock, read_timeout=0.1) == b'', '2_7' + ws.frame_write(sock, opcode, payload, chopsize=chopsize) + frame = ws.frame_read(sock) - # 2_8 + check_frame(frame, True, opcode, payload) - self.ws.frame_write(sock, self.ws.OP_PONG, 'unsolicited pong payload') - assert self.recvall(sock, read_timeout=0.1) == b'', '2_8' + check_length(0) # 1_2_1 + check_length(125) # 1_2_2 + check_length(126) # 1_2_3 + check_length(127) # 1_2_4 + check_length(128) # 1_2_5 + check_length(65535) # 1_2_6 + check_length(65536) # 1_2_7 + check_length(65536, chopsize=997) # 1_2_8 - # 2_9 + close_connection(sock) - payload = 'ping payload' - self.ws.frame_write(sock, self.ws.OP_PONG, 'unsolicited pong payload') - self.ws.frame_write(sock, self.ws.OP_PING, payload) +def test_asgi_websockets_2_1__2_6(): + client.load('websockets/mirror') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, payload) + op_ping = ws.OP_PING + op_pong = ws.OP_PONG - self.close_connection(sock) + _, sock, _ = ws.upgrade() - def test_asgi_websockets_2_10__2_11(self): - self.load('websockets/mirror') + def check_ping(payload, chopsize=None, decode=True): + ws.frame_write(sock, op_ping, payload, chopsize=chopsize) + frame = ws.frame_read(sock) - # 2_10 + check_frame(frame, True, op_pong, payload, decode=decode) - _, sock, _ = self.ws.upgrade() + check_ping('') # 2_1 + check_ping('Hello, world!') # 2_2 + check_ping(b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', decode=False) # 2_3 + check_ping(b'\xfe' * 125, decode=False) # 2_4 + check_ping(b'\xfe' * 125, chopsize=1, decode=False) # 2_6 - for i in range(0, 10): - self.ws.frame_write(sock, self.ws.OP_PING, f'payload-{i}') + close_connection(sock) - for i in range(0, 10): - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, f'payload-{i}') + # 2_5 - # 2_11 + _, sock, _ = ws.upgrade() - for i in range(0, 10): - opcode = self.ws.OP_PING - self.ws.frame_write(sock, opcode, f'payload-{i}', chopsize=1) + ws.frame_write(sock, ws.OP_PING, b'\xfe' * 126) + check_close(sock, 1002) - for i in range(0, 10): - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, f'payload-{i}') - self.close_connection(sock) +def test_asgi_websockets_2_7__2_9(): + client.load('websockets/mirror') - @pytest.mark.skip('not yet') - def test_asgi_websockets_3_1__3_7(self): - self.load('websockets/mirror') + # 2_7 - payload = 'Hello, world!' + _, sock, _ = ws.upgrade() - # 3_1 + ws.frame_write(sock, ws.OP_PONG, '') + assert client.recvall(sock, read_timeout=0.1) == b'', '2_7' - _, sock, _ = self.ws.upgrade() + # 2_8 - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, rsv1=True) - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_PONG, 'unsolicited pong payload') + assert client.recvall(sock, read_timeout=0.1) == b'', '2_8' - # 3_2 + # 2_9 - _, sock, _ = self.ws.upgrade() + payload = 'ping payload' - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, rsv2=True) - self.ws.frame_write(sock, self.ws.OP_PING, '') + ws.frame_write(sock, ws.OP_PONG, 'unsolicited pong payload') + ws.frame_write(sock, ws.OP_PING, payload) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, payload) - self.check_close(sock, 1002, no_close=True) + close_connection(sock) - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty 3_2' - sock.close() - # 3_3 +def test_asgi_websockets_2_10__2_11(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + # 2_10 - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + _, sock, _ = ws.upgrade() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + for i in range(0, 10): + ws.frame_write(sock, ws.OP_PING, f'payload-{i}') - self.ws.frame_write( - sock, self.ws.OP_TEXT, payload, rsv1=True, rsv2=True - ) + for i in range(0, 10): + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, f'payload-{i}') - self.check_close(sock, 1002, no_close=True) + # 2_11 - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty 3_3' - sock.close() + for i in range(0, 10): + opcode = ws.OP_PING + ws.frame_write(sock, opcode, f'payload-{i}', chopsize=1) - # 3_4 + for i in range(0, 10): + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, f'payload-{i}') - _, sock, _ = self.ws.upgrade() + close_connection(sock) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, chopsize=1) - self.ws.frame_write( - sock, self.ws.OP_TEXT, payload, rsv3=True, chopsize=1 - ) - self.ws.frame_write(sock, self.ws.OP_PING, '') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) +@pytest.mark.skip('not yet') +def test_asgi_websockets_3_1__3_7(): + client.load('websockets/mirror') - self.check_close(sock, 1002, no_close=True) + payload = 'Hello, world!' - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty 3_4' - sock.close() + # 3_1 - # 3_5 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, payload, rsv1=True) + check_close(sock, 1002) - self.ws.frame_write( - sock, - self.ws.OP_BINARY, - b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', - rsv1=True, - rsv3=True, - ) + # 3_2 - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 3_6 + ws.frame_write(sock, ws.OP_TEXT, payload) + ws.frame_write(sock, ws.OP_TEXT, payload, rsv2=True) + ws.frame_write(sock, ws.OP_PING, '') - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - self.ws.frame_write( - sock, self.ws.OP_PING, payload, rsv2=True, rsv3=True - ) + check_close(sock, 1002, no_close=True) - self.check_close(sock, 1002) + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty 3_2' + sock.close() - # 3_7 + # 3_3 - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - self.ws.frame_write( - sock, self.ws.OP_CLOSE, payload, rsv1=True, rsv2=True, rsv3=True - ) + ws.frame_write(sock, ws.OP_TEXT, payload) - self.check_close(sock, 1002) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - def test_asgi_websockets_4_1_1__4_2_5(self): - self.load('websockets/mirror') + ws.frame_write(sock, ws.OP_TEXT, payload, rsv1=True, rsv2=True) - payload = 'Hello, world!' + check_close(sock, 1002, no_close=True) - # 4_1_1 + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty 3_3' + sock.close() - _, sock, _ = self.ws.upgrade() + # 3_4 - self.ws.frame_write(sock, 0x03, '') - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 4_1_2 + ws.frame_write(sock, ws.OP_TEXT, payload, chopsize=1) + ws.frame_write(sock, ws.OP_TEXT, payload, rsv3=True, chopsize=1) + ws.frame_write(sock, ws.OP_PING, '') - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - self.ws.frame_write(sock, 0x04, 'reserved opcode payload') - self.check_close(sock, 1002) + check_close(sock, 1002, no_close=True) - # 4_1_3 + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty 3_4' + sock.close() - _, sock, _ = self.ws.upgrade() + # 3_5 - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + _, sock, _ = ws.upgrade() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + ws.frame_write( + sock, + ws.OP_BINARY, + b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', + rsv1=True, + rsv3=True, + ) - self.ws.frame_write(sock, 0x05, '') - self.ws.frame_write(sock, self.ws.OP_PING, '') + check_close(sock, 1002) - self.check_close(sock, 1002) + # 3_6 - # 4_1_4 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_PING, payload, rsv2=True, rsv3=True) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + # 3_7 - self.ws.frame_write(sock, 0x06, payload) - self.ws.frame_write(sock, self.ws.OP_PING, '') + _, sock, _ = ws.upgrade() - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_CLOSE, payload, rsv1=True, rsv2=True, rsv3=True) - # 4_1_5 + check_close(sock, 1002) - _, sock, _ = self.ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, chopsize=1) +def test_asgi_websockets_4_1_1__4_2_5(): + client.load('websockets/mirror') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + payload = 'Hello, world!' - self.ws.frame_write(sock, 0x07, payload, chopsize=1) - self.ws.frame_write(sock, self.ws.OP_PING, '') + # 4_1_1 - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 4_2_1 + ws.frame_write(sock, 0x03, '') + check_close(sock, 1002) - _, sock, _ = self.ws.upgrade() + # 4_1_2 - self.ws.frame_write(sock, 0x0B, '') - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 4_2_2 + ws.frame_write(sock, 0x04, 'reserved opcode payload') + check_close(sock, 1002) - _, sock, _ = self.ws.upgrade() + # 4_1_3 - self.ws.frame_write(sock, 0x0C, 'reserved opcode payload') - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 4_2_3 + ws.frame_write(sock, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + ws.frame_write(sock, 0x05, '') + ws.frame_write(sock, ws.OP_PING, '') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + check_close(sock, 1002) - self.ws.frame_write(sock, 0x0D, '') - self.ws.frame_write(sock, self.ws.OP_PING, '') + # 4_1_4 - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 4_2_4 + ws.frame_write(sock, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + ws.frame_write(sock, 0x06, payload) + ws.frame_write(sock, ws.OP_PING, '') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + check_close(sock, 1002) - self.ws.frame_write(sock, 0x0E, payload) - self.ws.frame_write(sock, self.ws.OP_PING, '') + # 4_1_5 - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 4_2_5 + ws.frame_write(sock, ws.OP_TEXT, payload, chopsize=1) - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload, chopsize=1) + ws.frame_write(sock, 0x07, payload, chopsize=1) + ws.frame_write(sock, ws.OP_PING, '') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + check_close(sock, 1002) - self.ws.frame_write(sock, 0x0F, payload, chopsize=1) - self.ws.frame_write(sock, self.ws.OP_PING, '') + # 4_2_1 - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - def test_asgi_websockets_5_1__5_20(self): - self.load('websockets/mirror') + ws.frame_write(sock, 0x0B, '') + check_close(sock, 1002) - # 5_1 + # 4_2_2 - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_PING, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) - self.check_close(sock, 1002) + ws.frame_write(sock, 0x0C, 'reserved opcode payload') + check_close(sock, 1002) - # 5_2 + # 4_2_3 - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_PONG, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_TEXT, payload) - # 5_3 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, 0x0D, '') + ws.frame_write(sock, ws.OP_PING, '') - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + # 4_2_4 - # 5_4 + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - assert self.recvall(sock, read_timeout=0.1) == b'', '5_4' - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) + ws.frame_write(sock, ws.OP_TEXT, payload) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - # 5_5 + ws.frame_write(sock, 0x0E, payload) + ws.frame_write(sock, ws.OP_PING, '') - self.ws.frame_write( - sock, self.ws.OP_TEXT, 'fragment1', fin=False, chopsize=1 - ) - self.ws.frame_write( - sock, self.ws.OP_CONT, 'fragment2', fin=True, chopsize=1 - ) + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + # 4_2_5 - # 5_6 + _, sock, _ = ws.upgrade() - ping_payload = 'ping payload' + ws.frame_write(sock, ws.OP_TEXT, payload, chopsize=1) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, ping_payload) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, ping_payload) + ws.frame_write(sock, 0x0F, payload, chopsize=1) + ws.frame_write(sock, ws.OP_PING, '') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + check_close(sock, 1002) - # 5_7 - ping_payload = 'ping payload' +def test_asgi_websockets_5_1__5_20(): + client.load('websockets/mirror') - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - assert self.recvall(sock, read_timeout=0.1) == b'', '5_7' + # 5_1 - self.ws.frame_write(sock, self.ws.OP_PING, ping_payload) + _, sock, _ = ws.upgrade() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, ping_payload) + ws.frame_write(sock, ws.OP_PING, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) + check_close(sock, 1002) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) + # 5_2 - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + _, sock, _ = ws.upgrade() - # 5_8 + ws.frame_write(sock, ws.OP_PONG, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) + check_close(sock, 1002) - ping_payload = 'ping payload' + # 5_3 - self.ws.frame_write( - sock, self.ws.OP_TEXT, 'fragment1', fin=False, chopsize=1 - ) - self.ws.frame_write(sock, self.ws.OP_PING, ping_payload, chopsize=1) - self.ws.frame_write( - sock, self.ws.OP_CONT, 'fragment2', fin=True, chopsize=1 - ) + _, sock, _ = ws.upgrade() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, ping_payload) + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - # 5_9 + # 5_4 - self.ws.frame_write( - sock, self.ws.OP_CONT, 'non-continuation payload', fin=True - ) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'Hello, world!', fin=True) - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + assert client.recvall(sock, read_timeout=0.1) == b'', '5_4' + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) - # 5_10 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - _, sock, _ = self.ws.upgrade() + # 5_5 - self.ws.frame_write( - sock, self.ws.OP_CONT, 'non-continuation payload', fin=True - ) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'Hello, world!', fin=True) - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False, chopsize=1) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True, chopsize=1) - # 5_11 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - _, sock, _ = self.ws.upgrade() + # 5_6 - self.ws.frame_write( - sock, - self.ws.OP_CONT, - 'non-continuation payload', - fin=True, - chopsize=1, - ) - self.ws.frame_write( - sock, self.ws.OP_TEXT, 'Hello, world!', fin=True, chopsize=1 - ) - self.check_close(sock, 1002) + ping_payload = 'ping payload' - # 5_12 + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_PING, ping_payload) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, ping_payload) - self.ws.frame_write( - sock, self.ws.OP_CONT, 'non-continuation payload', fin=False - ) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'Hello, world!', fin=True) - self.check_close(sock, 1002) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - # 5_13 + # 5_7 - _, sock, _ = self.ws.upgrade() + ping_payload = 'ping payload' - self.ws.frame_write( - sock, self.ws.OP_CONT, 'non-continuation payload', fin=False - ) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'Hello, world!', fin=True) - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + assert client.recvall(sock, read_timeout=0.1) == b'', '5_7' - # 5_14 + ws.frame_write(sock, ws.OP_PING, ping_payload) - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, ping_payload) - self.ws.frame_write( - sock, - self.ws.OP_CONT, - 'non-continuation payload', - fin=False, - chopsize=1, - ) - self.ws.frame_write( - sock, self.ws.OP_TEXT, 'Hello, world!', fin=True, chopsize=1 - ) - self.check_close(sock, 1002) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) - # 5_15 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - _, sock, _ = self.ws.upgrade() + # 5_8 - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=True) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment4', fin=True) + ping_payload = 'ping payload' - frame = self.ws.frame_read(sock) + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False, chopsize=1) + ws.frame_write(sock, ws.OP_PING, ping_payload, chopsize=1) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True, chopsize=1) - if frame['opcode'] == self.ws.OP_TEXT: - self.check_frame(frame, True, self.ws.OP_TEXT, 'fragment1fragment2') - frame = None + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, ping_payload) - self.check_close(sock, 1002, frame=frame) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') - # 5_16 + # 5_9 - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_CONT, 'non-continuation payload', fin=True) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True) + check_close(sock, 1002) - for i in range(0, 2): - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=True) - self.check_close(sock, 1002) + # 5_10 - # 5_17 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_CONT, 'non-continuation payload', fin=True) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True) + check_close(sock, 1002) - for i in range(0, 2): - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment1', fin=True) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=True) - self.check_close(sock, 1002) + # 5_11 - # 5_18 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write( + sock, + ws.OP_CONT, + 'non-continuation payload', + fin=True, + chopsize=1, + ) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True, chopsize=1) + check_close(sock, 1002) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment2') - self.check_close(sock, 1002) + # 5_12 - # 5_19 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_CONT, 'non-continuation payload', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True) + check_close(sock, 1002) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, 'pongme 1!') + # 5_13 - time.sleep(1) + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment4', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, 'pongme 2!') - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment5') + ws.frame_write(sock, ws.OP_CONT, 'non-continuation payload', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True) + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, 'pongme 1!') + # 5_14 - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, 'pongme 2!') + _, sock, _ = ws.upgrade() - self.check_frame( - self.ws.frame_read(sock), - True, - self.ws.OP_TEXT, - 'fragment1fragment2fragment3fragment4fragment5', - ) + ws.frame_write( + sock, + ws.OP_CONT, + 'non-continuation payload', + fin=False, + chopsize=1, + ) + ws.frame_write(sock, ws.OP_TEXT, 'Hello, world!', fin=True, chopsize=1) + check_close(sock, 1002) - # 5_20 + # 5_15 - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, 'pongme 1!') + _, sock, _ = ws.upgrade() - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, 'pongme 1!') + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=True) + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'fragment4', fin=True) - time.sleep(1) + frame = ws.frame_read(sock) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment3', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment4', fin=False) - self.ws.frame_write(sock, self.ws.OP_PING, 'pongme 2!') + if frame['opcode'] == ws.OP_TEXT: + check_frame(frame, True, ws.OP_TEXT, 'fragment1fragment2') + frame = None - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PONG, 'pongme 2!') + check_close(sock, 1002, frame=frame) - assert self.recvall(sock, read_timeout=0.1) == b'', '5_20' - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment5') + # 5_16 - self.check_frame( - self.ws.frame_read(sock), - True, - self.ws.OP_TEXT, - 'fragment1fragment2fragment3fragment4fragment5', - ) + _, sock, _ = ws.upgrade() - self.close_connection(sock) + for _ in range(0, 2): + ws.frame_write(sock, ws.OP_CONT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'fragment2', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=True) + check_close(sock, 1002) - def test_asgi_websockets_6_1_1__6_4_4(self): - self.load('websockets/mirror') + # 5_17 - # 6_1_1 + _, sock, _ = ws.upgrade() - _, sock, _ = self.ws.upgrade() + for _ in range(0, 2): + ws.frame_write(sock, ws.OP_CONT, 'fragment1', fin=True) + ws.frame_write(sock, ws.OP_TEXT, 'fragment2', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=True) + check_close(sock, 1002) - self.ws.frame_write(sock, self.ws.OP_TEXT, '') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, '') + # 5_18 - # 6_1_2 + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, '', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, '', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, '') + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_TEXT, 'fragment2') + check_close(sock, 1002) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, '') + # 5_19 - # 6_1_3 + _, sock, _ = ws.upgrade() - payload = 'middle frame payload' + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=False) + ws.frame_write(sock, ws.OP_PING, 'pongme 1!') - self.ws.frame_write(sock, self.ws.OP_TEXT, '', fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, payload, fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, '') + time.sleep(1) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment4', fin=False) + ws.frame_write(sock, ws.OP_PING, 'pongme 2!') + ws.frame_write(sock, ws.OP_CONT, 'fragment5') - # 6_2_1 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, 'pongme 1!') - payload = 'Hello-µ@ßöäüàá-UTF-8!!' + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, 'pongme 2!') - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + check_frame( + ws.frame_read(sock), + True, + ws.OP_TEXT, + 'fragment1fragment2fragment3fragment4fragment5', + ) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + # 5_20 - # 6_2_2 + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment2', fin=False) + ws.frame_write(sock, ws.OP_PING, 'pongme 1!') - self.ws.frame_write(sock, self.ws.OP_TEXT, payload[:12], fin=False) - self.ws.frame_write(sock, self.ws.OP_CONT, payload[12:]) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, 'pongme 1!') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + time.sleep(1) - # 6_2_3 + ws.frame_write(sock, ws.OP_CONT, 'fragment3', fin=False) + ws.frame_write(sock, ws.OP_CONT, 'fragment4', fin=False) + ws.frame_write(sock, ws.OP_PING, 'pongme 2!') - self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1) + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PONG, 'pongme 2!') - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + assert client.recvall(sock, read_timeout=0.1) == b'', '5_20' + ws.frame_write(sock, ws.OP_CONT, 'fragment5') - # 6_2_4 + check_frame( + ws.frame_read(sock), + True, + ws.OP_TEXT, + 'fragment1fragment2fragment3fragment4fragment5', + ) - payload = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5' + close_connection(sock) - self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) +def test_asgi_websockets_6_1_1__6_4_4(): + client.load('websockets/mirror') - self.close_connection(sock) + # 6_1_1 - # Unit does not support UTF-8 validation - # - # # 6_3_1 FAIL - # - # payload_1 = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5' - # payload_2 = '\xed\xa0\x80' - # payload_3 = '\x65\x64\x69\x74\x65\x64' - # - # payload = payload_1 + payload_2 + payload_3 - # - # self.ws.message(sock, self.ws.OP_TEXT, payload) - # self.check_close(sock, 1007) - # - # # 6_3_2 FAIL - # - # _, sock, _ = self.ws.upgrade() - # - # self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1) - # self.check_close(sock, 1007) - # - # # 6_4_1 ... 6_4_4 FAIL + _, sock, _ = ws.upgrade() - def test_asgi_websockets_7_1_1__7_5_1(self): - self.load('websockets/mirror') + ws.frame_write(sock, ws.OP_TEXT, '') + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, '') - # 7_1_1 + # 6_1_2 - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, '', fin=False) + ws.frame_write(sock, ws.OP_CONT, '', fin=False) + ws.frame_write(sock, ws.OP_CONT, '') - payload = "Hello World!" + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, '') - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) + # 6_1_3 - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) + payload = 'middle frame payload' - self.close_connection(sock) + ws.frame_write(sock, ws.OP_TEXT, '', fin=False) + ws.frame_write(sock, ws.OP_CONT, payload, fin=False) + ws.frame_write(sock, ws.OP_CONT, '') - # 7_1_2 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + # 6_2_1 - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) + payload = 'Hello-µ@ßöäüàá-UTF-8!!' - self.check_close(sock) + ws.frame_write(sock, ws.OP_TEXT, payload) - # 7_1_3 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + # 6_2_2 - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.check_close(sock, no_close=True) + ws.frame_write(sock, ws.OP_TEXT, payload[:12], fin=False) + ws.frame_write(sock, ws.OP_CONT, payload[12:]) - self.ws.frame_write(sock, self.ws.OP_PING, '') - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - sock.close() + # 6_2_3 - # 7_1_4 + ws.message(sock, ws.OP_TEXT, payload, fragmention_size=1) - _, sock, _ = self.ws.upgrade() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.check_close(sock, no_close=True) + # 6_2_4 - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' + payload = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5' - sock.close() + ws.message(sock, ws.OP_TEXT, payload, fragmention_size=1) - # 7_1_5 + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + close_connection(sock) - self.ws.frame_write(sock, self.ws.OP_TEXT, 'fragment1', fin=False) - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.check_close(sock, no_close=True) - self.ws.frame_write(sock, self.ws.OP_CONT, 'fragment2') - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' +# Unit does not support UTF-8 validation +# +# # 6_3_1 FAIL +# +# payload_1 = '\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5' +# payload_2 = '\xed\xa0\x80' +# payload_3 = '\x65\x64\x69\x74\x65\x64' +# +# payload = payload_1 + payload_2 + payload_3 +# +# ws.message(sock, ws.OP_TEXT, payload) +# check_close(sock, 1007) +# +# # 6_3_2 FAIL +# +# _, sock, _ = ws.upgrade() +# +# ws.message(sock, ws.OP_TEXT, payload, fragmention_size=1) +# check_close(sock, 1007) +# +# # 6_4_1 ... 6_4_4 FAIL - sock.close() - # 7_1_6 +def test_asgi_websockets_7_1_1__7_5_1(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + # 7_1_1 - self.ws.frame_write(sock, self.ws.OP_TEXT, 'BAsd7&jh23' * 26 * 2**10) - self.ws.frame_write(sock, self.ws.OP_TEXT, payload) - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) + _, sock, _ = ws.upgrade() - self.recvall(sock, read_timeout=1) + payload = "Hello World!" - self.ws.frame_write(sock, self.ws.OP_PING, '') - assert self.recvall(sock, read_timeout=0.1) == b'', 'empty soc' + ws.frame_write(sock, ws.OP_TEXT, payload) - sock.close() + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) + + close_connection(sock) + + # 7_1_2 + + _, sock, _ = ws.upgrade() + + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + + check_close(sock) + + # 7_1_3 + + _, sock, _ = ws.upgrade() + + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + check_close(sock, no_close=True) + + ws.frame_write(sock, ws.OP_PING, '') + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - # 7_3_1 + sock.close() - _, sock, _ = self.ws.upgrade() + # 7_1_4 - self.ws.frame_write(sock, self.ws.OP_CLOSE, '') - self.check_close(sock) + _, sock, _ = ws.upgrade() - # 7_3_2 + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + check_close(sock, no_close=True) - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_TEXT, payload) + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - self.ws.frame_write(sock, self.ws.OP_CLOSE, 'a') - self.check_close(sock, 1002) + sock.close() - # 7_3_3 + # 7_1_5 - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_CLOSE, self.ws.serialize_close()) - self.check_close(sock) + ws.frame_write(sock, ws.OP_TEXT, 'fragment1', fin=False) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + check_close(sock, no_close=True) - # 7_3_4 + ws.frame_write(sock, ws.OP_CONT, 'fragment2') + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - _, sock, _ = self.ws.upgrade() + sock.close() - payload = self.ws.serialize_close(reason='Hello World!') + # 7_1_6 - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock) + _, sock, _ = ws.upgrade() - # 7_3_5 + ws.frame_write(sock, ws.OP_TEXT, 'BAsd7&jh23' * 26 * 2**10) + ws.frame_write(sock, ws.OP_TEXT, payload) + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) - _, sock, _ = self.ws.upgrade() + client.recvall(sock, read_timeout=1) - payload = self.ws.serialize_close(reason='*' * 123) + ws.frame_write(sock, ws.OP_PING, '') + assert client.recvall(sock, read_timeout=0.1) == b'', 'empty soc' - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock) + sock.close() - # 7_3_6 + # 7_3_1 - _, sock, _ = self.ws.upgrade() + _, sock, _ = ws.upgrade() - payload = self.ws.serialize_close(reason='*' * 124) + ws.frame_write(sock, ws.OP_CLOSE, '') + check_close(sock) - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock, 1002) + # 7_3_2 - # # 7_5_1 FAIL Unit does not support UTF-8 validation - # - # _, sock, _ = self.ws.upgrade() - # - # payload = self.ws.serialize_close(reason = '\xce\xba\xe1\xbd\xb9\xcf' \ - # '\x83\xce\xbc\xce\xb5\xed\xa0\x80\x65\x64\x69\x74\x65\x64') - # - # self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - # self.check_close(sock, 1007) + _, sock, _ = ws.upgrade() - def test_asgi_websockets_7_7_X__7_9_X(self): - self.load('websockets/mirror') + ws.frame_write(sock, ws.OP_CLOSE, 'a') + check_close(sock, 1002) - valid_codes = [ - 1000, - 1001, - 1002, - 1003, - 1007, - 1008, - 1009, - 1010, - 1011, - 3000, - 3999, - 4000, - 4999, - ] + # 7_3_3 - invalid_codes = [0, 999, 1004, 1005, 1006, 1016, 1100, 2000, 2999] + _, sock, _ = ws.upgrade() - for code in valid_codes: - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_CLOSE, ws.serialize_close()) + check_close(sock) - payload = self.ws.serialize_close(code=code) + # 7_3_4 - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock) + _, sock, _ = ws.upgrade() - for code in invalid_codes: - _, sock, _ = self.ws.upgrade() + payload = ws.serialize_close(reason='Hello World!') - payload = self.ws.serialize_close(code=code) + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock) - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock, 1002) + # 7_3_5 - def test_asgi_websockets_7_13_1__7_13_2(self): - self.load('websockets/mirror') + _, sock, _ = ws.upgrade() - # 7_13_1 + payload = ws.serialize_close(reason='*' * 123) - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock) - payload = self.ws.serialize_close(code=5000) + # 7_3_6 - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock, 1002) + _, sock, _ = ws.upgrade() - # 7_13_2 + payload = ws.serialize_close(reason='*' * 124) - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock, 1002) - payload = struct.pack('!I', 65536) + ''.encode('utf-8') - self.ws.frame_write(sock, self.ws.OP_CLOSE, payload) - self.check_close(sock, 1002) +# # 7_5_1 FAIL Unit does not support UTF-8 validation +# +# _, sock, _ = ws.upgrade() +# +# payload = ws.serialize_close(reason = '\xce\xba\xe1\xbd\xb9\xcf' \ +# '\x83\xce\xbc\xce\xb5\xed\xa0\x80\x65\x64\x69\x74\x65\x64') +# +# ws.frame_write(sock, ws.OP_CLOSE, payload) +# check_close(sock, 1007) - def test_asgi_websockets_9_1_1__9_6_6(self, is_unsafe): - if not is_unsafe: - pytest.skip('unsafe, long run') - self.load('websockets/mirror') +def test_asgi_websockets_7_7_X__7_9_X(): + client.load('websockets/mirror') - assert 'success' in self.conf( - { - 'http': { - 'websocket': { - 'max_frame_size': 33554432, - 'keepalive_interval': 0, - } + valid_codes = [ + 1000, + 1001, + 1002, + 1003, + 1007, + 1008, + 1009, + 1010, + 1011, + 3000, + 3999, + 4000, + 4999, + ] + + invalid_codes = [0, 999, 1004, 1005, 1006, 1016, 1100, 2000, 2999] + + for code in valid_codes: + _, sock, _ = ws.upgrade() + + payload = ws.serialize_close(code=code) + + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock) + + for code in invalid_codes: + _, sock, _ = ws.upgrade() + + payload = ws.serialize_close(code=code) + + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock, 1002) + + +def test_asgi_websockets_7_13_1__7_13_2(): + client.load('websockets/mirror') + + # 7_13_1 + + _, sock, _ = ws.upgrade() + + payload = ws.serialize_close(code=5000) + + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock, 1002) + + # 7_13_2 + + _, sock, _ = ws.upgrade() + + payload = struct.pack('!I', 65536) + ''.encode('utf-8') + + ws.frame_write(sock, ws.OP_CLOSE, payload) + check_close(sock, 1002) + + +def test_asgi_websockets_9_1_1__9_6_6(is_unsafe, system): + if not is_unsafe: + pytest.skip('unsafe, long run') + + client.load('websockets/mirror') + + assert 'success' in client.conf( + { + 'http': { + 'websocket': { + 'max_frame_size': 33554432, + 'keepalive_interval': 0, } - }, - 'settings', - ), 'increase max_frame_size and keepalive_interval' - - _, sock, _ = self.ws.upgrade() - - op_text = self.ws.OP_TEXT - op_binary = self.ws.OP_BINARY - - def check_payload(opcode, length, chopsize=None): - if opcode == self.ws.OP_TEXT: - payload = '*' * length - else: - payload = b'*' * length + } + }, + 'settings', + ), 'increase max_frame_size and keepalive_interval' - self.ws.frame_write(sock, opcode, payload, chopsize=chopsize) - frame = self.ws.frame_read(sock, read_timeout=5) - self.check_frame(frame, True, opcode, payload) + _, sock, _ = ws.upgrade() - def check_message(opcode, f_size): - if opcode == self.ws.OP_TEXT: - payload = '*' * 4 * 2**20 - else: - payload = b'*' * 4 * 2**20 + op_text = ws.OP_TEXT + op_binary = ws.OP_BINARY - self.ws.message(sock, opcode, payload, fragmention_size=f_size) - frame = self.ws.frame_read(sock, read_timeout=5) - self.check_frame(frame, True, opcode, payload) + def check_payload(opcode, length, chopsize=None): + if opcode == ws.OP_TEXT: + payload = '*' * length + else: + payload = b'*' * length - check_payload(op_text, 64 * 2**10) # 9_1_1 - check_payload(op_text, 256 * 2**10) # 9_1_2 - check_payload(op_text, 2**20) # 9_1_3 - check_payload(op_text, 4 * 2**20) # 9_1_4 - check_payload(op_text, 8 * 2**20) # 9_1_5 - check_payload(op_text, 16 * 2**20) # 9_1_6 + ws.frame_write(sock, opcode, payload, chopsize=chopsize) + frame = ws.frame_read(sock, read_timeout=5) + check_frame(frame, True, opcode, payload) - check_payload(op_binary, 64 * 2**10) # 9_2_1 - check_payload(op_binary, 256 * 2**10) # 9_2_2 - check_payload(op_binary, 2**20) # 9_2_3 - check_payload(op_binary, 4 * 2**20) # 9_2_4 - check_payload(op_binary, 8 * 2**20) # 9_2_5 - check_payload(op_binary, 16 * 2**20) # 9_2_6 + def check_message(opcode, f_size): + if opcode == ws.OP_TEXT: + payload = '*' * 4 * 2**20 + else: + payload = b'*' * 4 * 2**20 - if option.system != 'Darwin' and option.system != 'FreeBSD': - check_message(op_text, 64) # 9_3_1 - check_message(op_text, 256) # 9_3_2 - check_message(op_text, 2**10) # 9_3_3 - check_message(op_text, 4 * 2**10) # 9_3_4 - check_message(op_text, 16 * 2**10) # 9_3_5 - check_message(op_text, 64 * 2**10) # 9_3_6 - check_message(op_text, 256 * 2**10) # 9_3_7 - check_message(op_text, 2**20) # 9_3_8 - check_message(op_text, 4 * 2**20) # 9_3_9 + ws.message(sock, opcode, payload, fragmention_size=f_size) + frame = ws.frame_read(sock, read_timeout=5) + check_frame(frame, True, opcode, payload) - check_message(op_binary, 64) # 9_4_1 - check_message(op_binary, 256) # 9_4_2 - check_message(op_binary, 2**10) # 9_4_3 - check_message(op_binary, 4 * 2**10) # 9_4_4 - check_message(op_binary, 16 * 2**10) # 9_4_5 - check_message(op_binary, 64 * 2**10) # 9_4_6 - check_message(op_binary, 256 * 2**10) # 9_4_7 - check_message(op_binary, 2**20) # 9_4_8 - check_message(op_binary, 4 * 2**20) # 9_4_9 + check_payload(op_text, 64 * 2**10) # 9_1_1 + check_payload(op_text, 256 * 2**10) # 9_1_2 + check_payload(op_text, 2**20) # 9_1_3 + check_payload(op_text, 4 * 2**20) # 9_1_4 + check_payload(op_text, 8 * 2**20) # 9_1_5 + check_payload(op_text, 16 * 2**20) # 9_1_6 - check_payload(op_text, 2**20, chopsize=64) # 9_5_1 - check_payload(op_text, 2**20, chopsize=128) # 9_5_2 - check_payload(op_text, 2**20, chopsize=256) # 9_5_3 - check_payload(op_text, 2**20, chopsize=512) # 9_5_4 - check_payload(op_text, 2**20, chopsize=1024) # 9_5_5 - check_payload(op_text, 2**20, chopsize=2048) # 9_5_6 + check_payload(op_binary, 64 * 2**10) # 9_2_1 + check_payload(op_binary, 256 * 2**10) # 9_2_2 + check_payload(op_binary, 2**20) # 9_2_3 + check_payload(op_binary, 4 * 2**20) # 9_2_4 + check_payload(op_binary, 8 * 2**20) # 9_2_5 + check_payload(op_binary, 16 * 2**20) # 9_2_6 - check_payload(op_binary, 2**20, chopsize=64) # 9_6_1 - check_payload(op_binary, 2**20, chopsize=128) # 9_6_2 - check_payload(op_binary, 2**20, chopsize=256) # 9_6_3 - check_payload(op_binary, 2**20, chopsize=512) # 9_6_4 - check_payload(op_binary, 2**20, chopsize=1024) # 9_6_5 - check_payload(op_binary, 2**20, chopsize=2048) # 9_6_6 + if system not in ['Darwin', 'FreeBSD']: + check_message(op_text, 64) # 9_3_1 + check_message(op_text, 256) # 9_3_2 + check_message(op_text, 2**10) # 9_3_3 + check_message(op_text, 4 * 2**10) # 9_3_4 + check_message(op_text, 16 * 2**10) # 9_3_5 + check_message(op_text, 64 * 2**10) # 9_3_6 + check_message(op_text, 256 * 2**10) # 9_3_7 + check_message(op_text, 2**20) # 9_3_8 + check_message(op_text, 4 * 2**20) # 9_3_9 - self.close_connection(sock) + check_message(op_binary, 64) # 9_4_1 + check_message(op_binary, 256) # 9_4_2 + check_message(op_binary, 2**10) # 9_4_3 + check_message(op_binary, 4 * 2**10) # 9_4_4 + check_message(op_binary, 16 * 2**10) # 9_4_5 + check_message(op_binary, 64 * 2**10) # 9_4_6 + check_message(op_binary, 256 * 2**10) # 9_4_7 + check_message(op_binary, 2**20) # 9_4_8 + check_message(op_binary, 4 * 2**20) # 9_4_9 - def test_asgi_websockets_10_1_1(self): - self.load('websockets/mirror') + check_payload(op_text, 2**20, chopsize=64) # 9_5_1 + check_payload(op_text, 2**20, chopsize=128) # 9_5_2 + check_payload(op_text, 2**20, chopsize=256) # 9_5_3 + check_payload(op_text, 2**20, chopsize=512) # 9_5_4 + check_payload(op_text, 2**20, chopsize=1024) # 9_5_5 + check_payload(op_text, 2**20, chopsize=2048) # 9_5_6 - _, sock, _ = self.ws.upgrade() + check_payload(op_binary, 2**20, chopsize=64) # 9_6_1 + check_payload(op_binary, 2**20, chopsize=128) # 9_6_2 + check_payload(op_binary, 2**20, chopsize=256) # 9_6_3 + check_payload(op_binary, 2**20, chopsize=512) # 9_6_4 + check_payload(op_binary, 2**20, chopsize=1024) # 9_6_5 + check_payload(op_binary, 2**20, chopsize=2048) # 9_6_6 - payload = '*' * 65536 + close_connection(sock) - self.ws.message(sock, self.ws.OP_TEXT, payload, fragmention_size=1300) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_TEXT, payload) +def test_asgi_websockets_10_1_1(): + client.load('websockets/mirror') - self.close_connection(sock) + _, sock, _ = ws.upgrade() - # settings + payload = '*' * 65536 - def test_asgi_websockets_max_frame_size(self): - self.load('websockets/mirror') + ws.message(sock, ws.OP_TEXT, payload, fragmention_size=1300) - assert 'success' in self.conf( - {'http': {'websocket': {'max_frame_size': 100}}}, 'settings' - ), 'configure max_frame_size' + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_TEXT, payload) - _, sock, _ = self.ws.upgrade() + close_connection(sock) - payload = '*' * 94 - opcode = self.ws.OP_TEXT - self.ws.frame_write(sock, opcode, payload) # frame length is 100 +# settings - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, opcode, payload) - payload = '*' * 95 +def test_asgi_websockets_max_frame_size(): + client.load('websockets/mirror') - self.ws.frame_write(sock, opcode, payload) # frame length is 101 - self.check_close(sock, 1009) # 1009 - CLOSE_TOO_LARGE + assert 'success' in client.conf( + {'http': {'websocket': {'max_frame_size': 100}}}, 'settings' + ), 'configure max_frame_size' - def test_asgi_websockets_read_timeout(self): - self.load('websockets/mirror') + _, sock, _ = ws.upgrade() - assert 'success' in self.conf( - {'http': {'websocket': {'read_timeout': 5}}}, 'settings' - ), 'configure read_timeout' + payload = '*' * 94 + opcode = ws.OP_TEXT - _, sock, _ = self.ws.upgrade() + ws.frame_write(sock, opcode, payload) # frame length is 100 - frame = self.ws.frame_to_send(self.ws.OP_TEXT, 'blah') - sock.sendall(frame[:2]) + frame = ws.frame_read(sock) + check_frame(frame, True, opcode, payload) - time.sleep(2) + payload = '*' * 95 - self.check_close(sock, 1001) # 1001 - CLOSE_GOING_AWAY + ws.frame_write(sock, opcode, payload) # frame length is 101 + check_close(sock, 1009) # 1009 - CLOSE_TOO_LARGE - def test_asgi_websockets_keepalive_interval(self): - self.load('websockets/mirror') - assert 'success' in self.conf( - {'http': {'websocket': {'keepalive_interval': 5}}}, 'settings' - ), 'configure keepalive_interval' +def test_asgi_websockets_read_timeout(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + assert 'success' in client.conf( + {'http': {'websocket': {'read_timeout': 5}}}, 'settings' + ), 'configure read_timeout' - frame = self.ws.frame_to_send(self.ws.OP_TEXT, 'blah') - sock.sendall(frame[:2]) + _, sock, _ = ws.upgrade() - time.sleep(2) + frame = ws.frame_to_send(ws.OP_TEXT, 'blah') + sock.sendall(frame[:2]) - frame = self.ws.frame_read(sock) - self.check_frame(frame, True, self.ws.OP_PING, '') # PING frame + time.sleep(2) - sock.close() + check_close(sock, 1001) # 1001 - CLOSE_GOING_AWAY - def test_asgi_websockets_client_locks_app(self): - self.load('websockets/mirror') - message = 'blah' +def test_asgi_websockets_keepalive_interval(): + client.load('websockets/mirror') - _, sock, _ = self.ws.upgrade() + assert 'success' in client.conf( + {'http': {'websocket': {'keepalive_interval': 5}}}, 'settings' + ), 'configure keepalive_interval' - assert 'success' in self.conf({}), 'remove app' + _, sock, _ = ws.upgrade() - self.ws.frame_write(sock, self.ws.OP_TEXT, message) + frame = ws.frame_to_send(ws.OP_TEXT, 'blah') + sock.sendall(frame[:2]) - frame = self.ws.frame_read(sock) + time.sleep(2) - assert message == frame['data'].decode('utf-8'), 'client' + frame = ws.frame_read(sock) + check_frame(frame, True, ws.OP_PING, '') # PING frame - sock.close() + sock.close() + + +def test_asgi_websockets_client_locks_app(): + client.load('websockets/mirror') + + message = 'blah' + + _, sock, _ = ws.upgrade() + + assert 'success' in client.conf({}), 'remove app' + + ws.frame_write(sock, ws.OP_TEXT, message) + + frame = ws.frame_read(sock) + + assert message == frame['data'].decode('utf-8'), 'client' + + sock.close() |