summaryrefslogtreecommitdiffhomepage
path: root/test/unit
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--test/unit/http.py73
-rw-r--r--test/unit/main.py13
2 files changed, 77 insertions, 9 deletions
diff --git a/test/unit/http.py b/test/unit/http.py
index 839e91a2..c71e8f7e 100644
--- a/test/unit/http.py
+++ b/test/unit/http.py
@@ -1,3 +1,6 @@
+import binascii
+import io
+import os
import re
import time
import json
@@ -68,6 +71,10 @@ class TestHTTP(TestUnit):
if body != b'':
if isinstance(body, str):
body = body.encode()
+ elif isinstance(body, dict):
+ body, content_type = self.form_encode(body)
+
+ headers['Content-Type'] = content_type
if 'Content-Length' not in headers:
headers['Content-Length'] = len(body)
@@ -266,3 +273,69 @@ class TestHTTP(TestUnit):
sock.close()
self.assertTrue(ret, 'socket connected')
+
+ def form_encode(self, fields):
+ is_multipart = False
+
+ for _, value in fields.items():
+ if isinstance(value, dict):
+ is_multipart = True
+ break
+
+ if is_multipart:
+ body, content_type = self.multipart_encode(fields)
+
+ else:
+ body, content_type = self.form_url_encode(fields)
+
+ return body, content_type
+
+ def form_url_encode(self, fields):
+ data = "&".join("%s=%s" % (name, value)
+ for name, value in fields.items()).encode()
+ return data, 'application/x-www-form-urlencoded'
+
+ def multipart_encode(self, fields):
+ boundary = binascii.hexlify(os.urandom(16)).decode('ascii')
+
+ body = ''
+
+ for field, value in fields.items():
+ filename = ''
+ datatype = ''
+
+ if isinstance(value, dict):
+ datatype = 'text/plain'
+ filename = value['filename']
+
+ if value.get('type'):
+ datatype = value['type']
+
+ if not isinstance(value['data'], io.IOBase):
+ self.fail('multipart encoding of file requires a stream.')
+
+ data = value['data'].read()
+
+ elif isinstance(value, str):
+ data = value
+
+ else:
+ self.fail('multipart requires a string or stream data')
+
+ body += (
+ "--%s\r\nContent-Disposition: form-data; name=\"%s\""
+ ) % (boundary, field)
+
+ if filename != '':
+ body += "; filename=\"%s\"" % filename
+
+ body += "\r\n"
+
+ if datatype != '':
+ body += "Content-Type: %s\r\n" % datatype
+
+ body += "\r\n%s\r\n" % data
+
+ body += "--%s--\r\n" % boundary
+
+ return body.encode(), "multipart/form-data; boundary=%s" % boundary
diff --git a/test/unit/main.py b/test/unit/main.py
index ea6afd7f..37d01d3b 100644
--- a/test/unit/main.py
+++ b/test/unit/main.py
@@ -209,9 +209,7 @@ class TestUnit(unittest.TestCase):
print()
- def _run_unit():
- subprocess.call(
- [
+ self._p = Process(target=subprocess.call, args=[ [
self.unitd,
'--no-daemon',
'--modules', self.pardir + '/build',
@@ -219,10 +217,7 @@ class TestUnit(unittest.TestCase):
'--pid', self.testdir + '/unit.pid',
'--log', self.testdir + '/unit.log',
'--control', 'unix:' + self.testdir + '/control.unit.sock',
- ]
- )
-
- self._p = Process(target=_run_unit)
+ ] ])
self._p.start()
if not self.waitforfiles(
@@ -299,11 +294,11 @@ class TestUnit(unittest.TestCase):
if found:
print('skipped.')
- def run_process(self, target):
+ def run_process(self, target, *args):
if not hasattr(self, '_processes'):
self._processes = []
- process = Process(target=target)
+ process = Process(target=target, args=args)
process.start()
self._processes.append(process)