summaryrefslogtreecommitdiffhomepage
path: root/test/test_ruby_application.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ruby_application.py')
-rw-r--r--test/test_ruby_application.py630
1 files changed, 328 insertions, 302 deletions
diff --git a/test/test_ruby_application.py b/test/test_ruby_application.py
index 068b587b..6f533b70 100644
--- a/test/test_ruby_application.py
+++ b/test/test_ruby_application.py
@@ -2,418 +2,444 @@ import re
import subprocess
import pytest
-from unit.applications.lang.ruby import TestApplicationRuby
+from unit.applications.lang.ruby import ApplicationRuby
+prerequisites = {'modules': {'ruby': 'all'}}
-class TestRubyApplication(TestApplicationRuby):
- prerequisites = {'modules': {'ruby': 'all'}}
+client = ApplicationRuby()
- def test_ruby_application(self):
- self.load('variables')
- body = 'Test body string.'
+def test_ruby_application(date_to_sec_epoch, sec_epoch):
+ client.load('variables')
- resp = self.post(
- headers={
- 'Host': 'localhost',
- 'Content-Type': 'text/html',
- 'Custom-Header': 'blah',
- 'Connection': 'close',
- },
- body=body,
- )
+ body = 'Test body string.'
- assert resp['status'] == 200, 'status'
- headers = resp['headers']
- header_server = headers.pop('Server')
- assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
- assert (
- headers.pop('Server-Software') == header_server
- ), 'server software header'
-
- date = headers.pop('Date')
- assert date[-4:] == ' GMT', 'date header timezone'
- assert (
- abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
- ), 'date header'
-
- assert headers == {
- 'Connection': 'close',
- 'Content-Length': str(len(body)),
+ resp = client.post(
+ headers={
+ 'Host': 'localhost',
'Content-Type': 'text/html',
- 'Request-Method': 'POST',
- 'Request-Uri': '/',
- 'Http-Host': 'localhost',
- 'Script-Name': '',
- 'Server-Protocol': 'HTTP/1.1',
'Custom-Header': 'blah',
- 'Rack-Version': '13',
- 'Rack-Url-Scheme': 'http',
- 'Rack-Multithread': 'false',
- 'Rack-Multiprocess': 'true',
- 'Rack-Run-Once': 'false',
- 'Rack-Hijack-Q': 'false',
- 'Rack-Hijack': '',
- 'Rack-Hijack-IO': '',
- }, 'headers'
- assert resp['body'] == body, 'body'
+ 'Connection': 'close',
+ },
+ body=body,
+ )
- def test_ruby_application_query_string(self):
- self.load('query_string')
+ assert resp['status'] == 200, 'status'
+ headers = resp['headers']
+ header_server = headers.pop('Server')
+ assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
+ assert (
+ headers.pop('Server-Software') == header_server
+ ), 'server software header'
- resp = self.get(url='/?var1=val1&var2=val2')
+ date = headers.pop('Date')
+ assert date[-4:] == ' GMT', 'date header timezone'
+ assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
- assert (
- resp['headers']['Query-String'] == 'var1=val1&var2=val2'
- ), 'Query-String header'
+ assert headers == {
+ 'Connection': 'close',
+ 'Content-Length': str(len(body)),
+ 'Content-Type': 'text/html',
+ 'Request-Method': 'POST',
+ 'Request-Uri': '/',
+ 'Http-Host': 'localhost',
+ 'Script-Name': '',
+ 'Server-Protocol': 'HTTP/1.1',
+ 'Custom-Header': 'blah',
+ 'Rack-Version': '13',
+ 'Rack-Url-Scheme': 'http',
+ 'Rack-Multithread': 'false',
+ 'Rack-Multiprocess': 'true',
+ 'Rack-Run-Once': 'false',
+ 'Rack-Hijack-Q': 'false',
+ 'Rack-Hijack': '',
+ 'Rack-Hijack-IO': '',
+ }, 'headers'
+ assert resp['body'] == body, 'body'
- def test_ruby_application_query_string_empty(self):
- self.load('query_string')
- resp = self.get(url='/?')
+def test_ruby_application_query_string():
+ client.load('query_string')
- assert resp['status'] == 200, 'query string empty status'
- assert resp['headers']['Query-String'] == '', 'query string empty'
+ resp = client.get(url='/?var1=val1&var2=val2')
- def test_ruby_application_query_string_absent(self):
- self.load('query_string')
+ assert (
+ resp['headers']['Query-String'] == 'var1=val1&var2=val2'
+ ), 'Query-String header'
- resp = self.get()
- assert resp['status'] == 200, 'query string absent status'
- assert resp['headers']['Query-String'] == '', 'query string absent'
+def test_ruby_application_query_string_empty():
+ client.load('query_string')
- @pytest.mark.skip('not yet')
- def test_ruby_application_server_port(self):
- self.load('server_port')
+ resp = client.get(url='/?')
- assert (
- self.get()['headers']['Server-Port'] == '7080'
- ), 'Server-Port header'
+ assert resp['status'] == 200, 'query string empty status'
+ assert resp['headers']['Query-String'] == '', 'query string empty'
- def test_ruby_application_status_int(self):
- self.load('status_int')
- assert self.get()['status'] == 200, 'status int'
+def test_ruby_application_query_string_absent():
+ client.load('query_string')
- def test_ruby_application_input_read_empty(self):
- self.load('input_read_empty')
+ resp = client.get()
- assert self.get()['body'] == '', 'read empty'
+ assert resp['status'] == 200, 'query string absent status'
+ assert resp['headers']['Query-String'] == '', 'query string absent'
- def test_ruby_application_input_read_parts(self):
- self.load('input_read_parts')
- assert (
- self.post(body='0123456789')['body'] == '012345678'
- ), 'input read parts'
+@pytest.mark.skip('not yet')
+def test_ruby_application_server_port():
+ client.load('server_port')
- def test_ruby_application_input_read_buffer(self):
- self.load('input_read_buffer')
+ assert (
+ client.get()['headers']['Server-Port'] == '7080'
+ ), 'Server-Port header'
- assert (
- self.post(body='0123456789')['body'] == '0123456789'
- ), 'input read buffer'
- def test_ruby_application_input_read_buffer_not_empty(self):
- self.load('input_read_buffer_not_empty')
+def test_ruby_application_status_int():
+ client.load('status_int')
- assert (
- self.post(body='0123456789')['body'] == '0123456789'
- ), 'input read buffer not empty'
+ assert client.get()['status'] == 200, 'status int'
- def test_ruby_application_input_gets(self):
- self.load('input_gets')
- body = '0123456789'
+def test_ruby_application_input_read_empty():
+ client.load('input_read_empty')
- assert self.post(body=body)['body'] == body, 'input gets'
+ assert client.get()['body'] == '', 'read empty'
- def test_ruby_application_input_gets_2(self):
- self.load('input_gets')
- assert (
- self.post(body='01234\n56789\n')['body'] == '01234\n'
- ), 'input gets 2'
+def test_ruby_application_input_read_parts():
+ client.load('input_read_parts')
- def test_ruby_application_input_gets_all(self):
- self.load('input_gets_all')
+ assert (
+ client.post(body='0123456789')['body'] == '012345678'
+ ), 'input read parts'
- body = '\n01234\n56789\n\n'
- assert self.post(body=body)['body'] == body, 'input gets all'
+def test_ruby_application_input_read_buffer():
+ client.load('input_read_buffer')
- def test_ruby_application_input_each(self):
- self.load('input_each')
+ assert (
+ client.post(body='0123456789')['body'] == '0123456789'
+ ), 'input read buffer'
- body = '\n01234\n56789\n\n'
- assert self.post(body=body)['body'] == body, 'input each'
+def test_ruby_application_input_read_buffer_not_empty():
+ client.load('input_read_buffer_not_empty')
- @pytest.mark.skip('not yet')
- def test_ruby_application_input_rewind(self):
- self.load('input_rewind')
+ assert (
+ client.post(body='0123456789')['body'] == '0123456789'
+ ), 'input read buffer not empty'
- body = '0123456789'
- assert self.post(body=body)['body'] == body, 'input rewind'
+def test_ruby_application_input_gets():
+ client.load('input_gets')
- @pytest.mark.skip('not yet')
- def test_ruby_application_syntax_error(self, skip_alert):
- skip_alert(
- r'Failed to parse rack script',
- r'syntax error',
- r'new_from_string',
- r'parse_file',
- )
- self.load('syntax_error')
+ body = '0123456789'
- assert self.get()['status'] == 500, 'syntax error'
+ assert client.post(body=body)['body'] == body, 'input gets'
- def test_ruby_application_errors_puts(self):
- self.load('errors_puts')
- assert self.get()['status'] == 200
+def test_ruby_application_input_gets_2():
+ client.load('input_gets')
- assert (
- self.wait_for_record(r'\[error\].+Error in application') is not None
- ), 'errors puts'
+ assert (
+ client.post(body='01234\n56789\n')['body'] == '01234\n'
+ ), 'input gets 2'
- def test_ruby_application_errors_puts_int(self):
- self.load('errors_puts_int')
- assert self.get()['status'] == 200
+def test_ruby_application_input_gets_all():
+ client.load('input_gets_all')
- assert (
- self.wait_for_record(r'\[error\].+1234567890') is not None
- ), 'errors puts int'
+ body = '\n01234\n56789\n\n'
- def test_ruby_application_errors_write(self):
- self.load('errors_write')
+ assert client.post(body=body)['body'] == body, 'input gets all'
- assert self.get()['status'] == 200
- assert (
- self.wait_for_record(r'\[error\].+Error in application') is not None
- ), 'errors write'
- def test_ruby_application_errors_write_to_s_custom(self):
- self.load('errors_write_to_s_custom')
+def test_ruby_application_input_each():
+ client.load('input_each')
- assert self.get()['status'] == 200, 'errors write to_s custom'
+ body = '\n01234\n56789\n\n'
- def test_ruby_application_errors_write_int(self):
- self.load('errors_write_int')
+ assert client.post(body=body)['body'] == body, 'input each'
- assert self.get()['status'] == 200
- assert (
- self.wait_for_record(r'\[error\].+1234567890') is not None
- ), 'errors write int'
- def test_ruby_application_at_exit(self):
- self.load('at_exit')
+@pytest.mark.skip('not yet')
+def test_ruby_application_input_rewind():
+ client.load('input_rewind')
- assert self.get()['status'] == 200
+ body = '0123456789'
- assert 'success' in self.conf({"listeners": {}, "applications": {}})
+ assert client.post(body=body)['body'] == body, 'input rewind'
- assert (
- self.wait_for_record(r'\[error\].+At exit called\.') is not None
- ), 'at exit'
- def test_ruby_application_encoding(self):
- self.load('encoding')
+@pytest.mark.skip('not yet')
+def test_ruby_application_syntax_error(skip_alert):
+ skip_alert(
+ r'Failed to parse rack script',
+ r'syntax error',
+ r'new_from_string',
+ r'parse_file',
+ )
+ client.load('syntax_error')
- try:
- locales = (
- subprocess.check_output(
- ['locale', '-a'],
- stderr=subprocess.STDOUT,
- )
- .decode()
- .split('\n')
- )
+ assert client.get()['status'] == 500, 'syntax error'
- except (FileNotFoundError, subprocess.CalledProcessError):
- pytest.skip('require locale')
-
- def get_locale(pattern):
- return next(
- (
- l
- for l in locales
- if re.match(pattern, l.upper()) is not None
- ),
- None,
- )
- utf8 = get_locale(r'.*UTF[-_]?8')
- iso88591 = get_locale(r'.*ISO[-_]?8859[-_]?1')
+def test_ruby_application_errors_puts(wait_for_record):
+ client.load('errors_puts')
+
+ assert client.get()['status'] == 200
+
+ assert (
+ wait_for_record(r'\[error\].+Error in application') is not None
+ ), 'errors puts'
+
+
+def test_ruby_application_errors_puts_int(wait_for_record):
+ client.load('errors_puts_int')
+
+ assert client.get()['status'] == 200
+
+ assert (
+ wait_for_record(r'\[error\].+1234567890') is not None
+ ), 'errors puts int'
- def check_locale(enc):
- assert 'success' in self.conf(
- {"LC_CTYPE": enc, "LC_ALL": ""},
- '/config/applications/encoding/environment',
- )
- resp = self.get()
- assert resp['status'] == 200, 'status'
+def test_ruby_application_errors_write(wait_for_record):
+ client.load('errors_write')
- enc_default = re.sub(r'[-_]', '', resp['headers']['X-Enc']).upper()
- assert (
- enc_default == re.sub(r'[-_]', '', enc.split('.')[-1]).upper()
+ assert client.get()['status'] == 200
+ assert (
+ wait_for_record(r'\[error\].+Error in application') is not None
+ ), 'errors write'
+
+
+def test_ruby_application_errors_write_to_s_custom():
+ client.load('errors_write_to_s_custom')
+
+ assert client.get()['status'] == 200, 'errors write to_s custom'
+
+
+def test_ruby_application_errors_write_int(wait_for_record):
+ client.load('errors_write_int')
+
+ assert client.get()['status'] == 200
+ assert (
+ wait_for_record(r'\[error\].+1234567890') is not None
+ ), 'errors write int'
+
+
+def test_ruby_application_at_exit(wait_for_record):
+ client.load('at_exit')
+
+ assert client.get()['status'] == 200
+
+ assert 'success' in client.conf({"listeners": {}, "applications": {}})
+
+ assert (
+ wait_for_record(r'\[error\].+At exit called\.') is not None
+ ), 'at exit'
+
+
+def test_ruby_application_encoding():
+ client.load('encoding')
+
+ try:
+ locales = (
+ subprocess.check_output(
+ ['locale', '-a'],
+ stderr=subprocess.STDOUT,
)
+ .decode()
+ .split('\n')
+ )
+
+ except (FileNotFoundError, subprocess.CalledProcessError):
+ pytest.skip('require locale')
+
+ def get_locale(pattern):
+ return next(
+ (l for l in locales if re.match(pattern, l.upper()) is not None),
+ None,
+ )
- if utf8:
- check_locale(utf8)
+ utf8 = get_locale(r'.*UTF[-_]?8')
+ iso88591 = get_locale(r'.*ISO[-_]?8859[-_]?1')
- if iso88591:
- check_locale(iso88591)
+ def check_locale(enc):
+ assert 'success' in client.conf(
+ {"LC_CTYPE": enc, "LC_ALL": ""},
+ '/config/applications/encoding/environment',
+ )
- if not utf8 and not iso88591:
- pytest.skip('no available locales')
+ resp = client.get()
+ assert resp['status'] == 200, 'status'
- def test_ruby_application_header_custom(self):
- self.load('header_custom')
+ enc_default = re.sub(r'[-_]', '', resp['headers']['X-Enc']).upper()
+ assert enc_default == re.sub(r'[-_]', '', enc.split('.')[-1]).upper()
- resp = self.post(body="\ntc=one,two\ntc=three,four,\n\n")
+ if utf8:
+ check_locale(utf8)
- assert resp['headers']['Custom-Header'] == [
- '',
- 'tc=one,two',
- 'tc=three,four,',
- '',
- '',
- ], 'header custom'
+ if iso88591:
+ check_locale(iso88591)
- @pytest.mark.skip('not yet')
- def test_ruby_application_header_custom_non_printable(self):
- self.load('header_custom')
+ if not utf8 and not iso88591:
+ pytest.skip('no available locales')
- assert (
- self.post(body='\b')['status'] == 500
- ), 'header custom non printable'
- def test_ruby_application_header_status(self):
- self.load('header_status')
+def test_ruby_application_header_custom():
+ client.load('header_custom')
- assert self.get()['status'] == 200, 'header status'
+ resp = client.post(body="\ntc=one,two\ntc=three,four,\n\n")
- @pytest.mark.skip('not yet')
- def test_ruby_application_header_rack(self):
- self.load('header_rack')
+ assert resp['headers']['Custom-Header'] == [
+ '',
+ 'tc=one,two',
+ 'tc=three,four,',
+ '',
+ '',
+ ], 'header custom'
- assert self.get()['status'] == 500, 'header rack'
- def test_ruby_application_body_empty(self):
- self.load('body_empty')
+@pytest.mark.skip('not yet')
+def test_ruby_application_header_custom_non_printable():
+ client.load('header_custom')
- assert self.get()['body'] == '', 'body empty'
+ assert (
+ client.post(body='\b')['status'] == 500
+ ), 'header custom non printable'
- def test_ruby_application_body_array(self):
- self.load('body_array')
- assert self.get()['body'] == '0123456789', 'body array'
+def test_ruby_application_header_status():
+ client.load('header_status')
- def test_ruby_application_body_large(self):
- self.load('mirror')
+ assert client.get()['status'] == 200, 'header status'
- body = '0123456789' * 1000
- assert self.post(body=body)['body'] == body, 'body large'
+@pytest.mark.skip('not yet')
+def test_ruby_application_header_rack():
+ client.load('header_rack')
- @pytest.mark.skip('not yet')
- def test_ruby_application_body_each_error(self):
- self.load('body_each_error')
+ assert client.get()['status'] == 500, 'header rack'
- assert self.get()['status'] == 500, 'body each error status'
- assert (
- self.wait_for_record(r'\[error\].+Failed to run ruby script')
- is not None
- ), 'body each error'
+def test_ruby_application_body_empty():
+ client.load('body_empty')
- def test_ruby_application_body_file(self):
- self.load('body_file')
+ assert client.get()['body'] == '', 'body empty'
- assert self.get()['body'] == 'body\n', 'body file'
- def test_ruby_keepalive_body(self):
- self.load('mirror')
+def test_ruby_application_body_array():
+ client.load('body_array')
- assert self.get()['status'] == 200, 'init'
+ assert client.get()['body'] == '0123456789', 'body array'
- body = '0123456789' * 500
- (resp, sock) = self.post(
- headers={
- 'Host': 'localhost',
- 'Connection': 'keep-alive',
- },
- start=True,
- body=body,
- read_timeout=1,
- )
- assert resp['body'] == body, 'keep-alive 1'
+def test_ruby_application_body_large():
+ client.load('mirror')
- body = '0123456789'
- resp = self.post(sock=sock, body=body)
+ body = '0123456789' * 1000
- assert resp['body'] == body, 'keep-alive 2'
+ assert client.post(body=body)['body'] == body, 'body large'
- def test_ruby_application_constants(self):
- self.load('constants')
- resp = self.get()
+@pytest.mark.skip('not yet')
+def test_ruby_application_body_each_error(wait_for_record):
+ client.load('body_each_error')
- assert resp['status'] == 200, 'status'
+ assert client.get()['status'] == 500, 'body each error status'
- headers = resp['headers']
- assert len(headers['X-Copyright']) > 0, 'RUBY_COPYRIGHT'
- assert len(headers['X-Description']) > 0, 'RUBY_DESCRIPTION'
- assert len(headers['X-Engine']) > 0, 'RUBY_ENGINE'
- assert len(headers['X-Engine-Version']) > 0, 'RUBY_ENGINE_VERSION'
- assert len(headers['X-Patchlevel']) > 0, 'RUBY_PATCHLEVEL'
- assert len(headers['X-Platform']) > 0, 'RUBY_PLATFORM'
- assert len(headers['X-Release-Date']) > 0, 'RUBY_RELEASE_DATE'
- assert len(headers['X-Revision']) > 0, 'RUBY_REVISION'
- assert len(headers['X-Version']) > 0, 'RUBY_VERSION'
-
- def test_ruby_application_threads(self):
- self.load('threads')
-
- assert 'success' in self.conf(
- '4', 'applications/threads/threads'
- ), 'configure 4 threads'
-
- socks = []
-
- for i in range(4):
- sock = self.get(
- headers={
- 'Host': 'localhost',
- 'X-Delay': '2',
- 'Connection': 'close',
- },
- no_recv=True,
- )
+ assert (
+ wait_for_record(r'\[error\].+Failed to run ruby script') is not None
+ ), 'body each error'
+
+
+def test_ruby_application_body_file():
+ client.load('body_file')
+
+ assert client.get()['body'] == 'body\n', 'body file'
+
+
+def test_ruby_keepalive_body():
+ client.load('mirror')
+
+ assert client.get()['status'] == 200, 'init'
+
+ body = '0123456789' * 500
+ (resp, sock) = client.post(
+ headers={
+ 'Host': 'localhost',
+ 'Connection': 'keep-alive',
+ },
+ start=True,
+ body=body,
+ read_timeout=1,
+ )
+
+ assert resp['body'] == body, 'keep-alive 1'
+
+ body = '0123456789'
+ resp = client.post(sock=sock, body=body)
- socks.append(sock)
+ assert resp['body'] == body, 'keep-alive 2'
- threads = set()
- for sock in socks:
- resp = self.recvall(sock).decode('utf-8')
+def test_ruby_application_constants():
+ client.load('constants')
- self.log_in(resp)
+ resp = client.get()
- resp = self._resp_to_dict(resp)
+ assert resp['status'] == 200, 'status'
- assert resp['status'] == 200, 'status'
+ headers = resp['headers']
+ assert len(headers['X-Copyright']) > 0, 'RUBY_COPYRIGHT'
+ assert len(headers['X-Description']) > 0, 'RUBY_DESCRIPTION'
+ assert len(headers['X-Engine']) > 0, 'RUBY_ENGINE'
+ assert len(headers['X-Engine-Version']) > 0, 'RUBY_ENGINE_VERSION'
+ assert len(headers['X-Patchlevel']) > 0, 'RUBY_PATCHLEVEL'
+ assert len(headers['X-Platform']) > 0, 'RUBY_PLATFORM'
+ assert len(headers['X-Release-Date']) > 0, 'RUBY_RELEASE_DATE'
+ assert len(headers['X-Revision']) > 0, 'RUBY_REVISION'
+ assert len(headers['X-Version']) > 0, 'RUBY_VERSION'
+
+
+def test_ruby_application_threads():
+ client.load('threads')
+
+ assert 'success' in client.conf(
+ '4', 'applications/threads/threads'
+ ), 'configure 4 threads'
+
+ socks = []
+
+ for _ in range(4):
+ sock = client.get(
+ headers={
+ 'Host': 'localhost',
+ 'X-Delay': '2',
+ 'Connection': 'close',
+ },
+ no_recv=True,
+ )
+
+ socks.append(sock)
+
+ threads = set()
+
+ for sock in socks:
+ resp = client.recvall(sock).decode('utf-8')
+
+ client.log_in(resp)
+
+ resp = client._resp_to_dict(resp)
+
+ assert resp['status'] == 200, 'status'
- threads.add(resp['headers']['X-Thread'])
+ threads.add(resp['headers']['X-Thread'])
- assert resp['headers']['Rack-Multithread'] == 'true', 'multithread'
+ assert resp['headers']['Rack-Multithread'] == 'true', 'multithread'
- sock.close()
+ sock.close()
- assert len(socks) == len(threads), 'threads differs'
+ assert len(socks) == len(threads), 'threads differs'