summaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--test/php/phpinfo/index.wrong3
-rw-r--r--test/python/upload/wsgi.py27
-rwxr-xr-xtest/run.py19
-rw-r--r--test/test_java_application.py39
-rw-r--r--test/test_php_application.py12
-rw-r--r--test/test_proxy.py7
-rw-r--r--test/test_tls.py22
-rw-r--r--test/unit/http.py73
-rw-r--r--test/unit/main.py13
9 files changed, 171 insertions, 44 deletions
diff --git a/test/php/phpinfo/index.wrong b/test/php/phpinfo/index.wrong
new file mode 100644
index 00000000..6c9e0678
--- /dev/null
+++ b/test/php/phpinfo/index.wrong
@@ -0,0 +1,3 @@
+<?php
+header('Content-Length: 0');
+?>
diff --git a/test/python/upload/wsgi.py b/test/python/upload/wsgi.py
new file mode 100644
index 00000000..37ee89eb
--- /dev/null
+++ b/test/python/upload/wsgi.py
@@ -0,0 +1,27 @@
+from tempfile import TemporaryFile
+import os, cgi
+
+def read(environ):
+ length = int(environ.get('CONTENT_LENGTH', 0))
+
+ body = TemporaryFile(mode='w+b')
+ body.write(bytes(environ['wsgi.input'].read(length)))
+ body.seek(0)
+
+ environ['wsgi.input'] = body
+ return body
+
+def application(environ, start_response):
+ file = read(environ)
+
+ form = cgi.FieldStorage(fp=file, environ=environ, keep_blank_values=True)
+
+ filename = form['file'].filename
+ data = filename.encode() + form['file'].file.read()
+
+ start_response('200 OK', [
+ ('Content-Type', 'text/plain'),
+ ('Content-Length', str(len(data))),
+ ])
+
+ return data
diff --git a/test/run.py b/test/run.py
index 0504a6f7..b79d0484 100755
--- a/test/run.py
+++ b/test/run.py
@@ -4,16 +4,17 @@ import unittest
import sys
import os
-loader = unittest.TestLoader()
-suite = unittest.TestSuite()
+if __name__ == '__main__':
+ loader = unittest.TestLoader()
+ suite = unittest.TestSuite()
-this_dir = os.path.dirname(__file__)
-tests = loader.discover(start_dir=this_dir)
-suite.addTests(tests)
+ this_dir = os.path.dirname(__file__)
+ tests = loader.discover(start_dir=this_dir)
+ suite.addTests(tests)
-runner = unittest.TextTestRunner(verbosity=3)
-result = runner.run(suite)
+ runner = unittest.TextTestRunner(verbosity=3)
+ result = runner.run(suite)
-ret = not (len(result.failures) == len(result.errors) == 0)
+ ret = not (len(result.failures) == len(result.errors) == 0)
-sys.exit(ret)
+ sys.exit(ret)
diff --git a/test/test_java_application.py b/test/test_java_application.py
index d2b97f88..9d873d6b 100644
--- a/test/test_java_application.py
+++ b/test/test_java_application.py
@@ -1,3 +1,4 @@
+import io
import os
import time
import unittest
@@ -1223,31 +1224,25 @@ class TestJavaApplication(TestApplicationJava):
os.mkdir(fulldst)
self.public_dir(fulldst)
- body = (
- """Preamble. Should be ignored.\r
-\r
---12345\r
-Content-Disposition: form-data; name="file"; filename="sample.txt"\r
-Content-Type: text/plain\r
-\r
-Data from sample file\r
---12345\r
-Content-Disposition: form-data; name="destination"\r
-\r
-%s\r
---12345\r
-Content-Disposition: form-data; name="upload"\r
-\r
-Upload\r
---12345--\r
-\r
-Epilogue. Should be ignored."""
- % fulldst
- )
+ fields = {
+ 'file': {
+ 'filename': 'sample.txt',
+ 'type': 'text/plain',
+ 'data': io.StringIO('Data from sample file'),
+ },
+ 'destination': fulldst,
+ 'upload': 'Upload',
+ }
+
+ encoded, content_type = self.multipart_encode(fields)
+
+ preamble = 'Preamble. Should be ignored.'
+ epilogue = 'Epilogue. Should be ignored.'
+ body = "%s\r\n%s\r\n%s" % (preamble, encoded.decode(), epilogue)
resp = self.post(
headers={
- 'Content-Type': 'multipart/form-data; boundary=12345',
+ 'Content-Type': content_type,
'Host': 'localhost',
'Connection': 'close',
},
diff --git a/test/test_php_application.py b/test/test_php_application.py
index d614885c..837181e6 100644
--- a/test/test_php_application.py
+++ b/test/test_php_application.py
@@ -500,7 +500,17 @@ class TestPHPApplication(TestApplicationPHP):
), 'configure index default'
)
- self.assertEqual(self.get()['status'], 200, 'status')
+ resp = self.get()
+
+ self.assertEqual(resp['status'], 200, 'status')
+ self.assertNotEqual(resp['body'], '', 'body not empty')
+
+ def test_php_application_extension_check(self):
+ self.load('phpinfo')
+
+ self.assertNotEqual(
+ self.get(url='/index.wrong')['status'], 200, 'status'
+ )
if __name__ == '__main__':
TestPHPApplication.main()
diff --git a/test/test_proxy.py b/test/test_proxy.py
index 4697b88f..5d158285 100644
--- a/test/test_proxy.py
+++ b/test/test_proxy.py
@@ -10,11 +10,12 @@ class TestProxy(TestApplicationPython):
SERVER_PORT = 7999
- def run_server(self):
+ @staticmethod
+ def run_server(server_port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_address = ('', self.SERVER_PORT)
+ server_address = ('', server_port)
sock.bind(server_address)
sock.listen(5)
@@ -57,7 +58,7 @@ Content-Length: 10
def setUp(self):
super().setUp()
- self.run_process(self.run_server)
+ self.run_process(self.run_server, self.SERVER_PORT)
self.waitforsocket(self.SERVER_PORT)
self.assertIn(
diff --git a/test/test_tls.py b/test/test_tls.py
index 3514bbcb..1ead111c 100644
--- a/test/test_tls.py
+++ b/test/test_tls.py
@@ -1,3 +1,5 @@
+import io
+import os
import re
import ssl
import subprocess
@@ -591,5 +593,25 @@ basicConstraints = critical,CA:TRUE"""
'url scheme https',
)
+ def test_tls_big_upload(self):
+ self.load('upload')
+
+ self.certificate()
+
+ self.add_tls(application='upload')
+
+ filename = 'test.txt'
+ data = '0123456789' * 9000
+
+ res = self.post_ssl(body={
+ 'file': {
+ 'filename': filename,
+ 'type': 'text/plain',
+ 'data': io.StringIO(data),
+ }
+ })
+ self.assertEqual(res['status'], 200, 'status ok')
+ self.assertEqual(res['body'], filename + data)
+
if __name__ == '__main__':
TestTLS.main()
diff --git a/test/unit/http.py b/test/unit/http.py
index 839e91a2..c71e8f7e 100644
--- a/test/unit/http.py
+++ b/test/unit/http.py
@@ -1,3 +1,6 @@
+import binascii
+import io
+import os
import re
import time
import json
@@ -68,6 +71,10 @@ class TestHTTP(TestUnit):
if body != b'':
if isinstance(body, str):
body = body.encode()
+ elif isinstance(body, dict):
+ body, content_type = self.form_encode(body)
+
+ headers['Content-Type'] = content_type
if 'Content-Length' not in headers:
headers['Content-Length'] = len(body)
@@ -266,3 +273,69 @@ class TestHTTP(TestUnit):
sock.close()
self.assertTrue(ret, 'socket connected')
+
+ def form_encode(self, fields):
+ is_multipart = False
+
+ for _, value in fields.items():
+ if isinstance(value, dict):
+ is_multipart = True
+ break
+
+ if is_multipart:
+ body, content_type = self.multipart_encode(fields)
+
+ else:
+ body, content_type = self.form_url_encode(fields)
+
+ return body, content_type
+
+ def form_url_encode(self, fields):
+ data = "&".join("%s=%s" % (name, value)
+ for name, value in fields.items()).encode()
+ return data, 'application/x-www-form-urlencoded'
+
+ def multipart_encode(self, fields):
+ boundary = binascii.hexlify(os.urandom(16)).decode('ascii')
+
+ body = ''
+
+ for field, value in fields.items():
+ filename = ''
+ datatype = ''
+
+ if isinstance(value, dict):
+ datatype = 'text/plain'
+ filename = value['filename']
+
+ if value.get('type'):
+ datatype = value['type']
+
+ if not isinstance(value['data'], io.IOBase):
+ self.fail('multipart encoding of file requires a stream.')
+
+ data = value['data'].read()
+
+ elif isinstance(value, str):
+ data = value
+
+ else:
+ self.fail('multipart requires a string or stream data')
+
+ body += (
+ "--%s\r\nContent-Disposition: form-data; name=\"%s\""
+ ) % (boundary, field)
+
+ if filename != '':
+ body += "; filename=\"%s\"" % filename
+
+ body += "\r\n"
+
+ if datatype != '':
+ body += "Content-Type: %s\r\n" % datatype
+
+ body += "\r\n%s\r\n" % data
+
+ body += "--%s--\r\n" % boundary
+
+ return body.encode(), "multipart/form-data; boundary=%s" % boundary
diff --git a/test/unit/main.py b/test/unit/main.py
index ea6afd7f..37d01d3b 100644
--- a/test/unit/main.py
+++ b/test/unit/main.py
@@ -209,9 +209,7 @@ class TestUnit(unittest.TestCase):
print()
- def _run_unit():
- subprocess.call(
- [
+ self._p = Process(target=subprocess.call, args=[ [
self.unitd,
'--no-daemon',
'--modules', self.pardir + '/build',
@@ -219,10 +217,7 @@ class TestUnit(unittest.TestCase):
'--pid', self.testdir + '/unit.pid',
'--log', self.testdir + '/unit.log',
'--control', 'unix:' + self.testdir + '/control.unit.sock',
- ]
- )
-
- self._p = Process(target=_run_unit)
+ ] ])
self._p.start()
if not self.waitforfiles(
@@ -299,11 +294,11 @@ class TestUnit(unittest.TestCase):
if found:
print('skipped.')
- def run_process(self, target):
+ def run_process(self, target, *args):
if not hasattr(self, '_processes'):
self._processes = []
- process = Process(target=target)
+ process = Process(target=target, args=args)
process.start()
self._processes.append(process)