summaryrefslogtreecommitdiffhomepage
path: root/test/test_ruby_application.py
diff options
context:
space:
mode:
authorAndrei Zeliankou <zelenkov@nginx.com>2023-06-14 18:20:09 +0100
committerAndrei Zeliankou <zelenkov@nginx.com>2023-06-14 18:20:09 +0100
commitc183bd8749a19477390f8cb77efe5f6d223f0905 (patch)
tree4e821e9cb07be9a86bf2d442acb3ea6740ba5a99 /test/test_ruby_application.py
parentc6d05191a069ac150cc8eb2bece75cf79c0a465a (diff)
downloadunit-c183bd8749a19477390f8cb77efe5f6d223f0905.tar.gz
unit-c183bd8749a19477390f8cb77efe5f6d223f0905.tar.bz2
Tests: get rid of classes in test files.
Class usage came from the unittest framework and it was always redundant after migration to the pytest. This commit removes classes from files containing tests to make them more readable and understandable.
Diffstat (limited to 'test/test_ruby_application.py')
-rw-r--r--test/test_ruby_application.py625
1 files changed, 327 insertions, 298 deletions
diff --git a/test/test_ruby_application.py b/test/test_ruby_application.py
index 424d6764..6f533b70 100644
--- a/test/test_ruby_application.py
+++ b/test/test_ruby_application.py
@@ -2,415 +2,444 @@ import re
import subprocess
import pytest
-from unit.applications.lang.ruby import TestApplicationRuby
+from unit.applications.lang.ruby import ApplicationRuby
prerequisites = {'modules': {'ruby': 'all'}}
+client = ApplicationRuby()
-class TestRubyApplication(TestApplicationRuby):
- def test_ruby_application(self, date_to_sec_epoch, sec_epoch):
- self.load('variables')
- body = 'Test body string.'
+def test_ruby_application(date_to_sec_epoch, sec_epoch):
+ client.load('variables')
- resp = self.post(
- headers={
- 'Host': 'localhost',
- 'Content-Type': 'text/html',
- 'Custom-Header': 'blah',
- 'Connection': 'close',
- },
- body=body,
- )
+ body = 'Test body string.'
- assert resp['status'] == 200, 'status'
- headers = resp['headers']
- header_server = headers.pop('Server')
- assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
- assert (
- headers.pop('Server-Software') == header_server
- ), 'server software header'
-
- date = headers.pop('Date')
- assert date[-4:] == ' GMT', 'date header timezone'
- assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
-
- assert headers == {
- 'Connection': 'close',
- 'Content-Length': str(len(body)),
+ resp = client.post(
+ headers={
+ 'Host': 'localhost',
'Content-Type': 'text/html',
- 'Request-Method': 'POST',
- 'Request-Uri': '/',
- 'Http-Host': 'localhost',
- 'Script-Name': '',
- 'Server-Protocol': 'HTTP/1.1',
'Custom-Header': 'blah',
- 'Rack-Version': '13',
- 'Rack-Url-Scheme': 'http',
- 'Rack-Multithread': 'false',
- 'Rack-Multiprocess': 'true',
- 'Rack-Run-Once': 'false',
- 'Rack-Hijack-Q': 'false',
- 'Rack-Hijack': '',
- 'Rack-Hijack-IO': '',
- }, 'headers'
- assert resp['body'] == body, 'body'
+ 'Connection': 'close',
+ },
+ body=body,
+ )
- def test_ruby_application_query_string(self):
- self.load('query_string')
+ assert resp['status'] == 200, 'status'
+ headers = resp['headers']
+ header_server = headers.pop('Server')
+ assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
+ assert (
+ headers.pop('Server-Software') == header_server
+ ), 'server software header'
- resp = self.get(url='/?var1=val1&var2=val2')
+ date = headers.pop('Date')
+ assert date[-4:] == ' GMT', 'date header timezone'
+ assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
- assert (
- resp['headers']['Query-String'] == 'var1=val1&var2=val2'
- ), 'Query-String header'
+ assert headers == {
+ 'Connection': 'close',
+ 'Content-Length': str(len(body)),
+ 'Content-Type': 'text/html',
+ 'Request-Method': 'POST',
+ 'Request-Uri': '/',
+ 'Http-Host': 'localhost',
+ 'Script-Name': '',
+ 'Server-Protocol': 'HTTP/1.1',
+ 'Custom-Header': 'blah',
+ 'Rack-Version': '13',
+ 'Rack-Url-Scheme': 'http',
+ 'Rack-Multithread': 'false',
+ 'Rack-Multiprocess': 'true',
+ 'Rack-Run-Once': 'false',
+ 'Rack-Hijack-Q': 'false',
+ 'Rack-Hijack': '',
+ 'Rack-Hijack-IO': '',
+ }, 'headers'
+ assert resp['body'] == body, 'body'
- def test_ruby_application_query_string_empty(self):
- self.load('query_string')
- resp = self.get(url='/?')
+def test_ruby_application_query_string():
+ client.load('query_string')
- assert resp['status'] == 200, 'query string empty status'
- assert resp['headers']['Query-String'] == '', 'query string empty'
+ resp = client.get(url='/?var1=val1&var2=val2')
- def test_ruby_application_query_string_absent(self):
- self.load('query_string')
+ assert (
+ resp['headers']['Query-String'] == 'var1=val1&var2=val2'
+ ), 'Query-String header'
- resp = self.get()
- assert resp['status'] == 200, 'query string absent status'
- assert resp['headers']['Query-String'] == '', 'query string absent'
+def test_ruby_application_query_string_empty():
+ client.load('query_string')
- @pytest.mark.skip('not yet')
- def test_ruby_application_server_port(self):
- self.load('server_port')
+ resp = client.get(url='/?')
- assert (
- self.get()['headers']['Server-Port'] == '7080'
- ), 'Server-Port header'
+ assert resp['status'] == 200, 'query string empty status'
+ assert resp['headers']['Query-String'] == '', 'query string empty'
- def test_ruby_application_status_int(self):
- self.load('status_int')
- assert self.get()['status'] == 200, 'status int'
+def test_ruby_application_query_string_absent():
+ client.load('query_string')
- def test_ruby_application_input_read_empty(self):
- self.load('input_read_empty')
+ resp = client.get()
- assert self.get()['body'] == '', 'read empty'
+ assert resp['status'] == 200, 'query string absent status'
+ assert resp['headers']['Query-String'] == '', 'query string absent'
- def test_ruby_application_input_read_parts(self):
- self.load('input_read_parts')
- assert (
- self.post(body='0123456789')['body'] == '012345678'
- ), 'input read parts'
+@pytest.mark.skip('not yet')
+def test_ruby_application_server_port():
+ client.load('server_port')
- def test_ruby_application_input_read_buffer(self):
- self.load('input_read_buffer')
+ assert (
+ client.get()['headers']['Server-Port'] == '7080'
+ ), 'Server-Port header'
- assert (
- self.post(body='0123456789')['body'] == '0123456789'
- ), 'input read buffer'
- def test_ruby_application_input_read_buffer_not_empty(self):
- self.load('input_read_buffer_not_empty')
+def test_ruby_application_status_int():
+ client.load('status_int')
- assert (
- self.post(body='0123456789')['body'] == '0123456789'
- ), 'input read buffer not empty'
+ assert client.get()['status'] == 200, 'status int'
- def test_ruby_application_input_gets(self):
- self.load('input_gets')
- body = '0123456789'
+def test_ruby_application_input_read_empty():
+ client.load('input_read_empty')
- assert self.post(body=body)['body'] == body, 'input gets'
+ assert client.get()['body'] == '', 'read empty'
- def test_ruby_application_input_gets_2(self):
- self.load('input_gets')
- assert (
- self.post(body='01234\n56789\n')['body'] == '01234\n'
- ), 'input gets 2'
+def test_ruby_application_input_read_parts():
+ client.load('input_read_parts')
- def test_ruby_application_input_gets_all(self):
- self.load('input_gets_all')
+ assert (
+ client.post(body='0123456789')['body'] == '012345678'
+ ), 'input read parts'
- body = '\n01234\n56789\n\n'
- assert self.post(body=body)['body'] == body, 'input gets all'
+def test_ruby_application_input_read_buffer():
+ client.load('input_read_buffer')
- def test_ruby_application_input_each(self):
- self.load('input_each')
+ assert (
+ client.post(body='0123456789')['body'] == '0123456789'
+ ), 'input read buffer'
- body = '\n01234\n56789\n\n'
- assert self.post(body=body)['body'] == body, 'input each'
+def test_ruby_application_input_read_buffer_not_empty():
+ client.load('input_read_buffer_not_empty')
- @pytest.mark.skip('not yet')
- def test_ruby_application_input_rewind(self):
- self.load('input_rewind')
+ assert (
+ client.post(body='0123456789')['body'] == '0123456789'
+ ), 'input read buffer not empty'
- body = '0123456789'
- assert self.post(body=body)['body'] == body, 'input rewind'
+def test_ruby_application_input_gets():
+ client.load('input_gets')
- @pytest.mark.skip('not yet')
- def test_ruby_application_syntax_error(self, skip_alert):
- skip_alert(
- r'Failed to parse rack script',
- r'syntax error',
- r'new_from_string',
- r'parse_file',
- )
- self.load('syntax_error')
+ body = '0123456789'
- assert self.get()['status'] == 500, 'syntax error'
+ assert client.post(body=body)['body'] == body, 'input gets'
- def test_ruby_application_errors_puts(self, wait_for_record):
- self.load('errors_puts')
- assert self.get()['status'] == 200
+def test_ruby_application_input_gets_2():
+ client.load('input_gets')
- assert (
- wait_for_record(r'\[error\].+Error in application') is not None
- ), 'errors puts'
+ assert (
+ client.post(body='01234\n56789\n')['body'] == '01234\n'
+ ), 'input gets 2'
- def test_ruby_application_errors_puts_int(self, wait_for_record):
- self.load('errors_puts_int')
- assert self.get()['status'] == 200
+def test_ruby_application_input_gets_all():
+ client.load('input_gets_all')
- assert (
- wait_for_record(r'\[error\].+1234567890') is not None
- ), 'errors puts int'
+ body = '\n01234\n56789\n\n'
- def test_ruby_application_errors_write(self, wait_for_record):
- self.load('errors_write')
+ assert client.post(body=body)['body'] == body, 'input gets all'
- assert self.get()['status'] == 200
- assert (
- wait_for_record(r'\[error\].+Error in application') is not None
- ), 'errors write'
- def test_ruby_application_errors_write_to_s_custom(self):
- self.load('errors_write_to_s_custom')
+def test_ruby_application_input_each():
+ client.load('input_each')
- assert self.get()['status'] == 200, 'errors write to_s custom'
+ body = '\n01234\n56789\n\n'
- def test_ruby_application_errors_write_int(self, wait_for_record):
- self.load('errors_write_int')
+ assert client.post(body=body)['body'] == body, 'input each'
- assert self.get()['status'] == 200
- assert (
- wait_for_record(r'\[error\].+1234567890') is not None
- ), 'errors write int'
- def test_ruby_application_at_exit(self, wait_for_record):
- self.load('at_exit')
+@pytest.mark.skip('not yet')
+def test_ruby_application_input_rewind():
+ client.load('input_rewind')
- assert self.get()['status'] == 200
+ body = '0123456789'
- assert 'success' in self.conf({"listeners": {}, "applications": {}})
+ assert client.post(body=body)['body'] == body, 'input rewind'
- assert (
- wait_for_record(r'\[error\].+At exit called\.') is not None
- ), 'at exit'
- def test_ruby_application_encoding(self):
- self.load('encoding')
+@pytest.mark.skip('not yet')
+def test_ruby_application_syntax_error(skip_alert):
+ skip_alert(
+ r'Failed to parse rack script',
+ r'syntax error',
+ r'new_from_string',
+ r'parse_file',
+ )
+ client.load('syntax_error')
- try:
- locales = (
- subprocess.check_output(
- ['locale', '-a'],
- stderr=subprocess.STDOUT,
- )
- .decode()
- .split('\n')
- )
+ assert client.get()['status'] == 500, 'syntax error'
- except (FileNotFoundError, subprocess.CalledProcessError):
- pytest.skip('require locale')
-
- def get_locale(pattern):
- return next(
- (
- l
- for l in locales
- if re.match(pattern, l.upper()) is not None
- ),
- None,
- )
- utf8 = get_locale(r'.*UTF[-_]?8')
- iso88591 = get_locale(r'.*ISO[-_]?8859[-_]?1')
+def test_ruby_application_errors_puts(wait_for_record):
+ client.load('errors_puts')
+
+ assert client.get()['status'] == 200
+
+ assert (
+ wait_for_record(r'\[error\].+Error in application') is not None
+ ), 'errors puts'
+
+
+def test_ruby_application_errors_puts_int(wait_for_record):
+ client.load('errors_puts_int')
+
+ assert client.get()['status'] == 200
+
+ assert (
+ wait_for_record(r'\[error\].+1234567890') is not None
+ ), 'errors puts int'
- def check_locale(enc):
- assert 'success' in self.conf(
- {"LC_CTYPE": enc, "LC_ALL": ""},
- '/config/applications/encoding/environment',
- )
- resp = self.get()
- assert resp['status'] == 200, 'status'
+def test_ruby_application_errors_write(wait_for_record):
+ client.load('errors_write')
- enc_default = re.sub(r'[-_]', '', resp['headers']['X-Enc']).upper()
- assert (
- enc_default == re.sub(r'[-_]', '', enc.split('.')[-1]).upper()
+ assert client.get()['status'] == 200
+ assert (
+ wait_for_record(r'\[error\].+Error in application') is not None
+ ), 'errors write'
+
+
+def test_ruby_application_errors_write_to_s_custom():
+ client.load('errors_write_to_s_custom')
+
+ assert client.get()['status'] == 200, 'errors write to_s custom'
+
+
+def test_ruby_application_errors_write_int(wait_for_record):
+ client.load('errors_write_int')
+
+ assert client.get()['status'] == 200
+ assert (
+ wait_for_record(r'\[error\].+1234567890') is not None
+ ), 'errors write int'
+
+
+def test_ruby_application_at_exit(wait_for_record):
+ client.load('at_exit')
+
+ assert client.get()['status'] == 200
+
+ assert 'success' in client.conf({"listeners": {}, "applications": {}})
+
+ assert (
+ wait_for_record(r'\[error\].+At exit called\.') is not None
+ ), 'at exit'
+
+
+def test_ruby_application_encoding():
+ client.load('encoding')
+
+ try:
+ locales = (
+ subprocess.check_output(
+ ['locale', '-a'],
+ stderr=subprocess.STDOUT,
)
+ .decode()
+ .split('\n')
+ )
+
+ except (FileNotFoundError, subprocess.CalledProcessError):
+ pytest.skip('require locale')
+
+ def get_locale(pattern):
+ return next(
+ (l for l in locales if re.match(pattern, l.upper()) is not None),
+ None,
+ )
- if utf8:
- check_locale(utf8)
+ utf8 = get_locale(r'.*UTF[-_]?8')
+ iso88591 = get_locale(r'.*ISO[-_]?8859[-_]?1')
- if iso88591:
- check_locale(iso88591)
+ def check_locale(enc):
+ assert 'success' in client.conf(
+ {"LC_CTYPE": enc, "LC_ALL": ""},
+ '/config/applications/encoding/environment',
+ )
- if not utf8 and not iso88591:
- pytest.skip('no available locales')
+ resp = client.get()
+ assert resp['status'] == 200, 'status'
- def test_ruby_application_header_custom(self):
- self.load('header_custom')
+ enc_default = re.sub(r'[-_]', '', resp['headers']['X-Enc']).upper()
+ assert enc_default == re.sub(r'[-_]', '', enc.split('.')[-1]).upper()
- resp = self.post(body="\ntc=one,two\ntc=three,four,\n\n")
+ if utf8:
+ check_locale(utf8)
- assert resp['headers']['Custom-Header'] == [
- '',
- 'tc=one,two',
- 'tc=three,four,',
- '',
- '',
- ], 'header custom'
+ if iso88591:
+ check_locale(iso88591)
- @pytest.mark.skip('not yet')
- def test_ruby_application_header_custom_non_printable(self):
- self.load('header_custom')
+ if not utf8 and not iso88591:
+ pytest.skip('no available locales')
- assert (
- self.post(body='\b')['status'] == 500
- ), 'header custom non printable'
- def test_ruby_application_header_status(self):
- self.load('header_status')
+def test_ruby_application_header_custom():
+ client.load('header_custom')
- assert self.get()['status'] == 200, 'header status'
+ resp = client.post(body="\ntc=one,two\ntc=three,four,\n\n")
- @pytest.mark.skip('not yet')
- def test_ruby_application_header_rack(self):
- self.load('header_rack')
+ assert resp['headers']['Custom-Header'] == [
+ '',
+ 'tc=one,two',
+ 'tc=three,four,',
+ '',
+ '',
+ ], 'header custom'
- assert self.get()['status'] == 500, 'header rack'
- def test_ruby_application_body_empty(self):
- self.load('body_empty')
+@pytest.mark.skip('not yet')
+def test_ruby_application_header_custom_non_printable():
+ client.load('header_custom')
- assert self.get()['body'] == '', 'body empty'
+ assert (
+ client.post(body='\b')['status'] == 500
+ ), 'header custom non printable'
- def test_ruby_application_body_array(self):
- self.load('body_array')
- assert self.get()['body'] == '0123456789', 'body array'
+def test_ruby_application_header_status():
+ client.load('header_status')
- def test_ruby_application_body_large(self):
- self.load('mirror')
+ assert client.get()['status'] == 200, 'header status'
- body = '0123456789' * 1000
- assert self.post(body=body)['body'] == body, 'body large'
+@pytest.mark.skip('not yet')
+def test_ruby_application_header_rack():
+ client.load('header_rack')
- @pytest.mark.skip('not yet')
- def test_ruby_application_body_each_error(self, wait_for_record):
- self.load('body_each_error')
+ assert client.get()['status'] == 500, 'header rack'
- assert self.get()['status'] == 500, 'body each error status'
- assert (
- wait_for_record(r'\[error\].+Failed to run ruby script') is not None
- ), 'body each error'
+def test_ruby_application_body_empty():
+ client.load('body_empty')
- def test_ruby_application_body_file(self):
- self.load('body_file')
+ assert client.get()['body'] == '', 'body empty'
- assert self.get()['body'] == 'body\n', 'body file'
- def test_ruby_keepalive_body(self):
- self.load('mirror')
+def test_ruby_application_body_array():
+ client.load('body_array')
- assert self.get()['status'] == 200, 'init'
+ assert client.get()['body'] == '0123456789', 'body array'
- body = '0123456789' * 500
- (resp, sock) = self.post(
- headers={
- 'Host': 'localhost',
- 'Connection': 'keep-alive',
- },
- start=True,
- body=body,
- read_timeout=1,
- )
- assert resp['body'] == body, 'keep-alive 1'
+def test_ruby_application_body_large():
+ client.load('mirror')
- body = '0123456789'
- resp = self.post(sock=sock, body=body)
+ body = '0123456789' * 1000
- assert resp['body'] == body, 'keep-alive 2'
+ assert client.post(body=body)['body'] == body, 'body large'
- def test_ruby_application_constants(self):
- self.load('constants')
- resp = self.get()
+@pytest.mark.skip('not yet')
+def test_ruby_application_body_each_error(wait_for_record):
+ client.load('body_each_error')
- assert resp['status'] == 200, 'status'
+ assert client.get()['status'] == 500, 'body each error status'
- headers = resp['headers']
- assert len(headers['X-Copyright']) > 0, 'RUBY_COPYRIGHT'
- assert len(headers['X-Description']) > 0, 'RUBY_DESCRIPTION'
- assert len(headers['X-Engine']) > 0, 'RUBY_ENGINE'
- assert len(headers['X-Engine-Version']) > 0, 'RUBY_ENGINE_VERSION'
- assert len(headers['X-Patchlevel']) > 0, 'RUBY_PATCHLEVEL'
- assert len(headers['X-Platform']) > 0, 'RUBY_PLATFORM'
- assert len(headers['X-Release-Date']) > 0, 'RUBY_RELEASE_DATE'
- assert len(headers['X-Revision']) > 0, 'RUBY_REVISION'
- assert len(headers['X-Version']) > 0, 'RUBY_VERSION'
-
- def test_ruby_application_threads(self):
- self.load('threads')
-
- assert 'success' in self.conf(
- '4', 'applications/threads/threads'
- ), 'configure 4 threads'
-
- socks = []
-
- for _ in range(4):
- sock = self.get(
- headers={
- 'Host': 'localhost',
- 'X-Delay': '2',
- 'Connection': 'close',
- },
- no_recv=True,
- )
+ assert (
+ wait_for_record(r'\[error\].+Failed to run ruby script') is not None
+ ), 'body each error'
+
+
+def test_ruby_application_body_file():
+ client.load('body_file')
+
+ assert client.get()['body'] == 'body\n', 'body file'
+
+
+def test_ruby_keepalive_body():
+ client.load('mirror')
+
+ assert client.get()['status'] == 200, 'init'
+
+ body = '0123456789' * 500
+ (resp, sock) = client.post(
+ headers={
+ 'Host': 'localhost',
+ 'Connection': 'keep-alive',
+ },
+ start=True,
+ body=body,
+ read_timeout=1,
+ )
+
+ assert resp['body'] == body, 'keep-alive 1'
+
+ body = '0123456789'
+ resp = client.post(sock=sock, body=body)
- socks.append(sock)
+ assert resp['body'] == body, 'keep-alive 2'
- threads = set()
- for sock in socks:
- resp = self.recvall(sock).decode('utf-8')
+def test_ruby_application_constants():
+ client.load('constants')
- self.log_in(resp)
+ resp = client.get()
- resp = self._resp_to_dict(resp)
+ assert resp['status'] == 200, 'status'
- assert resp['status'] == 200, 'status'
+ headers = resp['headers']
+ assert len(headers['X-Copyright']) > 0, 'RUBY_COPYRIGHT'
+ assert len(headers['X-Description']) > 0, 'RUBY_DESCRIPTION'
+ assert len(headers['X-Engine']) > 0, 'RUBY_ENGINE'
+ assert len(headers['X-Engine-Version']) > 0, 'RUBY_ENGINE_VERSION'
+ assert len(headers['X-Patchlevel']) > 0, 'RUBY_PATCHLEVEL'
+ assert len(headers['X-Platform']) > 0, 'RUBY_PLATFORM'
+ assert len(headers['X-Release-Date']) > 0, 'RUBY_RELEASE_DATE'
+ assert len(headers['X-Revision']) > 0, 'RUBY_REVISION'
+ assert len(headers['X-Version']) > 0, 'RUBY_VERSION'
+
+
+def test_ruby_application_threads():
+ client.load('threads')
+
+ assert 'success' in client.conf(
+ '4', 'applications/threads/threads'
+ ), 'configure 4 threads'
+
+ socks = []
+
+ for _ in range(4):
+ sock = client.get(
+ headers={
+ 'Host': 'localhost',
+ 'X-Delay': '2',
+ 'Connection': 'close',
+ },
+ no_recv=True,
+ )
+
+ socks.append(sock)
+
+ threads = set()
+
+ for sock in socks:
+ resp = client.recvall(sock).decode('utf-8')
+
+ client.log_in(resp)
+
+ resp = client._resp_to_dict(resp)
+
+ assert resp['status'] == 200, 'status'
- threads.add(resp['headers']['X-Thread'])
+ threads.add(resp['headers']['X-Thread'])
- assert resp['headers']['Rack-Multithread'] == 'true', 'multithread'
+ assert resp['headers']['Rack-Multithread'] == 'true', 'multithread'
- sock.close()
+ sock.close()
- assert len(socks) == len(threads), 'threads differs'
+ assert len(socks) == len(threads), 'threads differs'