diff options
Diffstat (limited to '')
-rw-r--r-- | test/test_ruby_application.py | 625 |
1 files changed, 327 insertions, 298 deletions
diff --git a/test/test_ruby_application.py b/test/test_ruby_application.py index 424d6764..6f533b70 100644 --- a/test/test_ruby_application.py +++ b/test/test_ruby_application.py @@ -2,415 +2,444 @@ import re import subprocess import pytest -from unit.applications.lang.ruby import TestApplicationRuby +from unit.applications.lang.ruby import ApplicationRuby prerequisites = {'modules': {'ruby': 'all'}} +client = ApplicationRuby() -class TestRubyApplication(TestApplicationRuby): - def test_ruby_application(self, date_to_sec_epoch, sec_epoch): - self.load('variables') - body = 'Test body string.' +def test_ruby_application(date_to_sec_epoch, sec_epoch): + client.load('variables') - resp = self.post( - headers={ - 'Host': 'localhost', - 'Content-Type': 'text/html', - 'Custom-Header': 'blah', - 'Connection': 'close', - }, - body=body, - ) + body = 'Test body string.' - assert resp['status'] == 200, 'status' - headers = resp['headers'] - header_server = headers.pop('Server') - assert re.search(r'Unit/[\d\.]+', header_server), 'server header' - assert ( - headers.pop('Server-Software') == header_server - ), 'server software header' - - date = headers.pop('Date') - assert date[-4:] == ' GMT', 'date header timezone' - assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header' - - assert headers == { - 'Connection': 'close', - 'Content-Length': str(len(body)), + resp = client.post( + headers={ + 'Host': 'localhost', 'Content-Type': 'text/html', - 'Request-Method': 'POST', - 'Request-Uri': '/', - 'Http-Host': 'localhost', - 'Script-Name': '', - 'Server-Protocol': 'HTTP/1.1', 'Custom-Header': 'blah', - 'Rack-Version': '13', - 'Rack-Url-Scheme': 'http', - 'Rack-Multithread': 'false', - 'Rack-Multiprocess': 'true', - 'Rack-Run-Once': 'false', - 'Rack-Hijack-Q': 'false', - 'Rack-Hijack': '', - 'Rack-Hijack-IO': '', - }, 'headers' - assert resp['body'] == body, 'body' + 'Connection': 'close', + }, + body=body, + ) - def test_ruby_application_query_string(self): - self.load('query_string') + assert resp['status'] == 200, 'status' + headers = resp['headers'] + header_server = headers.pop('Server') + assert re.search(r'Unit/[\d\.]+', header_server), 'server header' + assert ( + headers.pop('Server-Software') == header_server + ), 'server software header' - resp = self.get(url='/?var1=val1&var2=val2') + date = headers.pop('Date') + assert date[-4:] == ' GMT', 'date header timezone' + assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header' - assert ( - resp['headers']['Query-String'] == 'var1=val1&var2=val2' - ), 'Query-String header' + assert headers == { + 'Connection': 'close', + 'Content-Length': str(len(body)), + 'Content-Type': 'text/html', + 'Request-Method': 'POST', + 'Request-Uri': '/', + 'Http-Host': 'localhost', + 'Script-Name': '', + 'Server-Protocol': 'HTTP/1.1', + 'Custom-Header': 'blah', + 'Rack-Version': '13', + 'Rack-Url-Scheme': 'http', + 'Rack-Multithread': 'false', + 'Rack-Multiprocess': 'true', + 'Rack-Run-Once': 'false', + 'Rack-Hijack-Q': 'false', + 'Rack-Hijack': '', + 'Rack-Hijack-IO': '', + }, 'headers' + assert resp['body'] == body, 'body' - def test_ruby_application_query_string_empty(self): - self.load('query_string') - resp = self.get(url='/?') +def test_ruby_application_query_string(): + client.load('query_string') - assert resp['status'] == 200, 'query string empty status' - assert resp['headers']['Query-String'] == '', 'query string empty' + resp = client.get(url='/?var1=val1&var2=val2') - def test_ruby_application_query_string_absent(self): - self.load('query_string') + assert ( + resp['headers']['Query-String'] == 'var1=val1&var2=val2' + ), 'Query-String header' - resp = self.get() - assert resp['status'] == 200, 'query string absent status' - assert resp['headers']['Query-String'] == '', 'query string absent' +def test_ruby_application_query_string_empty(): + client.load('query_string') - @pytest.mark.skip('not yet') - def test_ruby_application_server_port(self): - self.load('server_port') + resp = client.get(url='/?') - assert ( - self.get()['headers']['Server-Port'] == '7080' - ), 'Server-Port header' + assert resp['status'] == 200, 'query string empty status' + assert resp['headers']['Query-String'] == '', 'query string empty' - def test_ruby_application_status_int(self): - self.load('status_int') - assert self.get()['status'] == 200, 'status int' +def test_ruby_application_query_string_absent(): + client.load('query_string') - def test_ruby_application_input_read_empty(self): - self.load('input_read_empty') + resp = client.get() - assert self.get()['body'] == '', 'read empty' + assert resp['status'] == 200, 'query string absent status' + assert resp['headers']['Query-String'] == '', 'query string absent' - def test_ruby_application_input_read_parts(self): - self.load('input_read_parts') - assert ( - self.post(body='0123456789')['body'] == '012345678' - ), 'input read parts' +@pytest.mark.skip('not yet') +def test_ruby_application_server_port(): + client.load('server_port') - def test_ruby_application_input_read_buffer(self): - self.load('input_read_buffer') + assert ( + client.get()['headers']['Server-Port'] == '7080' + ), 'Server-Port header' - assert ( - self.post(body='0123456789')['body'] == '0123456789' - ), 'input read buffer' - def test_ruby_application_input_read_buffer_not_empty(self): - self.load('input_read_buffer_not_empty') +def test_ruby_application_status_int(): + client.load('status_int') - assert ( - self.post(body='0123456789')['body'] == '0123456789' - ), 'input read buffer not empty' + assert client.get()['status'] == 200, 'status int' - def test_ruby_application_input_gets(self): - self.load('input_gets') - body = '0123456789' +def test_ruby_application_input_read_empty(): + client.load('input_read_empty') - assert self.post(body=body)['body'] == body, 'input gets' + assert client.get()['body'] == '', 'read empty' - def test_ruby_application_input_gets_2(self): - self.load('input_gets') - assert ( - self.post(body='01234\n56789\n')['body'] == '01234\n' - ), 'input gets 2' +def test_ruby_application_input_read_parts(): + client.load('input_read_parts') - def test_ruby_application_input_gets_all(self): - self.load('input_gets_all') + assert ( + client.post(body='0123456789')['body'] == '012345678' + ), 'input read parts' - body = '\n01234\n56789\n\n' - assert self.post(body=body)['body'] == body, 'input gets all' +def test_ruby_application_input_read_buffer(): + client.load('input_read_buffer') - def test_ruby_application_input_each(self): - self.load('input_each') + assert ( + client.post(body='0123456789')['body'] == '0123456789' + ), 'input read buffer' - body = '\n01234\n56789\n\n' - assert self.post(body=body)['body'] == body, 'input each' +def test_ruby_application_input_read_buffer_not_empty(): + client.load('input_read_buffer_not_empty') - @pytest.mark.skip('not yet') - def test_ruby_application_input_rewind(self): - self.load('input_rewind') + assert ( + client.post(body='0123456789')['body'] == '0123456789' + ), 'input read buffer not empty' - body = '0123456789' - assert self.post(body=body)['body'] == body, 'input rewind' +def test_ruby_application_input_gets(): + client.load('input_gets') - @pytest.mark.skip('not yet') - def test_ruby_application_syntax_error(self, skip_alert): - skip_alert( - r'Failed to parse rack script', - r'syntax error', - r'new_from_string', - r'parse_file', - ) - self.load('syntax_error') + body = '0123456789' - assert self.get()['status'] == 500, 'syntax error' + assert client.post(body=body)['body'] == body, 'input gets' - def test_ruby_application_errors_puts(self, wait_for_record): - self.load('errors_puts') - assert self.get()['status'] == 200 +def test_ruby_application_input_gets_2(): + client.load('input_gets') - assert ( - wait_for_record(r'\[error\].+Error in application') is not None - ), 'errors puts' + assert ( + client.post(body='01234\n56789\n')['body'] == '01234\n' + ), 'input gets 2' - def test_ruby_application_errors_puts_int(self, wait_for_record): - self.load('errors_puts_int') - assert self.get()['status'] == 200 +def test_ruby_application_input_gets_all(): + client.load('input_gets_all') - assert ( - wait_for_record(r'\[error\].+1234567890') is not None - ), 'errors puts int' + body = '\n01234\n56789\n\n' - def test_ruby_application_errors_write(self, wait_for_record): - self.load('errors_write') + assert client.post(body=body)['body'] == body, 'input gets all' - assert self.get()['status'] == 200 - assert ( - wait_for_record(r'\[error\].+Error in application') is not None - ), 'errors write' - def test_ruby_application_errors_write_to_s_custom(self): - self.load('errors_write_to_s_custom') +def test_ruby_application_input_each(): + client.load('input_each') - assert self.get()['status'] == 200, 'errors write to_s custom' + body = '\n01234\n56789\n\n' - def test_ruby_application_errors_write_int(self, wait_for_record): - self.load('errors_write_int') + assert client.post(body=body)['body'] == body, 'input each' - assert self.get()['status'] == 200 - assert ( - wait_for_record(r'\[error\].+1234567890') is not None - ), 'errors write int' - def test_ruby_application_at_exit(self, wait_for_record): - self.load('at_exit') +@pytest.mark.skip('not yet') +def test_ruby_application_input_rewind(): + client.load('input_rewind') - assert self.get()['status'] == 200 + body = '0123456789' - assert 'success' in self.conf({"listeners": {}, "applications": {}}) + assert client.post(body=body)['body'] == body, 'input rewind' - assert ( - wait_for_record(r'\[error\].+At exit called\.') is not None - ), 'at exit' - def test_ruby_application_encoding(self): - self.load('encoding') +@pytest.mark.skip('not yet') +def test_ruby_application_syntax_error(skip_alert): + skip_alert( + r'Failed to parse rack script', + r'syntax error', + r'new_from_string', + r'parse_file', + ) + client.load('syntax_error') - try: - locales = ( - subprocess.check_output( - ['locale', '-a'], - stderr=subprocess.STDOUT, - ) - .decode() - .split('\n') - ) + assert client.get()['status'] == 500, 'syntax error' - except (FileNotFoundError, subprocess.CalledProcessError): - pytest.skip('require locale') - - def get_locale(pattern): - return next( - ( - l - for l in locales - if re.match(pattern, l.upper()) is not None - ), - None, - ) - utf8 = get_locale(r'.*UTF[-_]?8') - iso88591 = get_locale(r'.*ISO[-_]?8859[-_]?1') +def test_ruby_application_errors_puts(wait_for_record): + client.load('errors_puts') + + assert client.get()['status'] == 200 + + assert ( + wait_for_record(r'\[error\].+Error in application') is not None + ), 'errors puts' + + +def test_ruby_application_errors_puts_int(wait_for_record): + client.load('errors_puts_int') + + assert client.get()['status'] == 200 + + assert ( + wait_for_record(r'\[error\].+1234567890') is not None + ), 'errors puts int' - def check_locale(enc): - assert 'success' in self.conf( - {"LC_CTYPE": enc, "LC_ALL": ""}, - '/config/applications/encoding/environment', - ) - resp = self.get() - assert resp['status'] == 200, 'status' +def test_ruby_application_errors_write(wait_for_record): + client.load('errors_write') - enc_default = re.sub(r'[-_]', '', resp['headers']['X-Enc']).upper() - assert ( - enc_default == re.sub(r'[-_]', '', enc.split('.')[-1]).upper() + assert client.get()['status'] == 200 + assert ( + wait_for_record(r'\[error\].+Error in application') is not None + ), 'errors write' + + +def test_ruby_application_errors_write_to_s_custom(): + client.load('errors_write_to_s_custom') + + assert client.get()['status'] == 200, 'errors write to_s custom' + + +def test_ruby_application_errors_write_int(wait_for_record): + client.load('errors_write_int') + + assert client.get()['status'] == 200 + assert ( + wait_for_record(r'\[error\].+1234567890') is not None + ), 'errors write int' + + +def test_ruby_application_at_exit(wait_for_record): + client.load('at_exit') + + assert client.get()['status'] == 200 + + assert 'success' in client.conf({"listeners": {}, "applications": {}}) + + assert ( + wait_for_record(r'\[error\].+At exit called\.') is not None + ), 'at exit' + + +def test_ruby_application_encoding(): + client.load('encoding') + + try: + locales = ( + subprocess.check_output( + ['locale', '-a'], + stderr=subprocess.STDOUT, ) + .decode() + .split('\n') + ) + + except (FileNotFoundError, subprocess.CalledProcessError): + pytest.skip('require locale') + + def get_locale(pattern): + return next( + (l for l in locales if re.match(pattern, l.upper()) is not None), + None, + ) - if utf8: - check_locale(utf8) + utf8 = get_locale(r'.*UTF[-_]?8') + iso88591 = get_locale(r'.*ISO[-_]?8859[-_]?1') - if iso88591: - check_locale(iso88591) + def check_locale(enc): + assert 'success' in client.conf( + {"LC_CTYPE": enc, "LC_ALL": ""}, + '/config/applications/encoding/environment', + ) - if not utf8 and not iso88591: - pytest.skip('no available locales') + resp = client.get() + assert resp['status'] == 200, 'status' - def test_ruby_application_header_custom(self): - self.load('header_custom') + enc_default = re.sub(r'[-_]', '', resp['headers']['X-Enc']).upper() + assert enc_default == re.sub(r'[-_]', '', enc.split('.')[-1]).upper() - resp = self.post(body="\ntc=one,two\ntc=three,four,\n\n") + if utf8: + check_locale(utf8) - assert resp['headers']['Custom-Header'] == [ - '', - 'tc=one,two', - 'tc=three,four,', - '', - '', - ], 'header custom' + if iso88591: + check_locale(iso88591) - @pytest.mark.skip('not yet') - def test_ruby_application_header_custom_non_printable(self): - self.load('header_custom') + if not utf8 and not iso88591: + pytest.skip('no available locales') - assert ( - self.post(body='\b')['status'] == 500 - ), 'header custom non printable' - def test_ruby_application_header_status(self): - self.load('header_status') +def test_ruby_application_header_custom(): + client.load('header_custom') - assert self.get()['status'] == 200, 'header status' + resp = client.post(body="\ntc=one,two\ntc=three,four,\n\n") - @pytest.mark.skip('not yet') - def test_ruby_application_header_rack(self): - self.load('header_rack') + assert resp['headers']['Custom-Header'] == [ + '', + 'tc=one,two', + 'tc=three,four,', + '', + '', + ], 'header custom' - assert self.get()['status'] == 500, 'header rack' - def test_ruby_application_body_empty(self): - self.load('body_empty') +@pytest.mark.skip('not yet') +def test_ruby_application_header_custom_non_printable(): + client.load('header_custom') - assert self.get()['body'] == '', 'body empty' + assert ( + client.post(body='\b')['status'] == 500 + ), 'header custom non printable' - def test_ruby_application_body_array(self): - self.load('body_array') - assert self.get()['body'] == '0123456789', 'body array' +def test_ruby_application_header_status(): + client.load('header_status') - def test_ruby_application_body_large(self): - self.load('mirror') + assert client.get()['status'] == 200, 'header status' - body = '0123456789' * 1000 - assert self.post(body=body)['body'] == body, 'body large' +@pytest.mark.skip('not yet') +def test_ruby_application_header_rack(): + client.load('header_rack') - @pytest.mark.skip('not yet') - def test_ruby_application_body_each_error(self, wait_for_record): - self.load('body_each_error') + assert client.get()['status'] == 500, 'header rack' - assert self.get()['status'] == 500, 'body each error status' - assert ( - wait_for_record(r'\[error\].+Failed to run ruby script') is not None - ), 'body each error' +def test_ruby_application_body_empty(): + client.load('body_empty') - def test_ruby_application_body_file(self): - self.load('body_file') + assert client.get()['body'] == '', 'body empty' - assert self.get()['body'] == 'body\n', 'body file' - def test_ruby_keepalive_body(self): - self.load('mirror') +def test_ruby_application_body_array(): + client.load('body_array') - assert self.get()['status'] == 200, 'init' + assert client.get()['body'] == '0123456789', 'body array' - body = '0123456789' * 500 - (resp, sock) = self.post( - headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - }, - start=True, - body=body, - read_timeout=1, - ) - assert resp['body'] == body, 'keep-alive 1' +def test_ruby_application_body_large(): + client.load('mirror') - body = '0123456789' - resp = self.post(sock=sock, body=body) + body = '0123456789' * 1000 - assert resp['body'] == body, 'keep-alive 2' + assert client.post(body=body)['body'] == body, 'body large' - def test_ruby_application_constants(self): - self.load('constants') - resp = self.get() +@pytest.mark.skip('not yet') +def test_ruby_application_body_each_error(wait_for_record): + client.load('body_each_error') - assert resp['status'] == 200, 'status' + assert client.get()['status'] == 500, 'body each error status' - headers = resp['headers'] - assert len(headers['X-Copyright']) > 0, 'RUBY_COPYRIGHT' - assert len(headers['X-Description']) > 0, 'RUBY_DESCRIPTION' - assert len(headers['X-Engine']) > 0, 'RUBY_ENGINE' - assert len(headers['X-Engine-Version']) > 0, 'RUBY_ENGINE_VERSION' - assert len(headers['X-Patchlevel']) > 0, 'RUBY_PATCHLEVEL' - assert len(headers['X-Platform']) > 0, 'RUBY_PLATFORM' - assert len(headers['X-Release-Date']) > 0, 'RUBY_RELEASE_DATE' - assert len(headers['X-Revision']) > 0, 'RUBY_REVISION' - assert len(headers['X-Version']) > 0, 'RUBY_VERSION' - - def test_ruby_application_threads(self): - self.load('threads') - - assert 'success' in self.conf( - '4', 'applications/threads/threads' - ), 'configure 4 threads' - - socks = [] - - for _ in range(4): - sock = self.get( - headers={ - 'Host': 'localhost', - 'X-Delay': '2', - 'Connection': 'close', - }, - no_recv=True, - ) + assert ( + wait_for_record(r'\[error\].+Failed to run ruby script') is not None + ), 'body each error' + + +def test_ruby_application_body_file(): + client.load('body_file') + + assert client.get()['body'] == 'body\n', 'body file' + + +def test_ruby_keepalive_body(): + client.load('mirror') + + assert client.get()['status'] == 200, 'init' + + body = '0123456789' * 500 + (resp, sock) = client.post( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + }, + start=True, + body=body, + read_timeout=1, + ) + + assert resp['body'] == body, 'keep-alive 1' + + body = '0123456789' + resp = client.post(sock=sock, body=body) - socks.append(sock) + assert resp['body'] == body, 'keep-alive 2' - threads = set() - for sock in socks: - resp = self.recvall(sock).decode('utf-8') +def test_ruby_application_constants(): + client.load('constants') - self.log_in(resp) + resp = client.get() - resp = self._resp_to_dict(resp) + assert resp['status'] == 200, 'status' - assert resp['status'] == 200, 'status' + headers = resp['headers'] + assert len(headers['X-Copyright']) > 0, 'RUBY_COPYRIGHT' + assert len(headers['X-Description']) > 0, 'RUBY_DESCRIPTION' + assert len(headers['X-Engine']) > 0, 'RUBY_ENGINE' + assert len(headers['X-Engine-Version']) > 0, 'RUBY_ENGINE_VERSION' + assert len(headers['X-Patchlevel']) > 0, 'RUBY_PATCHLEVEL' + assert len(headers['X-Platform']) > 0, 'RUBY_PLATFORM' + assert len(headers['X-Release-Date']) > 0, 'RUBY_RELEASE_DATE' + assert len(headers['X-Revision']) > 0, 'RUBY_REVISION' + assert len(headers['X-Version']) > 0, 'RUBY_VERSION' + + +def test_ruby_application_threads(): + client.load('threads') + + assert 'success' in client.conf( + '4', 'applications/threads/threads' + ), 'configure 4 threads' + + socks = [] + + for _ in range(4): + sock = client.get( + headers={ + 'Host': 'localhost', + 'X-Delay': '2', + 'Connection': 'close', + }, + no_recv=True, + ) + + socks.append(sock) + + threads = set() + + for sock in socks: + resp = client.recvall(sock).decode('utf-8') + + client.log_in(resp) + + resp = client._resp_to_dict(resp) + + assert resp['status'] == 200, 'status' - threads.add(resp['headers']['X-Thread']) + threads.add(resp['headers']['X-Thread']) - assert resp['headers']['Rack-Multithread'] == 'true', 'multithread' + assert resp['headers']['Rack-Multithread'] == 'true', 'multithread' - sock.close() + sock.close() - assert len(socks) == len(threads), 'threads differs' + assert len(socks) == len(threads), 'threads differs' |