diff options
Diffstat (limited to '')
-rw-r--r-- | test/test_python_application.py | 1294 |
1 files changed, 666 insertions, 628 deletions
diff --git a/test/test_python_application.py b/test/test_python_application.py index d846c1e3..18473d59 100644 --- a/test/test_python_application.py +++ b/test/test_python_application.py @@ -8,19 +8,20 @@ import venv import pytest from packaging import version -from unit.applications.lang.python import TestApplicationPython +from unit.applications.lang.python import ApplicationPython prerequisites = {'modules': {'python': 'all'}} +client = ApplicationPython() -class TestPythonApplication(TestApplicationPython): - def test_python_application_variables(self, date_to_sec_epoch, sec_epoch): - self.load('variables') - body = 'Test body string.' +def test_python_application_variables(date_to_sec_epoch, sec_epoch): + client.load('variables') - resp = self.http( - f"""POST / HTTP/1.1 + body = 'Test body string.' + + resp = client.http( + f"""POST / HTTP/1.1 Host: localhost Content-Length: {len(body)} Custom-Header: blah @@ -30,866 +31,903 @@ Connection: close custom-header: BLAH {body}""".encode(), - raw=True, - ) + raw=True, + ) + + assert resp['status'] == 200, 'status' + headers = resp['headers'] + header_server = headers.pop('Server') + assert re.search(r'Unit/[\d\.]+', header_server), 'server header' + assert ( + headers.pop('Server-Software') == header_server + ), 'server software header' + + date = headers.pop('Date') + assert date[-4:] == ' GMT', 'date header timezone' + assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header' + + assert headers == { + 'Connection': 'close', + 'Content-Length': str(len(body)), + 'Content-Type': 'text/html', + 'Request-Method': 'POST', + 'Request-Uri': '/', + 'Http-Host': 'localhost', + 'Server-Protocol': 'HTTP/1.1', + 'Custom-Header': 'blah, Blah, BLAH', + 'Wsgi-Version': '(1, 0)', + 'Wsgi-Url-Scheme': 'http', + 'Wsgi-Multithread': 'False', + 'Wsgi-Multiprocess': 'True', + 'Wsgi-Run-Once': 'False', + }, 'headers' + assert resp['body'] == body, 'body' - assert resp['status'] == 200, 'status' - headers = resp['headers'] - header_server = headers.pop('Server') - assert re.search(r'Unit/[\d\.]+', header_server), 'server header' - assert ( - headers.pop('Server-Software') == header_server - ), 'server software header' - - date = headers.pop('Date') - assert date[-4:] == ' GMT', 'date header timezone' - assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header' - - assert headers == { - 'Connection': 'close', - 'Content-Length': str(len(body)), - 'Content-Type': 'text/html', - 'Request-Method': 'POST', - 'Request-Uri': '/', - 'Http-Host': 'localhost', - 'Server-Protocol': 'HTTP/1.1', - 'Custom-Header': 'blah, Blah, BLAH', - 'Wsgi-Version': '(1, 0)', - 'Wsgi-Url-Scheme': 'http', - 'Wsgi-Multithread': 'False', - 'Wsgi-Multiprocess': 'True', - 'Wsgi-Run-Once': 'False', - }, 'headers' - assert resp['body'] == body, 'body' - - def test_python_application_query_string(self): - self.load('query_string') - - resp = self.get(url='/?var1=val1&var2=val2') - - assert ( - resp['headers']['Query-String'] == 'var1=val1&var2=val2' - ), 'Query-String header' - - def test_python_application_query_string_space(self): - self.load('query_string') - - resp = self.get(url='/ ?var1=val1&var2=val2') - assert ( - resp['headers']['Query-String'] == 'var1=val1&var2=val2' - ), 'Query-String space' - - resp = self.get(url='/ %20?var1=val1&var2=val2') - assert ( - resp['headers']['Query-String'] == 'var1=val1&var2=val2' - ), 'Query-String space 2' - - resp = self.get(url='/ %20 ?var1=val1&var2=val2') - assert ( - resp['headers']['Query-String'] == 'var1=val1&var2=val2' - ), 'Query-String space 3' - resp = self.get(url='/blah %20 blah? var1= val1 & var2=val2') - assert ( - resp['headers']['Query-String'] == ' var1= val1 & var2=val2' - ), 'Query-String space 4' +def test_python_application_query_string(): + client.load('query_string') - def test_python_application_prefix(self): - self.load('prefix', prefix='/api/rest') + resp = client.get(url='/?var1=val1&var2=val2') - def set_prefix(prefix): - self.conf(f'"{prefix}"', 'applications/prefix/prefix') + assert ( + resp['headers']['Query-String'] == 'var1=val1&var2=val2' + ), 'Query-String header' - def check_prefix(url, script_name, path_info): - resp = self.get(url=url) - assert resp['status'] == 200 - assert resp['headers']['Script-Name'] == script_name - assert resp['headers']['Path-Info'] == path_info - check_prefix('/ap', 'NULL', '/ap') - check_prefix('/api', 'NULL', '/api') - check_prefix('/api/', 'NULL', '/api/') - check_prefix('/api/res', 'NULL', '/api/res') - check_prefix('/api/restful', 'NULL', '/api/restful') - check_prefix('/api/rest', '/api/rest', '') - check_prefix('/api/rest/', '/api/rest', '/') - check_prefix('/api/rest/get', '/api/rest', '/get') - check_prefix('/api/rest/get/blah', '/api/rest', '/get/blah') +def test_python_application_query_string_space(): + client.load('query_string') - set_prefix('/api/rest/') - check_prefix('/api/rest', '/api/rest', '') - check_prefix('/api/restful', 'NULL', '/api/restful') - check_prefix('/api/rest/', '/api/rest', '/') - check_prefix('/api/rest/blah', '/api/rest', '/blah') + resp = client.get(url='/ ?var1=val1&var2=val2') + assert ( + resp['headers']['Query-String'] == 'var1=val1&var2=val2' + ), 'Query-String space' - set_prefix('/app') - check_prefix('/ap', 'NULL', '/ap') - check_prefix('/app', '/app', '') - check_prefix('/app/', '/app', '/') - check_prefix('/application/', 'NULL', '/application/') + resp = client.get(url='/ %20?var1=val1&var2=val2') + assert ( + resp['headers']['Query-String'] == 'var1=val1&var2=val2' + ), 'Query-String space 2' - set_prefix('/') - check_prefix('/', 'NULL', '/') - check_prefix('/app', 'NULL', '/app') + resp = client.get(url='/ %20 ?var1=val1&var2=val2') + assert ( + resp['headers']['Query-String'] == 'var1=val1&var2=val2' + ), 'Query-String space 3' - def test_python_application_query_string_empty(self): - self.load('query_string') + resp = client.get(url='/blah %20 blah? var1= val1 & var2=val2') + assert ( + resp['headers']['Query-String'] == ' var1= val1 & var2=val2' + ), 'Query-String space 4' - resp = self.get(url='/?') - assert resp['status'] == 200, 'query string empty status' - assert resp['headers']['Query-String'] == '', 'query string empty' +def test_python_application_prefix(): + client.load('prefix', prefix='/api/rest') - def test_python_application_query_string_absent(self): - self.load('query_string') + def set_prefix(prefix): + client.conf(f'"{prefix}"', 'applications/prefix/prefix') - resp = self.get() + def check_prefix(url, script_name, path_info): + resp = client.get(url=url) + assert resp['status'] == 200 + assert resp['headers']['Script-Name'] == script_name + assert resp['headers']['Path-Info'] == path_info - assert resp['status'] == 200, 'query string absent status' - assert resp['headers']['Query-String'] == '', 'query string absent' + check_prefix('/ap', 'NULL', '/ap') + check_prefix('/api', 'NULL', '/api') + check_prefix('/api/', 'NULL', '/api/') + check_prefix('/api/res', 'NULL', '/api/res') + check_prefix('/api/restful', 'NULL', '/api/restful') + check_prefix('/api/rest', '/api/rest', '') + check_prefix('/api/rest/', '/api/rest', '/') + check_prefix('/api/rest/get', '/api/rest', '/get') + check_prefix('/api/rest/get/blah', '/api/rest', '/get/blah') - @pytest.mark.skip('not yet') - def test_python_application_server_port(self): - self.load('server_port') + set_prefix('/api/rest/') + check_prefix('/api/rest', '/api/rest', '') + check_prefix('/api/restful', 'NULL', '/api/restful') + check_prefix('/api/rest/', '/api/rest', '/') + check_prefix('/api/rest/blah', '/api/rest', '/blah') - assert ( - self.get()['headers']['Server-Port'] == '7080' - ), 'Server-Port header' + set_prefix('/app') + check_prefix('/ap', 'NULL', '/ap') + check_prefix('/app', '/app', '') + check_prefix('/app/', '/app', '/') + check_prefix('/application/', 'NULL', '/application/') - @pytest.mark.skip('not yet') - def test_python_application_working_directory_invalid(self): - self.load('empty') + set_prefix('/') + check_prefix('/', 'NULL', '/') + check_prefix('/app', 'NULL', '/app') - assert 'success' in self.conf( - '"/blah"', 'applications/empty/working_directory' - ), 'configure invalid working_directory' - assert self.get()['status'] == 500, 'status' +def test_python_application_query_string_empty(): + client.load('query_string') - def test_python_application_204_transfer_encoding(self): - self.load('204_no_content') + resp = client.get(url='/?') - assert ( - 'Transfer-Encoding' not in self.get()['headers'] - ), '204 header transfer encoding' + assert resp['status'] == 200, 'query string empty status' + assert resp['headers']['Query-String'] == '', 'query string empty' - def test_python_application_ctx_iter_atexit(self, wait_for_record): - self.load('ctx_iter_atexit') - resp = self.post(body='0123456789') +def test_python_application_query_string_absent(): + client.load('query_string') - assert resp['status'] == 200, 'ctx iter status' - assert resp['body'] == '0123456789', 'ctx iter body' + resp = client.get() - assert 'success' in self.conf({"listeners": {}, "applications": {}}) + assert resp['status'] == 200, 'query string absent status' + assert resp['headers']['Query-String'] == '', 'query string absent' - assert wait_for_record(r'RuntimeError') is not None, 'ctx iter atexit' - def test_python_keepalive_body(self): - self.load('mirror') +@pytest.mark.skip('not yet') +def test_python_application_server_port(): + client.load('server_port') - assert self.get()['status'] == 200, 'init' + assert ( + client.get()['headers']['Server-Port'] == '7080' + ), 'Server-Port header' - body = '0123456789' * 500 - (resp, sock) = self.post( - headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - }, - start=True, - body=body, - read_timeout=1, - ) - assert resp['body'] == body, 'keep-alive 1' +@pytest.mark.skip('not yet') +def test_python_application_working_directory_invalid(): + client.load('empty') - body = '0123456789' - resp = self.post(sock=sock, body=body) + assert 'success' in client.conf( + '"/blah"', 'applications/empty/working_directory' + ), 'configure invalid working_directory' - assert resp['body'] == body, 'keep-alive 2' + assert client.get()['status'] == 500, 'status' - def test_python_keepalive_reconfigure(self): - self.load('mirror') - assert self.get()['status'] == 200, 'init' +def test_python_application_204_transfer_encoding(): + client.load('204_no_content') - body = '0123456789' - conns = 3 - socks = [] + assert ( + 'Transfer-Encoding' not in client.get()['headers'] + ), '204 header transfer encoding' - for i in range(conns): - (resp, sock) = self.post( - headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - }, - start=True, - body=body, - read_timeout=1, - ) - assert resp['body'] == body, 'keep-alive open' +def test_python_application_ctx_iter_atexit(wait_for_record): + client.load('ctx_iter_atexit') - self.load('mirror', processes=i + 1) + resp = client.post(body='0123456789') - socks.append(sock) + assert resp['status'] == 200, 'ctx iter status' + assert resp['body'] == '0123456789', 'ctx iter body' - for i in range(conns): - (resp, sock) = self.post( - headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - }, - start=True, - sock=socks[i], - body=body, - read_timeout=1, - ) + assert 'success' in client.conf({"listeners": {}, "applications": {}}) + + assert wait_for_record(r'RuntimeError') is not None, 'ctx iter atexit' + + +def test_python_keepalive_body(): + client.load('mirror') + + assert client.get()['status'] == 200, 'init' + + body = '0123456789' * 500 + (resp, sock) = client.post( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + }, + start=True, + body=body, + read_timeout=1, + ) - assert resp['body'] == body, 'keep-alive request' + assert resp['body'] == body, 'keep-alive 1' - self.load('mirror', processes=i + 1) + body = '0123456789' + resp = client.post(sock=sock, body=body) - for i in range(conns): - resp = self.post(sock=socks[i], body=body) + assert resp['body'] == body, 'keep-alive 2' - assert resp['body'] == body, 'keep-alive close' - self.load('mirror', processes=i + 1) +def test_python_keepalive_reconfigure(): + client.load('mirror') - def test_python_keepalive_reconfigure_2(self): - self.load('mirror') + assert client.get()['status'] == 200, 'init' - assert self.get()['status'] == 200, 'init' + body = '0123456789' + conns = 3 + socks = [] - body = '0123456789' + for i in range(conns): + (resp, sock) = client.post( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + }, + start=True, + body=body, + read_timeout=1, + ) + + assert resp['body'] == body, 'keep-alive open' + + client.load('mirror', processes=i + 1) - (resp, sock) = self.post( + socks.append(sock) + + for i in range(conns): + (resp, sock) = client.post( headers={ 'Host': 'localhost', 'Connection': 'keep-alive', }, start=True, + sock=socks[i], body=body, read_timeout=1, ) - assert resp['body'] == body, 'reconfigure 2 keep-alive 1' + assert resp['body'] == body, 'keep-alive request' - self.load('empty') + client.load('mirror', processes=i + 1) - assert self.get()['status'] == 200, 'init' + for i in range(conns): + resp = client.post(sock=socks[i], body=body) - (resp, sock) = self.post(start=True, sock=sock, body=body) + assert resp['body'] == body, 'keep-alive close' - assert resp['status'] == 200, 'reconfigure 2 keep-alive 2' - assert resp['body'] == '', 'reconfigure 2 keep-alive 2 body' + client.load('mirror', processes=i + 1) - assert 'success' in self.conf( - {"listeners": {}, "applications": {}} - ), 'reconfigure 2 clear configuration' - resp = self.get(sock=sock) +def test_python_keepalive_reconfigure_2(): + client.load('mirror') - assert resp == {}, 'reconfigure 2 keep-alive 3' + assert client.get()['status'] == 200, 'init' - def test_python_atexit(self, wait_for_record): - self.load('atexit') + body = '0123456789' - self.get() + (resp, sock) = client.post( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + }, + start=True, + body=body, + read_timeout=1, + ) - assert 'success' in self.conf({"listeners": {}, "applications": {}}) + assert resp['body'] == body, 'reconfigure 2 keep-alive 1' - assert wait_for_record(r'At exit called\.') is not None, 'atexit' + client.load('empty') - def test_python_process_switch(self): - self.load('delayed', processes=2) + assert client.get()['status'] == 200, 'init' - self.get( - headers={ - 'Host': 'localhost', - 'Content-Length': '0', - 'X-Delay': '5', - 'Connection': 'close', - }, - no_recv=True, - ) + (resp, sock) = client.post(start=True, sock=sock, body=body) - headers_delay_1 = { - 'Connection': 'close', + assert resp['status'] == 200, 'reconfigure 2 keep-alive 2' + assert resp['body'] == '', 'reconfigure 2 keep-alive 2 body' + + assert 'success' in client.conf( + {"listeners": {}, "applications": {}} + ), 'reconfigure 2 clear configuration' + + resp = client.get(sock=sock) + + assert resp == {}, 'reconfigure 2 keep-alive 3' + + +def test_python_atexit(wait_for_record): + client.load('atexit') + + client.get() + + assert 'success' in client.conf({"listeners": {}, "applications": {}}) + + assert wait_for_record(r'At exit called\.') is not None, 'atexit' + + +def test_python_process_switch(): + client.load('delayed', processes=2) + + client.get( + headers={ 'Host': 'localhost', 'Content-Length': '0', - 'X-Delay': '1', - } + 'X-Delay': '5', + 'Connection': 'close', + }, + no_recv=True, + ) + + headers_delay_1 = { + 'Connection': 'close', + 'Host': 'localhost', + 'Content-Length': '0', + 'X-Delay': '1', + } + + client.get(headers=headers_delay_1, no_recv=True) + + time.sleep(0.5) - self.get(headers=headers_delay_1, no_recv=True) + for _ in range(10): + client.get(headers=headers_delay_1, no_recv=True) - time.sleep(0.5) + client.get(headers=headers_delay_1) - for _ in range(10): - self.get(headers=headers_delay_1, no_recv=True) - self.get(headers=headers_delay_1) +@pytest.mark.skip('not yet') +def test_python_application_start_response_exit(): + client.load('start_response_exit') - @pytest.mark.skip('not yet') - def test_python_application_start_response_exit(self): - self.load('start_response_exit') + assert client.get()['status'] == 500, 'start response exit' - assert self.get()['status'] == 500, 'start response exit' - def test_python_application_input_iter(self): - self.load('input_iter') +def test_python_application_input_iter(): + client.load('input_iter') - body = '''0123456789 + body = '''0123456789 next line last line''' - resp = self.post(body=body) - assert resp['body'] == body, 'input iter' - assert resp['headers']['X-Lines-Count'] == '4', 'input iter lines' + resp = client.post(body=body) + assert resp['body'] == body, 'input iter' + assert resp['headers']['X-Lines-Count'] == '4', 'input iter lines' - def test_python_application_input_readline(self): - self.load('input_readline') - body = '''0123456789 +def test_python_application_input_readline(): + client.load('input_readline') + + body = '''0123456789 next line last line''' - resp = self.post(body=body) - assert resp['body'] == body, 'input readline' - assert resp['headers']['X-Lines-Count'] == '4', 'input readline lines' + resp = client.post(body=body) + assert resp['body'] == body, 'input readline' + assert resp['headers']['X-Lines-Count'] == '4', 'input readline lines' + - def test_python_application_input_readline_size(self): - self.load('input_readline_size') +def test_python_application_input_readline_size(): + client.load('input_readline_size') - body = '''0123456789 + body = '''0123456789 next line last line''' - assert self.post(body=body)['body'] == body, 'input readline size' - assert ( - self.post(body='0123')['body'] == '0123' - ), 'input readline size less' + assert client.post(body=body)['body'] == body, 'input readline size' + assert ( + client.post(body='0123')['body'] == '0123' + ), 'input readline size less' + - def test_python_application_input_readlines(self): - self.load('input_readlines') +def test_python_application_input_readlines(): + client.load('input_readlines') - body = '''0123456789 + body = '''0123456789 next line last line''' - resp = self.post(body=body) - assert resp['body'] == body, 'input readlines' - assert resp['headers']['X-Lines-Count'] == '4', 'input readlines lines' + resp = client.post(body=body) + assert resp['body'] == body, 'input readlines' + assert resp['headers']['X-Lines-Count'] == '4', 'input readlines lines' - def test_python_application_input_readlines_huge(self): - self.load('input_readlines') - body = ( - '''0123456789 abcdefghi +def test_python_application_input_readlines_huge(): + client.load('input_readlines') + + body = ( + '''0123456789 abcdefghi next line: 0123456789 abcdefghi last line: 987654321 ''' - * 512 - ) + * 512 + ) - assert ( - self.post(body=body, read_buffer_size=16384)['body'] == body - ), 'input readlines huge' + assert ( + client.post(body=body, read_buffer_size=16384)['body'] == body + ), 'input readlines huge' - def test_python_application_input_read_length(self): - self.load('input_read_length') - body = '0123456789' +def test_python_application_input_read_length(): + client.load('input_read_length') - resp = self.post( - headers={ - 'Host': 'localhost', - 'Input-Length': '5', - 'Connection': 'close', - }, - body=body, - ) + body = '0123456789' - assert resp['body'] == body[:5], 'input read length lt body' + resp = client.post( + headers={ + 'Host': 'localhost', + 'Input-Length': '5', + 'Connection': 'close', + }, + body=body, + ) - resp = self.post( - headers={ - 'Host': 'localhost', - 'Input-Length': '15', - 'Connection': 'close', - }, - body=body, - ) + assert resp['body'] == body[:5], 'input read length lt body' - assert resp['body'] == body, 'input read length gt body' + resp = client.post( + headers={ + 'Host': 'localhost', + 'Input-Length': '15', + 'Connection': 'close', + }, + body=body, + ) - resp = self.post( - headers={ - 'Host': 'localhost', - 'Input-Length': '0', - 'Connection': 'close', - }, - body=body, - ) + assert resp['body'] == body, 'input read length gt body' - assert resp['body'] == '', 'input read length zero' + resp = client.post( + headers={ + 'Host': 'localhost', + 'Input-Length': '0', + 'Connection': 'close', + }, + body=body, + ) - resp = self.post( - headers={ - 'Host': 'localhost', - 'Input-Length': '-1', - 'Connection': 'close', - }, - body=body, - ) + assert resp['body'] == '', 'input read length zero' - assert resp['body'] == body, 'input read length negative' + resp = client.post( + headers={ + 'Host': 'localhost', + 'Input-Length': '-1', + 'Connection': 'close', + }, + body=body, + ) - @pytest.mark.skip('not yet') - def test_python_application_errors_write(self, wait_for_record): - self.load('errors_write') + assert resp['body'] == body, 'input read length negative' - self.get() - assert ( - wait_for_record(r'\[error\].+Error in application\.') is not None - ), 'errors write' +@pytest.mark.skip('not yet') +def test_python_application_errors_write(wait_for_record): + client.load('errors_write') - def test_python_application_body_array(self): - self.load('body_array') + client.get() - assert self.get()['body'] == '0123456789', 'body array' + assert ( + wait_for_record(r'\[error\].+Error in application\.') is not None + ), 'errors write' - def test_python_application_body_io(self): - self.load('body_io') - assert self.get()['body'] == '0123456789', 'body io' +def test_python_application_body_array(): + client.load('body_array') - def test_python_application_body_io_file(self): - self.load('body_io_file') + assert client.get()['body'] == '0123456789', 'body array' - assert self.get()['body'] == 'body\n', 'body io file' - @pytest.mark.skip('not yet') - def test_python_application_syntax_error(self, skip_alert): - skip_alert(r'Python failed to import module "wsgi"') - self.load('syntax_error') +def test_python_application_body_io(): + client.load('body_io') - assert self.get()['status'] == 500, 'syntax error' + assert client.get()['body'] == '0123456789', 'body io' - def test_python_application_loading_error(self, skip_alert): - skip_alert(r'Python failed to import module "blah"') - self.load('empty', module="blah") +def test_python_application_body_io_file(): + client.load('body_io_file') - assert self.get()['status'] == 503, 'loading error' + assert client.get()['body'] == 'body\n', 'body io file' - def test_python_application_close(self, wait_for_record): - self.load('close') - self.get() +@pytest.mark.skip('not yet') +def test_python_application_syntax_error(skip_alert): + skip_alert(r'Python failed to import module "wsgi"') + client.load('syntax_error') - assert wait_for_record(r'Close called\.') is not None, 'close' + assert client.get()['status'] == 500, 'syntax error' - def test_python_application_close_error(self, wait_for_record): - self.load('close_error') - self.get() +def test_python_application_loading_error(skip_alert): + skip_alert(r'Python failed to import module "blah"') - assert wait_for_record(r'Close called\.') is not None, 'close error' + client.load('empty', module="blah") - def test_python_application_not_iterable(self, wait_for_record): - self.load('not_iterable') + assert client.get()['status'] == 503, 'loading error' - self.get() - assert ( - wait_for_record( - r'\[error\].+the application returned not an iterable object' - ) - is not None - ), 'not iterable' +def test_python_application_close(wait_for_record): + client.load('close') - def test_python_application_write(self): - self.load('write') + client.get() - assert self.get()['body'] == '0123456789', 'write' + assert wait_for_record(r'Close called\.') is not None, 'close' - def test_python_application_encoding(self): - self.load('encoding') - try: - locales = ( - subprocess.check_output( - ['locale', '-a'], - stderr=subprocess.STDOUT, - ) - .decode() - .splitlines() - ) - except ( - FileNotFoundError, - UnicodeDecodeError, - subprocess.CalledProcessError, - ): - pytest.skip('require locale') - - to_check = [ - re.compile(r'.*UTF[-_]?8'), - re.compile(r'.*ISO[-_]?8859[-_]?1'), - ] - matches = [ - loc - for loc in locales - if any(pattern.match(loc.upper()) for pattern in to_check) - ] - - if not matches: - pytest.skip('no available locales') - - def unify(str): - str.upper().replace('-', '').replace('_', '') - - for loc in matches: - assert 'success' in self.conf( - {"LC_CTYPE": loc, "LC_ALL": ""}, - '/config/applications/encoding/environment', - ) - resp = self.get() - assert resp['status'] == 200, 'status' - assert unify(resp['headers']['X-Encoding']) == unify( - loc.split('.')[-1] - ) +def test_python_application_close_error(wait_for_record): + client.load('close_error') - def test_python_application_unicode(self, temp_dir): - try: - app_type = self.get_application_type() - v = version.Version(app_type.split()[-1]) - if v.major != 3: - raise version.InvalidVersion + client.get() - except version.InvalidVersion: - pytest.skip('require python module version 3') + assert wait_for_record(r'Close called\.') is not None, 'close error' - venv_path = f'{temp_dir}/venv' - venv.create(venv_path) - self.load('unicode') - assert 'success' in self.conf( - f'"{venv_path}"', - '/config/applications/unicode/home', - ) - assert ( - self.get( - headers={ - 'Host': 'localhost', - 'Temp-dir': temp_dir, - 'Connection': 'close', - } - )['status'] - == 200 +def test_python_application_not_iterable(wait_for_record): + client.load('not_iterable') + + client.get() + + assert ( + wait_for_record( + r'\[error\].+the application returned not an iterable object' ) + is not None + ), 'not iterable' + - def test_python_application_threading(self, wait_for_record): - """wait_for_record() timeouts after 5s while every thread works at - least 3s. So without releasing GIL test should fail. - """ +def test_python_application_write(): + client.load('write') - self.load('threading') + assert client.get()['body'] == '0123456789', 'write' - for _ in range(10): - self.get(no_recv=True) - assert ( - wait_for_record(r'\(5\) Thread: 100', wait=50) is not None - ), 'last thread finished' +def test_python_application_encoding(): + client.load('encoding') + + try: + locales = ( + subprocess.check_output( + ['locale', '-a'], + stderr=subprocess.STDOUT, + ) + .decode() + .splitlines() + ) + except ( + FileNotFoundError, + UnicodeDecodeError, + subprocess.CalledProcessError, + ): + pytest.skip('require locale') + + to_check = [ + re.compile(r'.*UTF[-_]?8'), + re.compile(r'.*ISO[-_]?8859[-_]?1'), + ] + matches = [ + loc + for loc in locales + if any(pattern.match(loc.upper()) for pattern in to_check) + ] + + if not matches: + pytest.skip('no available locales') + + def unify(str): + str.upper().replace('-', '').replace('_', '') + + for loc in matches: + assert 'success' in client.conf( + {"LC_CTYPE": loc, "LC_ALL": ""}, + '/config/applications/encoding/environment', + ) + resp = client.get() + assert resp['status'] == 200, 'status' + assert unify(resp['headers']['X-Encoding']) == unify(loc.split('.')[-1]) - def test_python_application_iter_exception(self, findall, wait_for_record): - self.load('iter_exception') - # Default request doesn't lead to the exception. +def test_python_application_unicode(temp_dir): + try: + app_type = client.get_application_type() + v = version.Version(app_type.split()[-1]) + if v.major != 3: + raise version.InvalidVersion - resp = self.get( + except version.InvalidVersion: + pytest.skip('require python module version 3') + + venv_path = f'{temp_dir}/venv' + venv.create(venv_path) + + client.load('unicode') + assert 'success' in client.conf( + f'"{venv_path}"', + '/config/applications/unicode/home', + ) + assert ( + client.get( headers={ 'Host': 'localhost', - 'X-Skip': '9', - 'X-Chunked': '1', + 'Temp-dir': temp_dir, 'Connection': 'close', } - ) - assert resp['status'] == 200, 'status' - assert resp['body'] == 'XXXXXXX', 'body' + )['status'] + == 200 + ) - # Exception before start_response(). - assert self.get()['status'] == 503, 'error' +def test_python_application_threading(wait_for_record): + """wait_for_record() timeouts after 5s while every thread works at + least 3s. So without releasing GIL test should fail. + """ - assert wait_for_record(r'Traceback') is not None, 'traceback' - assert ( - wait_for_record(r"raise Exception\('first exception'\)") is not None - ), 'first exception raise' - assert len(findall(r'Traceback')) == 1, 'traceback count 1' + client.load('threading') - # Exception after start_response(), before first write(). + for _ in range(10): + client.get(no_recv=True) - assert ( - self.get( - headers={ - 'Host': 'localhost', - 'X-Skip': '1', - 'Connection': 'close', - } - )['status'] - == 503 - ), 'error 2' + assert ( + wait_for_record(r'\(5\) Thread: 100', wait=50) is not None + ), 'last thread finished' - assert ( - wait_for_record(r"raise Exception\('second exception'\)") - is not None - ), 'exception raise second' - assert len(findall(r'Traceback')) == 2, 'traceback count 2' - # Exception after first write(), before first __next__(). +def test_python_application_iter_exception(findall, wait_for_record): + client.load('iter_exception') - _, sock = self.get( - headers={ - 'Host': 'localhost', - 'X-Skip': '2', - 'Connection': 'keep-alive', - }, - start=True, - ) + # Default request doesn't lead to the exception. + + resp = client.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '9', + 'X-Chunked': '1', + 'Connection': 'close', + } + ) + assert resp['status'] == 200, 'status' + assert resp['body'] == 'XXXXXXX', 'body' + + # Exception before start_response(). - assert ( - wait_for_record(r"raise Exception\('third exception'\)") is not None - ), 'exception raise third' - assert len(findall(r'Traceback')) == 3, 'traceback count 3' + assert client.get()['status'] == 503, 'error' - assert self.get(sock=sock) == {}, 'closed connection' + assert wait_for_record(r'Traceback') is not None, 'traceback' + assert ( + wait_for_record(r"raise Exception\('first exception'\)") is not None + ), 'first exception raise' + assert len(findall(r'Traceback')) == 1, 'traceback count 1' - # Exception after first write(), before first __next__(), - # chunked (incomplete body). + # Exception after start_response(), before first write(). - resp = self.get( + assert ( + client.get( headers={ 'Host': 'localhost', - 'X-Skip': '2', - 'X-Chunked': '1', + 'X-Skip': '1', 'Connection': 'close', - }, - raw_resp=True, - ) - if resp: - assert resp[-5:] != '0\r\n\r\n', 'incomplete body' - assert len(findall(r'Traceback')) == 4, 'traceback count 4' + } + )['status'] + == 503 + ), 'error 2' - # Exception in __next__(). + assert ( + wait_for_record(r"raise Exception\('second exception'\)") is not None + ), 'exception raise second' + assert len(findall(r'Traceback')) == 2, 'traceback count 2' - _, sock = self.get( - headers={ - 'Host': 'localhost', - 'X-Skip': '3', - 'Connection': 'keep-alive', - }, - start=True, - ) + # Exception after first write(), before first __next__(). + + _, sock = client.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '2', + 'Connection': 'keep-alive', + }, + start=True, + ) + + assert ( + wait_for_record(r"raise Exception\('third exception'\)") is not None + ), 'exception raise third' + assert len(findall(r'Traceback')) == 3, 'traceback count 3' + + assert client.get(sock=sock) == {}, 'closed connection' + + # Exception after first write(), before first __next__(), + # chunked (incomplete body). + + resp = client.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '2', + 'X-Chunked': '1', + 'Connection': 'close', + }, + raw_resp=True, + ) + if resp: + assert resp[-5:] != '0\r\n\r\n', 'incomplete body' + assert len(findall(r'Traceback')) == 4, 'traceback count 4' + + # Exception in __next__(). + + _, sock = client.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '3', + 'Connection': 'keep-alive', + }, + start=True, + ) - assert ( - wait_for_record(r"raise Exception\('next exception'\)") is not None - ), 'exception raise next' - assert len(findall(r'Traceback')) == 5, 'traceback count 5' + assert ( + wait_for_record(r"raise Exception\('next exception'\)") is not None + ), 'exception raise next' + assert len(findall(r'Traceback')) == 5, 'traceback count 5' - assert self.get(sock=sock) == {}, 'closed connection 2' + assert client.get(sock=sock) == {}, 'closed connection 2' - # Exception in __next__(), chunked (incomplete body). + # Exception in __next__(), chunked (incomplete body). - resp = self.get( + resp = client.get( + headers={ + 'Host': 'localhost', + 'X-Skip': '3', + 'X-Chunked': '1', + 'Connection': 'close', + }, + raw_resp=True, + ) + if resp: + assert resp[-5:] != '0\r\n\r\n', 'incomplete body 2' + assert len(findall(r'Traceback')) == 6, 'traceback count 6' + + # Exception before start_response() and in close(). + + assert ( + client.get( headers={ 'Host': 'localhost', - 'X-Skip': '3', - 'X-Chunked': '1', + 'X-Not-Skip-Close': '1', 'Connection': 'close', - }, - raw_resp=True, - ) - if resp: - assert resp[-5:] != '0\r\n\r\n', 'incomplete body 2' - assert len(findall(r'Traceback')) == 6, 'traceback count 6' + } + )['status'] + == 503 + ), 'error' - # Exception before start_response() and in close(). + assert ( + wait_for_record(r"raise Exception\('close exception'\)") is not None + ), 'exception raise close' + assert len(findall(r'Traceback')) == 8, 'traceback count 8' - assert ( - self.get( - headers={ - 'Host': 'localhost', - 'X-Not-Skip-Close': '1', - 'Connection': 'close', - } - )['status'] - == 503 - ), 'error' - assert ( - wait_for_record(r"raise Exception\('close exception'\)") is not None - ), 'exception raise close' - assert len(findall(r'Traceback')) == 8, 'traceback count 8' +def test_python_user_group(require): + require({'privileged_user': True}) - def test_python_user_group(self, require): - require({'privileged_user': True}) + nobody_uid = pwd.getpwnam('nobody').pw_uid - nobody_uid = pwd.getpwnam('nobody').pw_uid + group = 'nobody' - group = 'nobody' + try: + group_id = grp.getgrnam(group).gr_gid + except KeyError: + group = 'nogroup' + group_id = grp.getgrnam(group).gr_gid - try: - group_id = grp.getgrnam(group).gr_gid - except KeyError: - group = 'nogroup' - group_id = grp.getgrnam(group).gr_gid + client.load('user_group') - self.load('user_group') + obj = client.getjson()['body'] + assert obj['UID'] == nobody_uid, 'nobody uid' + assert obj['GID'] == group_id, 'nobody gid' - obj = self.getjson()['body'] - assert obj['UID'] == nobody_uid, 'nobody uid' - assert obj['GID'] == group_id, 'nobody gid' + client.load('user_group', user='nobody') - self.load('user_group', user='nobody') + obj = client.getjson()['body'] + assert obj['UID'] == nobody_uid, 'nobody uid user=nobody' + assert obj['GID'] == group_id, 'nobody gid user=nobody' - obj = self.getjson()['body'] - assert obj['UID'] == nobody_uid, 'nobody uid user=nobody' - assert obj['GID'] == group_id, 'nobody gid user=nobody' + client.load('user_group', user='nobody', group=group) - self.load('user_group', user='nobody', group=group) + obj = client.getjson()['body'] + assert obj['UID'] == nobody_uid, f'nobody uid user=nobody group={group}' + assert obj['GID'] == group_id, f'nobody gid user=nobody group={group}' - obj = self.getjson()['body'] - assert obj['UID'] == nobody_uid, f'nobody uid user=nobody group={group}' - assert obj['GID'] == group_id, f'nobody gid user=nobody group={group}' + client.load('user_group', group=group) - self.load('user_group', group=group) + obj = client.getjson()['body'] + assert obj['UID'] == nobody_uid, f'nobody uid group={group}' + assert obj['GID'] == group_id, f'nobody gid group={group}' - obj = self.getjson()['body'] - assert obj['UID'] == nobody_uid, f'nobody uid group={group}' - assert obj['GID'] == group_id, f'nobody gid group={group}' + client.load('user_group', user='root') - self.load('user_group', user='root') + obj = client.getjson()['body'] + assert obj['UID'] == 0, 'root uid user=root' + assert obj['GID'] == 0, 'root gid user=root' - obj = self.getjson()['body'] - assert obj['UID'] == 0, 'root uid user=root' - assert obj['GID'] == 0, 'root gid user=root' + group = 'root' - group = 'root' + try: + grp.getgrnam(group) + group = True + except KeyError: + group = False - try: - grp.getgrnam(group) - group = True - except KeyError: - group = False + if group: + client.load('user_group', user='root', group='root') - if group: - self.load('user_group', user='root', group='root') + obj = client.getjson()['body'] + assert obj['UID'] == 0, 'root uid user=root group=root' + assert obj['GID'] == 0, 'root gid user=root group=root' - obj = self.getjson()['body'] - assert obj['UID'] == 0, 'root uid user=root group=root' - assert obj['GID'] == 0, 'root gid user=root group=root' + client.load('user_group', group='root') - self.load('user_group', group='root') + obj = client.getjson()['body'] + assert obj['UID'] == nobody_uid, 'root uid group=root' + assert obj['GID'] == 0, 'root gid group=root' - obj = self.getjson()['body'] - assert obj['UID'] == nobody_uid, 'root uid group=root' - assert obj['GID'] == 0, 'root gid group=root' - def test_python_application_callable(self, skip_alert): - skip_alert(r'Python failed to get "blah" from module') - self.load('callable') +def test_python_application_callable(skip_alert): + skip_alert(r'Python failed to get "blah" from module') + client.load('callable') - assert self.get()['status'] == 204, 'default application response' + assert client.get()['status'] == 204, 'default application response' - self.load('callable', callable="app") + client.load('callable', callable="app") - assert self.get()['status'] == 200, 'callable response' + assert client.get()['status'] == 200, 'callable response' - self.load('callable', callable="blah") + client.load('callable', callable="blah") - assert self.get()['status'] not in [200, 204], 'callable response inv' + assert client.get()['status'] not in [200, 204], 'callable response inv' - def test_python_application_path(self): - self.load('path') - def set_path(path): - assert 'success' in self.conf(path, 'applications/path/path') +def test_python_application_path(): + client.load('path') - def get_path(): - return self.get()['body'].split(os.pathsep) + def set_path(path): + assert 'success' in client.conf(path, 'applications/path/path') - default_path = self.conf_get('/config/applications/path/path') - assert 'success' in self.conf( - {"PYTHONPATH": default_path}, - '/config/applications/path/environment', - ) + def get_path(): + return client.get()['body'].split(os.pathsep) - self.conf_delete('/config/applications/path/path') - sys_path = get_path() + default_path = client.conf_get('/config/applications/path/path') + assert 'success' in client.conf( + {"PYTHONPATH": default_path}, + '/config/applications/path/environment', + ) - set_path('"/blah"') - assert ['/blah', *sys_path] == get_path(), 'check path' + client.conf_delete('/config/applications/path/path') + sys_path = get_path() - set_path('"/new"') - assert ['/new', *sys_path] == get_path(), 'check path update' + set_path('"/blah"') + assert ['/blah', *sys_path] == get_path(), 'check path' - set_path('["/blah1", "/blah2"]') - assert [ - '/blah1', - '/blah2', - *sys_path, - ] == get_path(), 'check path array' + set_path('"/new"') + assert ['/new', *sys_path] == get_path(), 'check path update' - def test_python_application_path_invalid(self): - self.load('path') + set_path('["/blah1", "/blah2"]') + assert [ + '/blah1', + '/blah2', + *sys_path, + ] == get_path(), 'check path array' - def check_path(path): - assert 'error' in self.conf(path, 'applications/path/path') - check_path('{}') - check_path('["/blah", []]') +def test_python_application_path_invalid(): + client.load('path') - def test_python_application_threads(self): - self.load('threads', threads=4) + def check_path(path): + assert 'error' in client.conf(path, 'applications/path/path') - socks = [] + check_path('{}') + check_path('["/blah", []]') - for _ in range(4): - sock = self.get( - headers={ - 'Host': 'localhost', - 'X-Delay': '2', - 'Connection': 'close', - }, - no_recv=True, - ) - socks.append(sock) +def test_python_application_threads(): + client.load('threads', threads=4) - threads = set() + socks = [] - for sock in socks: - resp = self.recvall(sock).decode('utf-8') + for _ in range(4): + sock = client.get( + headers={ + 'Host': 'localhost', + 'X-Delay': '2', + 'Connection': 'close', + }, + no_recv=True, + ) - self.log_in(resp) + socks.append(sock) - resp = self._resp_to_dict(resp) + threads = set() - assert resp['status'] == 200, 'status' + for sock in socks: + resp = client.recvall(sock).decode('utf-8') + + client.log_in(resp) + + resp = client._resp_to_dict(resp) + + assert resp['status'] == 200, 'status' - threads.add(resp['headers']['X-Thread']) + threads.add(resp['headers']['X-Thread']) - assert resp['headers']['Wsgi-Multithread'] == 'True', 'multithread' + assert resp['headers']['Wsgi-Multithread'] == 'True', 'multithread' - sock.close() + sock.close() - assert len(socks) == len(threads), 'threads differs' + assert len(socks) == len(threads), 'threads differs' |