diff options
Diffstat (limited to '')
-rw-r--r-- | test/unit/http.py | 73 | ||||
-rw-r--r-- | test/unit/main.py | 13 |
2 files changed, 77 insertions, 9 deletions
diff --git a/test/unit/http.py b/test/unit/http.py index 839e91a2..c71e8f7e 100644 --- a/test/unit/http.py +++ b/test/unit/http.py @@ -1,3 +1,6 @@ +import binascii +import io +import os import re import time import json @@ -68,6 +71,10 @@ class TestHTTP(TestUnit): if body != b'': if isinstance(body, str): body = body.encode() + elif isinstance(body, dict): + body, content_type = self.form_encode(body) + + headers['Content-Type'] = content_type if 'Content-Length' not in headers: headers['Content-Length'] = len(body) @@ -266,3 +273,69 @@ class TestHTTP(TestUnit): sock.close() self.assertTrue(ret, 'socket connected') + + def form_encode(self, fields): + is_multipart = False + + for _, value in fields.items(): + if isinstance(value, dict): + is_multipart = True + break + + if is_multipart: + body, content_type = self.multipart_encode(fields) + + else: + body, content_type = self.form_url_encode(fields) + + return body, content_type + + def form_url_encode(self, fields): + data = "&".join("%s=%s" % (name, value) + for name, value in fields.items()).encode() + return data, 'application/x-www-form-urlencoded' + + def multipart_encode(self, fields): + boundary = binascii.hexlify(os.urandom(16)).decode('ascii') + + body = '' + + for field, value in fields.items(): + filename = '' + datatype = '' + + if isinstance(value, dict): + datatype = 'text/plain' + filename = value['filename'] + + if value.get('type'): + datatype = value['type'] + + if not isinstance(value['data'], io.IOBase): + self.fail('multipart encoding of file requires a stream.') + + data = value['data'].read() + + elif isinstance(value, str): + data = value + + else: + self.fail('multipart requires a string or stream data') + + body += ( + "--%s\r\nContent-Disposition: form-data; name=\"%s\"" + ) % (boundary, field) + + if filename != '': + body += "; filename=\"%s\"" % filename + + body += "\r\n" + + if datatype != '': + body += "Content-Type: %s\r\n" % datatype + + body += "\r\n%s\r\n" % data + + body += "--%s--\r\n" % boundary + + return body.encode(), "multipart/form-data; boundary=%s" % boundary diff --git a/test/unit/main.py b/test/unit/main.py index ea6afd7f..37d01d3b 100644 --- a/test/unit/main.py +++ b/test/unit/main.py @@ -209,9 +209,7 @@ class TestUnit(unittest.TestCase): print() - def _run_unit(): - subprocess.call( - [ + self._p = Process(target=subprocess.call, args=[ [ self.unitd, '--no-daemon', '--modules', self.pardir + '/build', @@ -219,10 +217,7 @@ class TestUnit(unittest.TestCase): '--pid', self.testdir + '/unit.pid', '--log', self.testdir + '/unit.log', '--control', 'unix:' + self.testdir + '/control.unit.sock', - ] - ) - - self._p = Process(target=_run_unit) + ] ]) self._p.start() if not self.waitforfiles( @@ -299,11 +294,11 @@ class TestUnit(unittest.TestCase): if found: print('skipped.') - def run_process(self, target): + def run_process(self, target, *args): if not hasattr(self, '_processes'): self._processes = [] - process = Process(target=target) + process = Process(target=target, args=args) process.start() self._processes.append(process) |