summaryrefslogtreecommitdiffhomepage
path: root/test/unit/http.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/unit/http.py')
-rw-r--r--test/unit/http.py162
1 files changed, 162 insertions, 0 deletions
diff --git a/test/unit/http.py b/test/unit/http.py
new file mode 100644
index 00000000..1ce86e5a
--- /dev/null
+++ b/test/unit/http.py
@@ -0,0 +1,162 @@
+import re
+import socket
+import select
+from unit.main import TestUnit
+
+
+class TestHTTP(TestUnit):
+ def http(self, start_str, **kwargs):
+ sock_type = (
+ 'ipv4' if 'sock_type' not in kwargs else kwargs['sock_type']
+ )
+ port = 7080 if 'port' not in kwargs else kwargs['port']
+ url = '/' if 'url' not in kwargs else kwargs['url']
+ http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
+
+ headers = (
+ {'Host': 'localhost', 'Connection': 'close'}
+ if 'headers' not in kwargs
+ else kwargs['headers']
+ )
+
+ body = b'' if 'body' not in kwargs else kwargs['body']
+ crlf = '\r\n'
+
+ if 'addr' not in kwargs:
+ addr = '::1' if sock_type == 'ipv6' else '127.0.0.1'
+ else:
+ addr = kwargs['addr']
+
+ sock_types = {
+ 'ipv4': socket.AF_INET,
+ 'ipv6': socket.AF_INET6,
+ 'unix': socket.AF_UNIX,
+ }
+
+ if 'sock' not in kwargs:
+ sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
+
+ if (
+ sock_type == sock_types['ipv4']
+ or sock_type == sock_types['ipv6']
+ ):
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+ if 'wrapper' in kwargs:
+ sock = kwargs['wrapper'](sock)
+
+ connect_args = addr if sock_type == 'unix' else (addr, port)
+ try:
+ sock.connect(connect_args)
+ except ConnectionRefusedError:
+ sock.close()
+ return None
+
+ else:
+ sock = kwargs['sock']
+
+ if 'raw' not in kwargs:
+ req = ' '.join([start_str, url, http]) + crlf
+
+ if body is not b'':
+ if isinstance(body, str):
+ body = body.encode()
+
+ if 'Content-Length' not in headers:
+ headers['Content-Length'] = len(body)
+
+ for header, value in headers.items():
+ if isinstance(value, list):
+ for v in value:
+ req += header + ': ' + str(v) + crlf
+
+ else:
+ req += header + ': ' + str(value) + crlf
+
+ req = (req + crlf).encode() + body
+
+ else:
+ req = start_str
+
+ sock.sendall(req)
+
+ if TestUnit.detailed:
+ print('>>>', req, sep='\n')
+
+ resp = ''
+
+ if 'no_recv' not in kwargs:
+ enc = 'utf-8' if 'encoding' not in kwargs else kwargs['encoding']
+ read_timeout = (
+ 30 if 'read_timeout' not in kwargs else kwargs['read_timeout']
+ )
+ resp = self.recvall(sock, read_timeout=read_timeout).decode(enc)
+
+ if TestUnit.detailed:
+ print('<<<', resp.encode('utf-8'), sep='\n')
+
+ if 'raw_resp' not in kwargs:
+ resp = self._resp_to_dict(resp)
+
+ if 'start' not in kwargs:
+ sock.close()
+ return resp
+
+ return (resp, sock)
+
+ def delete(self, **kwargs):
+ return self.http('DELETE', **kwargs)
+
+ def get(self, **kwargs):
+ return self.http('GET', **kwargs)
+
+ def post(self, **kwargs):
+ return self.http('POST', **kwargs)
+
+ def put(self, **kwargs):
+ return self.http('PUT', **kwargs)
+
+ def recvall(self, sock, read_timeout=30, buff_size=4096):
+ data = b''
+ while select.select([sock], [], [], read_timeout)[0]:
+ try:
+ part = sock.recv(buff_size)
+ except:
+ break
+
+ data += part
+
+ if not len(part):
+ break
+
+ return data
+
+ def _resp_to_dict(self, resp):
+ m = re.search('(.*?\x0d\x0a?)\x0d\x0a?(.*)', resp, re.M | re.S)
+
+ if not m:
+ return {}
+
+ headers_text, body = m.group(1), m.group(2)
+
+ p = re.compile('(.*?)\x0d\x0a?', re.M | re.S)
+ headers_lines = p.findall(headers_text)
+
+ status = re.search(
+ '^HTTP\/\d\.\d\s(\d+)|$', headers_lines.pop(0)
+ ).group(1)
+
+ headers = {}
+ for line in headers_lines:
+ m = re.search('(.*)\:\s(.*)', line)
+
+ if m.group(1) not in headers:
+ headers[m.group(1)] = m.group(2)
+
+ elif isinstance(headers[m.group(1)], list):
+ headers[m.group(1)].append(m.group(2))
+
+ else:
+ headers[m.group(1)] = [headers[m.group(1)], m.group(2)]
+
+ return {'status': int(status), 'headers': headers, 'body': body}