"""Functional tests for Ruby (Rack) applications run through TestApplicationRuby."""

import unittest

from unit.applications.lang.ruby import TestApplicationRuby


class TestRubyApplication(TestApplicationRuby):
    """Request/response, input, error-log, header and body checks for Ruby apps.

    Each test loads a named application with ``self.load(...)``, issues a
    request with ``self.get()`` / ``self.post()`` and asserts on the parsed
    response (``status``, ``headers``, ``body``) or on a log record.

    Per-test notes are ``#`` comments rather than docstrings so that the
    descriptions unittest reports for each test stay unchanged.
    """

    prerequisites = ['ruby']

    def test_ruby_application(self):
        # Full round trip against the 'variables' app: status, server
        # identification headers, Date header, remaining headers and body.
        self.load('variables')

        body = 'Test body string.'
        request_headers = {
            'Host': 'localhost',
            'Content-Type': 'text/html',
            'Custom-Header': 'blah',
            'Connection': 'close',
        }

        resp = self.post(headers=request_headers, body=body)

        self.assertEqual(resp['status'], 200, 'status')

        # Headers are popped one by one so that only the fixed set remains
        # for the exact dictionary comparison below.
        headers = resp['headers']

        server = headers.pop('Server')
        self.assertRegex(server, r'Unit/[\d\.]+', 'server header')

        server_software = headers.pop('Server-Software')
        self.assertEqual(server_software, server, 'server software header')

        date = headers.pop('Date')
        self.assertEqual(date[-4:], ' GMT', 'date header timezone')

        # The Date header must be within 5 seconds of the harness clock.
        reported_sec = self.date_to_sec_epoch(date)
        current_sec = self.sec_epoch()
        self.assertLess(abs(reported_sec - current_sec), 5, 'date header')

        content_length = str(len(body))
        expected_headers = {
            'Connection': 'close',
            'Content-Length': content_length,
            'Content-Type': 'text/html',
            'Request-Method': 'POST',
            'Request-Uri': '/',
            'Http-Host': 'localhost',
            'Server-Protocol': 'HTTP/1.1',
            'Custom-Header': 'blah',
            'Rack-Version': '13',
            'Rack-Url-Scheme': 'http',
            'Rack-Multithread': 'false',
            'Rack-Multiprocess': 'true',
            'Rack-Run-Once': 'false',
            'Rack-Hijack-Q': 'false',
            'Rack-Hijack': '',
            'Rack-Hijack-IO': '',
        }
        self.assertDictEqual(headers, expected_headers, 'headers')

        self.assertEqual(resp['body'], body, 'body')

    def test_ruby_application_query_string(self):
        # A non-empty query string is reported back in 'Query-String'.
        self.load('query_string')

        query = self.get(url='/?var1=val1&var2=val2')['headers'][
            'Query-String'
        ]

        self.assertEqual(query, 'var1=val1&var2=val2', 'Query-String header')

    def test_ruby_application_query_string_empty(self):
        # A bare '?' yields status 200 and an empty 'Query-String'.
        self.load('query_string')

        resp = self.get(url='/?')

        self.assertEqual(resp['status'], 200, 'query string empty status')

        query = resp['headers']['Query-String']
        self.assertEqual(query, '', 'query string empty')

    def test_ruby_application_query_string_absent(self):
        # No '?' at all also yields status 200 and an empty 'Query-String'.
        self.load('query_string')

        resp = self.get()

        self.assertEqual(resp['status'], 200, 'query string absent status')

        query = resp['headers']['Query-String']
        self.assertEqual(query, '', 'query string absent')

    @unittest.skip('not yet')
    def test_ruby_application_server_port(self):
        # Skipped: expects the 'Server-Port' header to be '7080'.
        self.load('server_port')

        port = self.get()['headers']['Server-Port']

        self.assertEqual(port, '7080', 'Server-Port header')

    def test_ruby_application_status_int(self):
        # The 'status_int' app must produce a 200 response.
        self.load('status_int')

        status = self.get()['status']

        self.assertEqual(status, 200, 'status int')

    def test_ruby_application_input_read_empty(self):
        # A GET without a body must produce an empty response body.
        self.load('input_read_empty')

        received = self.get()['body']

        self.assertEqual(received, '', 'read empty')

    def test_ruby_application_input_read_parts(self):
        # Ten digits are posted; the expected response holds the first nine.
        self.load('input_read_parts')

        received = self.post(body='0123456789')['body']

        self.assertEqual(received, '012345678', 'input read parts')

    def test_ruby_application_input_read_buffer(self):
        # The posted digits are expected back unchanged.
        self.load('input_read_buffer')

        received = self.post(body='0123456789')['body']

        self.assertEqual(received, '0123456789', 'input read buffer')

    def test_ruby_application_input_read_buffer_not_empty(self):
        # Same expectation as above for the 'not empty' buffer variant.
        self.load('input_read_buffer_not_empty')

        received = self.post(body='0123456789')['body']

        self.assertEqual(
            received, '0123456789', 'input read buffer not empty'
        )

    def test_ruby_application_input_gets(self):
        # A body without a newline is expected back in full.
        self.load('input_gets')

        body = '0123456789'
        received = self.post(body=body)['body']

        self.assertEqual(received, body, 'input gets')

    def test_ruby_application_input_gets_2(self):
        # With two lines posted, only the first line is expected back.
        self.load('input_gets')

        received = self.post(body='01234\n56789\n')['body']

        self.assertEqual(received, '01234\n', 'input gets 2')

    def test_ruby_application_input_gets_all(self):
        # A multi-line body, blank lines included, is expected back intact.
        self.load('input_gets_all')

        body = '\n01234\n56789\n\n'
        received = self.post(body=body)['body']

        self.assertEqual(received, body, 'input gets all')

    def test_ruby_application_input_each(self):
        # Same multi-line body, this time through the 'input_each' app.
        self.load('input_each')

        body = '\n01234\n56789\n\n'
        received = self.post(body=body)['body']

        self.assertEqual(received, body, 'input each')

    @unittest.skip('not yet')
    def test_ruby_application_input_rewind(self):
        # Skipped: expects the posted body back from the 'input_rewind' app.
        self.load('input_rewind')

        body = '0123456789'
        received = self.post(body=body)['body']

        self.assertEqual(received, body, 'input rewind')

    @unittest.skip('not yet')
    def test_ruby_application_syntax_error(self):
        # Skipped: expects a 500 from an app with a syntax error. The
        # patterns below are registered in skip_alerts before loading.
        tolerated_alerts = [
            r'Failed to parse rack script',
            r'syntax error',
            r'new_from_string',
            r'parse_file',
        ]
        self.skip_alerts.extend(tolerated_alerts)

        self.load('syntax_error')

        status = self.get()['status']

        self.assertEqual(status, 500, 'syntax error')

    def test_ruby_application_errors_puts(self):
        # After a request and self.stop(), an [error] record containing
        # 'Error in application' must be found.
        self.load('errors_puts')

        self.get()

        self.stop()

        record = self.wait_for_record(r'\[error\].+Error in application')
        self.assertIsNotNone(record, 'errors puts')

    def test_ruby_application_errors_puts_int(self):
        # Same flow; the [error] record must contain '1234567890'.
        self.load('errors_puts_int')

        self.get()

        self.stop()

        record = self.wait_for_record(r'\[error\].+1234567890')
        self.assertIsNotNone(record, 'errors puts int')

    def test_ruby_application_errors_write(self):
        # Same flow for the 'errors_write' app and 'Error in application'.
        self.load('errors_write')

        self.get()

        self.stop()

        record = self.wait_for_record(r'\[error\].+Error in application')
        self.assertIsNotNone(record, 'errors write')

    def test_ruby_application_errors_write_to_s_custom(self):
        # Only the response status is checked here.
        self.load('errors_write_to_s_custom')

        status = self.get()['status']

        self.assertEqual(status, 200, 'errors write to_s custom')

    def test_ruby_application_errors_write_int(self):
        # Same flow for the 'errors_write_int' app and '1234567890'.
        self.load('errors_write_int')

        self.get()

        self.stop()

        record = self.wait_for_record(r'\[error\].+1234567890')
        self.assertIsNotNone(record, 'errors write int')

    def test_ruby_application_at_exit(self):
        # After a request, the configuration is replaced with empty
        # listeners and applications, then the log is searched for
        # 'At exit called.'.
        self.load('at_exit')

        self.get()

        empty_conf = {"listeners": {}, "applications": {}}
        self.conf(empty_conf)

        self.stop()

        record = self.wait_for_record(r'\[error\].+At exit called\.')
        self.assertIsNotNone(record, 'at exit')

    def test_ruby_application_header_custom(self):
        # The newline-separated request body is expected to come back as a
        # list of 'Custom-Header' values, empty entries included.
        self.load('header_custom')

        resp = self.post(body="\ntc=one,two\ntc=three,four,\n\n")

        custom = resp['headers']['Custom-Header']
        self.assertEqual(
            custom,
            ['', 'tc=one,two', 'tc=three,four,', '', ''],
            'header custom',
        )

    @unittest.skip('not yet')
    def test_ruby_application_header_custom_non_printable(self):
        # Skipped: expects a 500 when the posted body is a backspace char.
        self.load('header_custom')

        status = self.post(body='\b')['status']

        self.assertEqual(status, 500, 'header custom non printable')

    def test_ruby_application_header_status(self):
        # The 'header_status' app must still produce a 200 response.
        self.load('header_status')

        status = self.get()['status']

        self.assertEqual(status, 200, 'header status')

    @unittest.skip('not yet')
    def test_ruby_application_header_rack(self):
        # Skipped: expects a 500 from the 'header_rack' app.
        self.load('header_rack')

        status = self.get()['status']

        self.assertEqual(status, 500, 'header rack')

    def test_ruby_application_body_empty(self):
        # The expected raw body is '0\r\n\r\n'.
        # NOTE(review): this looks like the terminating chunk of a chunked
        # response left unparsed by the client helper -- confirm.
        self.load('body_empty')

        received = self.get()['body']

        self.assertEqual(received, '0\r\n\r\n', 'body empty')

    def test_ruby_application_body_array(self):
        # The 'body_array' app is expected to yield the ten digits.
        self.load('body_array')

        received = self.get()['body']

        self.assertEqual(received, '0123456789', 'body array')

    def test_ruby_application_body_large(self):
        # A 10000-character body must come back unchanged from 'mirror'.
        self.load('mirror')

        body = '0123456789' * 1000
        received = self.post(body=body)['body']

        self.assertEqual(received, body, 'body large')

    @unittest.skip('not yet')
    def test_ruby_application_body_each_error(self):
        # Skipped: expects a 500 and a 'Failed to run ruby script' record.
        self.load('body_each_error')

        status = self.get()['status']
        self.assertEqual(status, 500, 'body each error status')

        self.stop()

        record = self.wait_for_record(
            r'\[error\].+Failed to run ruby script'
        )
        self.assertIsNotNone(record, 'body each error')

    def test_ruby_application_body_file(self):
        # The 'body_file' app is expected to yield 'body\n'.
        self.load('body_file')

        received = self.get()['body']

        self.assertEqual(received, 'body\n', 'body file')

    def test_ruby_keepalive_body(self):
        # Two POSTs over one socket: the first asks for keep-alive and
        # returns the socket (start=True), the second reuses it and closes.
        self.load('mirror')

        self.assertEqual(self.get()['status'], 200, 'init')

        keepalive_headers = {
            'Host': 'localhost',
            'Connection': 'keep-alive',
            'Content-Type': 'text/html',
        }
        first_resp, sock = self.post(
            headers=keepalive_headers,
            start=True,
            body='0123456789' * 500,
            read_timeout=1,
        )

        self.assertEqual(
            first_resp['body'], '0123456789' * 500, 'keep-alive 1'
        )

        closing_headers = {
            'Host': 'localhost',
            'Connection': 'close',
            'Content-Type': 'text/html',
        }
        second_resp = self.post(
            headers=closing_headers,
            sock=sock,
            body='0123456789',
        )

        self.assertEqual(second_resp['body'], '0123456789', 'keep-alive 2')


if __name__ == '__main__':
    TestRubyApplication.main()