import re import subprocess import pytest from unit.applications.lang.ruby import TestApplicationRuby prerequisites = {'modules': {'ruby': 'all'}} class TestRubyApplication(TestApplicationRuby): def test_ruby_application(self, date_to_sec_epoch, sec_epoch): self.load('variables') body = 'Test body string.' resp = self.post( headers={ 'Host': 'localhost', 'Content-Type': 'text/html', 'Custom-Header': 'blah', 'Connection': 'close', }, body=body, ) assert resp['status'] == 200, 'status' headers = resp['headers'] header_server = headers.pop('Server') assert re.search(r'Unit/[\d\.]+', header_server), 'server header' assert ( headers.pop('Server-Software') == header_server ), 'server software header' date = headers.pop('Date') assert date[-4:] == ' GMT', 'date header timezone' assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header' assert headers == { 'Connection': 'close', 'Content-Length': str(len(body)), 'Content-Type': 'text/html', 'Request-Method': 'POST', 'Request-Uri': '/', 'Http-Host': 'localhost', 'Script-Name': '', 'Server-Protocol': 'HTTP/1.1', 'Custom-Header': 'blah', 'Rack-Version': '13', 'Rack-Url-Scheme': 'http', 'Rack-Multithread': 'false', 'Rack-Multiprocess': 'true', 'Rack-Run-Once': 'false', 'Rack-Hijack-Q': 'false', 'Rack-Hijack': '', 'Rack-Hijack-IO': '', }, 'headers' assert resp['body'] == body, 'body' def test_ruby_application_query_string(self): self.load('query_string') resp = self.get(url='/?var1=val1&var2=val2') assert ( resp['headers']['Query-String'] == 'var1=val1&var2=val2' ), 'Query-String header' def test_ruby_application_query_string_empty(self): self.load('query_string') resp = self.get(url='/?') assert resp['status'] == 200, 'query string empty status' assert resp['headers']['Query-String'] == '', 'query string empty' def test_ruby_application_query_string_absent(self): self.load('query_string') resp = self.get() assert resp['status'] == 200, 'query string absent status' assert resp['headers']['Query-String'] == '', 'query string absent' @pytest.mark.skip('not yet') def test_ruby_application_server_port(self): self.load('server_port') assert ( self.get()['headers']['Server-Port'] == '7080' ), 'Server-Port header' def test_ruby_application_status_int(self): self.load('status_int') assert self.get()['status'] == 200, 'status int' def test_ruby_application_input_read_empty(self): self.load('input_read_empty') assert self.get()['body'] == '', 'read empty' def test_ruby_application_input_read_parts(self): self.load('input_read_parts') assert ( self.post(body='0123456789')['body'] == '012345678' ), 'input read parts' def test_ruby_application_input_read_buffer(self): self.load('input_read_buffer') assert ( self.post(body='0123456789')['body'] == '0123456789' ), 'input read buffer' def test_ruby_application_input_read_buffer_not_empty(self): self.load('input_read_buffer_not_empty') assert ( self.post(body='0123456789')['body'] == '0123456789' ), 'input read buffer not empty' def test_ruby_application_input_gets(self): self.load('input_gets') body = '0123456789' assert self.post(body=body)['body'] == body, 'input gets' def test_ruby_application_input_gets_2(self): self.load('input_gets') assert ( self.post(body='01234\n56789\n')['body'] == '01234\n' ), 'input gets 2' def test_ruby_application_input_gets_all(self): self.load('input_gets_all') body = '\n01234\n56789\n\n' assert self.post(body=body)['body'] == body, 'input gets all' def test_ruby_application_input_each(self): self.load('input_each') body = '\n01234\n56789\n\n' assert self.post(body=body)['body'] == body, 'input each' @pytest.mark.skip('not yet') def test_ruby_application_input_rewind(self): self.load('input_rewind') body = '0123456789' assert self.post(body=body)['body'] == body, 'input rewind' @pytest.mark.skip('not yet') def test_ruby_application_syntax_error(self, skip_alert): skip_alert( r'Failed to parse rack script', r'syntax error', r'new_from_string', r'parse_file', ) self.load('syntax_error') assert self.get()['status'] == 500, 'syntax error' def test_ruby_application_errors_puts(self, wait_for_record): self.load('errors_puts') assert self.get()['status'] == 200 assert ( wait_for_record(r'\[error\].+Error in application') is not None ), 'errors puts' def test_ruby_application_errors_puts_int(self, wait_for_record): self.load('errors_puts_int') assert self.get()['status'] == 200 assert ( wait_for_record(r'\[error\].+1234567890') is not None ), 'errors puts int' def test_ruby_application_errors_write(self, wait_for_record): self.load('errors_write') assert self.get()['status'] == 200 assert ( wait_for_record(r'\[error\].+Error in application') is not None ), 'errors write' def test_ruby_application_errors_write_to_s_custom(self): self.load('errors_write_to_s_custom') assert self.get()['status'] == 200, 'errors write to_s custom' def test_ruby_application_errors_write_int(self, wait_for_record): self.load('errors_write_int') assert self.get()['status'] == 200 assert ( wait_for_record(r'\[error\].+1234567890') is not None ), 'errors write int' def test_ruby_application_at_exit(self, wait_for_record): self.load('at_exit') assert self.get()['status'] == 200 assert 'success' in self.conf({"listeners": {}, "applications": {}}) assert ( wait_for_record(r'\[error\].+At exit called\.') is not None ), 'at exit' def test_ruby_application_encoding(self): self.load('encoding') try: locales = ( subprocess.check_output( ['locale', '-a'], stderr=subprocess.STDOUT, ) .decode() .split('\n') ) except (FileNotFoundError, subprocess.CalledProcessError): pytest.skip('require locale') def get_locale(pattern): return next( ( l for l in locales if re.match(pattern, l.upper()) is not None ), None, ) utf8 = get_locale(r'.*UTF[-_]?8') iso88591 = get_locale(r'.*ISO[-_]?8859[-_]?1') def check_locale(enc): assert 'success' in self.conf( {"LC_CTYPE": enc, "LC_ALL": ""}, '/config/applications/encoding/environment', ) resp = self.get() assert resp['status'] == 200, 'status' enc_default = re.sub(r'[-_]', '', resp['headers']['X-Enc']).upper() assert ( enc_default == re.sub(r'[-_]', '', enc.split('.')[-1]).upper() ) if utf8: check_locale(utf8) if iso88591: check_locale(iso88591) if not utf8 and not iso88591: pytest.skip('no available locales') def test_ruby_application_header_custom(self): self.load('header_custom') resp = self.post(body="\ntc=one,two\ntc=three,four,\n\n") assert resp['headers']['Custom-Header'] == [ '', 'tc=one,two', 'tc=three,four,', '', '', ], 'header custom' @pytest.mark.skip('not yet') def test_ruby_application_header_custom_non_printable(self): self.load('header_custom') assert ( self.post(body='\b')['status'] == 500 ), 'header custom non printable' def test_ruby_application_header_status(self): self.load('header_status') assert self.get()['status'] == 200, 'header status' @pytest.mark.skip('not yet') def test_ruby_application_header_rack(self): self.load('header_rack') assert self.get()['status'] == 500, 'header rack' def test_ruby_application_body_empty(self): self.load('body_empty') assert self.get()['body'] == '', 'body empty' def test_ruby_application_body_array(self): self.load('body_array') assert self.get()['body'] == '0123456789', 'body array' def test_ruby_application_body_large(self): self.load('mirror') body = '0123456789' * 1000 assert self.post(body=body)['body'] == body, 'body large' @pytest.mark.skip('not yet') def test_ruby_application_body_each_error(self, wait_for_record): self.load('body_each_error') assert self.get()['status'] == 500, 'body each error status' assert ( wait_for_record(r'\[error\].+Failed to run ruby script') is not None ), 'body each error' def test_ruby_application_body_file(self): self.load('body_file') assert self.get()['body'] == 'body\n', 'body file' def test_ruby_keepalive_body(self): self.load('mirror') assert self.get()['status'] == 200, 'init' body = '0123456789' * 500 (resp, sock) = self.post( headers={ 'Host': 'localhost', 'Connection': 'keep-alive', }, start=True, body=body, read_timeout=1, ) assert resp['body'] == body, 'keep-alive 1' body = '0123456789' resp = self.post(sock=sock, body=body) assert resp['body'] == body, 'keep-alive 2' def test_ruby_application_constants(self): self.load('constants') resp = self.get() assert resp['status'] == 200, 'status' headers = resp['headers'] assert len(headers['X-Copyright']) > 0, 'RUBY_COPYRIGHT' assert len(headers['X-Description']) > 0, 'RUBY_DESCRIPTION' assert len(headers['X-Engine']) > 0, 'RUBY_ENGINE' assert len(headers['X-Engine-Version']) > 0, 'RUBY_ENGINE_VERSION' assert len(headers['X-Patchlevel']) > 0, 'RUBY_PATCHLEVEL' assert len(headers['X-Platform']) > 0, 'RUBY_PLATFORM' assert len(headers['X-Release-Date']) > 0, 'RUBY_RELEASE_DATE' assert len(headers['X-Revision']) > 0, 'RUBY_REVISION' assert len(headers['X-Version']) > 0, 'RUBY_VERSION' def test_ruby_application_threads(self): self.load('threads') assert 'success' in self.conf( '4', 'applications/threads/threads' ), 'configure 4 threads' socks = [] for _ in range(4): sock = self.get( headers={ 'Host': 'localhost', 'X-Delay': '2', 'Connection': 'close', }, no_recv=True, ) socks.append(sock) threads = set() for sock in socks: resp = self.recvall(sock).decode('utf-8') self.log_in(resp) resp = self._resp_to_dict(resp) assert resp['status'] == 200, 'status' threads.add(resp['headers']['X-Thread']) assert resp['headers']['Rack-Multithread'] == 'true', 'multithread' sock.close() assert len(socks) == len(threads), 'threads differs'