import getpass
import os
import re
import shutil
import signal
import time
from pathlib import Path
import pytest
from unit.applications.lang.php import TestApplicationPHP
from unit.option import option
class TestPHPApplication(TestApplicationPHP):
    """Functional tests for PHP applications served through the test harness.

    Every test loads one of the PHP fixture applications (or configures one
    explicitly via ``self.conf``), issues HTTP requests with ``self.get`` /
    ``self.post`` and asserts on the status, headers and body of the reply.
    """

    # Module requirement consumed by the test framework (any PHP version).
    prerequisites = {'modules': {'php': 'all'}}

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def before_disable_functions(self):
        """Check that both ``time`` and ``exec`` output appear in the body.

        Used as a baseline before a test disables one or both functions.
        """
        body = self.get()['body']

        assert re.search(r'time: \d+', body), 'disable_functions before time'
        assert re.search(r'exec: \/\w+', body), 'disable_functions before exec'

    def check_opcache(self):
        """Request the app and skip the test if it reports no OPcache.

        Returns:
            The response dict of the probing GET request.
        """
        resp = self.get()
        assert resp['status'] == 200, 'status'

        headers = resp['headers']
        # The fixture reports '-1' in X-OPcache when OPcache is unavailable.
        if 'X-OPcache' in headers and headers['X-OPcache'] == '-1':
            pytest.skip('opcache is not supported')

        return resp

    def set_opcache(self, app, val):
        """Set ``opcache.enable``/``opcache.enable_cli`` for *app* to *val*.

        Args:
            app: Name of the application whose options are updated.
            val: Value for both settings, as a string ('0' or '1' in the
                callers visible in this file).
        """
        assert 'success' in self.conf(
            {"admin": {"opcache.enable": val, "opcache.enable_cli": val}},
            f'applications/{app}/options',
        )

        r = self.check_opcache()
        assert r['headers']['X-OPcache'] == val, 'opcache value'

    def set_preload(self, preload):
        """Write a php.ini enabling OPcache preload and apply it to 'opcache'.

        Args:
            preload: File name of the preload script, relative to the
                ``php/opcache/preload`` directory under ``option.test_dir``.
        """
        # Built from explicit lines so the file content does not depend on
        # how this source file happens to be indented.
        ini = (
            f'opcache.preload = {option.test_dir}/php/opcache/preload'
            f'/{preload}\n'
            f'opcache.preload_user = {option.user or getpass.getuser()}\n'
        )

        with open(f'{option.temp_dir}/php.ini', 'w') as f:
            f.write(ini)

        # The file is closed (flushed) before the configuration refers to it.
        assert 'success' in self.conf(
            {"file": f"{option.temp_dir}/php.ini"},
            'applications/opcache/options',
        )

    # ------------------------------------------------------------------
    # Request variables, query string, basic responses
    # ------------------------------------------------------------------

    def test_php_application_variables(self, date_to_sec_epoch, sec_epoch):
        """Verify the request variables echoed back as response headers."""
        self.load('variables')

        body = 'Test body string.'

        resp = self.post(
            headers={
                'Host': 'localhost',
                'Content-Type': 'text/html',
                'Custom-Header': 'blah',
                'Connection': 'close',
            },
            body=body,
            url='/index.php/blah?var=val',
        )

        assert resp['status'] == 200, 'status'
        headers = resp['headers']
        header_server = headers.pop('Server')
        assert re.search(r'Unit/[\d\.]+', header_server), 'server header'
        assert (
            headers.pop('Server-Software') == header_server
        ), 'server software header'

        date = headers.pop('Date')
        assert date[-4:] == ' GMT', 'date header timezone'
        assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'

        # X-Powered-By is optional; Content-type must be present (pop raises
        # KeyError otherwise). Both are removed before the exact comparison.
        headers.pop('X-Powered-By', None)
        headers.pop('Content-type')

        assert headers == {
            'Connection': 'close',
            'Content-Length': str(len(body)),
            'Request-Method': 'POST',
            'Path-Info': '/blah',
            'Request-Uri': '/index.php/blah?var=val',
            'Http-Host': 'localhost',
            'Server-Protocol': 'HTTP/1.1',
            'Custom-Header': 'blah',
        }, 'headers'
        assert resp['body'] == body, 'body'

    def test_php_application_query_string(self):
        """A non-empty query string is passed through unchanged."""
        self.load('query_string')

        resp = self.get(url='/?var1=val1&var2=val2')

        assert (
            resp['headers']['Query-String'] == 'var1=val1&var2=val2'
        ), 'query string'

    def test_php_application_query_string_empty(self):
        """A bare '?' yields an empty query string and status 200."""
        self.load('query_string')

        resp = self.get(url='/?')

        assert resp['status'] == 200, 'query string empty status'
        assert resp['headers']['Query-String'] == '', 'query string empty'

    def test_php_application_fastcgi_finish_request(self, findall, unit_pid):
        """fastcgi_finish_request() produces the full body and logs no error."""
        self.load('fastcgi_finish_request')

        assert 'success' in self.conf(
            {"admin": {"auto_globals_jit": "1"}},
            'applications/fastcgi_finish_request/options',
        )

        assert self.get()['body'] == '0123'

        # NOTE(review): SIGUSR1 presumably makes the daemon reopen its log so
        # that findall() sees fresh records -- confirm against the harness.
        os.kill(unit_pid, signal.SIGUSR1)

        errs = findall(r'Error in fastcgi_finish_request')

        assert len(errs) == 0, 'no error'

    def test_php_application_fastcgi_finish_request_2(self, findall, unit_pid):
        """The '?skip' variant returns an empty body and logs no error."""
        self.load('fastcgi_finish_request')

        assert 'success' in self.conf(
            {"admin": {"auto_globals_jit": "1"}},
            'applications/fastcgi_finish_request/options',
        )

        resp = self.get(url='/?skip')
        assert resp['status'] == 200
        assert resp['body'] == ''

        # NOTE(review): see the sibling test; SIGUSR1 is presumably a log
        # reopen request -- confirm.
        os.kill(unit_pid, signal.SIGUSR1)

        errs = findall(r'Error in fastcgi_finish_request')

        assert len(errs) == 0, 'no error'

    def test_php_application_query_string_absent(self):
        """A URL without '?' yields an empty query string and status 200."""
        self.load('query_string')

        resp = self.get()

        assert resp['status'] == 200, 'query string absent status'
        assert resp['headers']['Query-String'] == '', 'query string absent'

    def test_php_application_phpinfo(self):
        """The phpinfo application answers 200 with a non-empty body."""
        self.load('phpinfo')

        resp = self.get()

        assert resp['status'] == 200, 'status'
        assert resp['body'] != '', 'body not empty'

    def test_php_application_header_status(self):
        """A status line sent through header() sets the response status."""
        self.load('header')

        # (value of X-Header, assertion message)
        cases = (
            ('HTTP/1.1 404 Not Found', 'status'),
            ('http/1.1 404 Not Found', 'status case insensitive'),
            ('HTTP/ 404 Not Found', 'status version empty'),
        )

        for status_line, message in cases:
            resp = self.get(
                headers={
                    'Host': 'localhost',
                    'Connection': 'close',
                    'X-Header': status_line,
                }
            )
            assert resp['status'] == 404, message

    def test_php_application_404(self):
        """The 404 application returns status 404 and a matching body."""
        self.load('404')

        resp = self.get()

        assert resp['status'] == 404, '404 status'
        # NOTE(review): the pattern was split across two lines in the source
        # (an unterminated string literal); markup around the text may have
        # been lost -- confirm the intended pattern.
        assert re.search(r'404 Not Found', resp['body']), '404 body'

    def test_php_application_keepalive_body(self):
        """Two POST bodies are mirrored correctly over one kept-alive socket."""
        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        body = '0123456789' * 500
        (resp, sock) = self.post(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
            },
            start=True,
            body=body,
            read_timeout=1,
        )

        assert resp['body'] == body, 'keep-alive 1'

        body = '0123456789'
        resp = self.post(sock=sock, body=body)

        assert resp['body'] == body, 'keep-alive 2'

    def test_php_application_conditional(self):
        """GET and POST take different branches in the conditional app."""
        self.load('conditional')

        assert re.search(r'True', self.get()['body']), 'conditional true'
        assert re.search(r'False', self.post()['body']), 'conditional false'

    def test_php_application_get_variables(self):
        """GET parameters: set, empty, valueless and absent."""
        self.load('get_variables')

        resp = self.get(url='/?var1=val1&var2=&var3')
        assert resp['headers']['X-Var-1'] == 'val1', 'GET variables'
        assert resp['headers']['X-Var-2'] == '', 'GET variables 2'
        assert resp['headers']['X-Var-3'] == '', 'GET variables 3'
        assert resp['headers']['X-Var-4'] == 'not set', 'GET variables 4'

    def test_php_application_post_variables(self):
        """URL-encoded POST parameters: set, empty and absent."""
        self.load('post_variables')

        resp = self.post(
            headers={
                'Content-Type': 'application/x-www-form-urlencoded',
                'Host': 'localhost',
                'Connection': 'close',
            },
            body='var1=val1&var2=',
        )
        assert resp['headers']['X-Var-1'] == 'val1', 'POST variables'
        assert resp['headers']['X-Var-2'] == '', 'POST variables 2'
        assert resp['headers']['X-Var-3'] == 'not set', 'POST variables 3'

    def test_php_application_cookies(self):
        """Both cookies of a Cookie header reach the application."""
        self.load('cookies')

        resp = self.get(
            headers={
                'Cookie': 'var=val; var2=val2',
                'Host': 'localhost',
                'Connection': 'close',
            }
        )

        assert resp['headers']['X-Cookie-1'] == 'val', 'cookie'
        assert resp['headers']['X-Cookie-2'] == 'val2', 'cookie'

    # ------------------------------------------------------------------
    # php.ini handling ("file", "admin" and "user" options)
    # ------------------------------------------------------------------

    def test_php_application_ini_precision(self):
        """A configured php.ini file changes the reported precision to 4."""
        self.load('ini_precision')

        assert self.get()['headers']['X-Precision'] != '4', 'ini value default'

        assert 'success' in self.conf(
            {"file": "ini/php.ini"}, 'applications/ini_precision/options'
        )

        assert (
            self.get()['headers']['X-File']
            == f'{option.test_dir}/php/ini_precision/ini/php.ini'
        ), 'ini file'
        assert self.get()['headers']['X-Precision'] == '4', 'ini value'

    @pytest.mark.skip('not yet')
    def test_php_application_ini_admin_user(self):
        """Setting the same key in 'user' and 'admin' should be rejected."""
        self.load('ini_precision')

        assert 'error' in self.conf(
            {"user": {"precision": "4"}, "admin": {"precision": "5"}},
            'applications/ini_precision/options',
        ), 'ini admin user'

    def test_php_application_ini_admin(self):
        """An 'admin' value overrides the value from the ini file."""
        self.load('ini_precision')

        assert 'success' in self.conf(
            {"file": "ini/php.ini", "admin": {"precision": "5"}},
            'applications/ini_precision/options',
        )

        assert (
            self.get()['headers']['X-File']
            == f'{option.test_dir}/php/ini_precision/ini/php.ini'
        ), 'ini file'
        assert self.get()['headers']['X-Precision'] == '5', 'ini value admin'

    def test_php_application_ini_user(self):
        """A 'user' value overrides the value from the ini file."""
        self.load('ini_precision')

        assert 'success' in self.conf(
            {"file": "ini/php.ini", "user": {"precision": "5"}},
            'applications/ini_precision/options',
        )

        assert (
            self.get()['headers']['X-File']
            == f'{option.test_dir}/php/ini_precision/ini/php.ini'
        ), 'ini file'
        assert self.get()['headers']['X-Precision'] == '5', 'ini value user'

    def test_php_application_ini_user_2(self):
        """A 'user' section added later overrides the ini file value."""
        self.load('ini_precision')

        assert 'success' in self.conf(
            {"file": "ini/php.ini"}, 'applications/ini_precision/options'
        )

        assert self.get()['headers']['X-Precision'] == '4', 'ini user file'

        assert 'success' in self.conf(
            {"precision": "5"}, 'applications/ini_precision/options/user'
        )

        assert self.get()['headers']['X-Precision'] == '5', 'ini value user'

    def test_php_application_ini_set_admin(self):
        """A request asking for precision 6 cannot override an 'admin' value."""
        self.load('ini_precision')

        assert 'success' in self.conf(
            {"admin": {"precision": "5"}}, 'applications/ini_precision/options'
        )

        assert (
            self.get(url='/?precision=6')['headers']['X-Precision'] == '5'
        ), 'ini set admin'

    def test_php_application_ini_set_user(self):
        """A request asking for precision 6 overrides a 'user' value."""
        self.load('ini_precision')

        assert 'success' in self.conf(
            {"user": {"precision": "5"}}, 'applications/ini_precision/options'
        )

        assert (
            self.get(url='/?precision=6')['headers']['X-Precision'] == '6'
        ), 'ini set user'

    def test_php_application_ini_repeat(self):
        """The configured value is stable across consecutive requests."""
        self.load('ini_precision')

        assert 'success' in self.conf(
            {"user": {"precision": "5"}}, 'applications/ini_precision/options'
        )

        assert self.get()['headers']['X-Precision'] == '5', 'ini value'
        assert self.get()['headers']['X-Precision'] == '5', 'ini value repeat'

    # ------------------------------------------------------------------
    # disable_functions / disable_classes
    # ------------------------------------------------------------------

    def test_php_application_disable_functions_exec(self):
        """Disabling 'exec' removes exec output but keeps time output."""
        self.load('time_exec')

        self.before_disable_functions()

        assert 'success' in self.conf(
            {"admin": {"disable_functions": "exec"}},
            'applications/time_exec/options',
        )

        body = self.get()['body']

        assert re.search(r'time: \d+', body), 'disable_functions time'
        assert not re.search(r'exec: \/\w+', body), 'disable_functions exec'

    def test_php_application_disable_functions_comma(self):
        """A comma-separated list disables both functions."""
        self.load('time_exec')

        self.before_disable_functions()

        assert 'success' in self.conf(
            {"admin": {"disable_functions": "exec,time"}},
            'applications/time_exec/options',
        )

        body = self.get()['body']

        assert not re.search(r'time: \d+', body), 'disable_functions comma time'
        assert not re.search(
            r'exec: \/\w+', body
        ), 'disable_functions comma exec'

    # ------------------------------------------------------------------
    # Authorization header handling
    # ------------------------------------------------------------------

    def test_php_application_auth(self):
        """No auth, Basic auth and Digest auth are exposed as expected."""
        self.load('auth')

        resp = self.get()
        assert resp['status'] == 200, 'status'
        assert resp['headers']['X-Digest'] == 'not set', 'digest'
        assert resp['headers']['X-User'] == 'not set', 'user'
        assert resp['headers']['X-Password'] == 'not set', 'password'

        resp = self.get(
            headers={
                'Host': 'localhost',
                'Authorization': 'Basic dXNlcjpwYXNzd29yZA==',
                'Connection': 'close',
            }
        )
        assert resp['status'] == 200, 'basic status'
        assert resp['headers']['X-Digest'] == 'not set', 'basic digest'
        assert resp['headers']['X-User'] == 'user', 'basic user'
        assert resp['headers']['X-Password'] == 'password', 'basic password'

        resp = self.get(
            headers={
                'Host': 'localhost',
                'Authorization': 'Digest username="blah", realm="", uri="/"',
                'Connection': 'close',
            }
        )
        assert resp['status'] == 200, 'digest status'
        assert (
            resp['headers']['X-Digest'] == 'username="blah", realm="", uri="/"'
        ), 'digest digest'
        assert resp['headers']['X-User'] == 'not set', 'digest user'
        assert resp['headers']['X-Password'] == 'not set', 'digest password'

    def test_php_application_auth_invalid(self):
        """Malformed Authorization values leave all auth variables unset."""
        self.load('auth')

        def check_auth(auth):
            """Send *auth* as the Authorization header; expect nothing set."""
            resp = self.get(
                headers={
                    'Host': 'localhost',
                    'Authorization': auth,
                    'Connection': 'close',
                }
            )

            assert resp['status'] == 200, 'status'
            assert resp['headers']['X-Digest'] == 'not set', 'Digest'
            assert resp['headers']['X-User'] == 'not set', 'User'
            assert resp['headers']['X-Password'] == 'not set', 'Password'

        check_auth('Basic dXN%cjpwYXNzd29yZA==')
        check_auth('Basic XNlcjpwYXNzd29yZA==')
        check_auth('Basic DdXNlcjpwYXNzd29yZA==')
        check_auth('Basic blah')
        check_auth('Basic')
        check_auth('Digest')
        check_auth('blah')

    def test_php_application_disable_functions_space(self):
        """A space-separated list disables both functions."""
        self.load('time_exec')

        self.before_disable_functions()

        assert 'success' in self.conf(
            {"admin": {"disable_functions": "exec time"}},
            'applications/time_exec/options',
        )

        body = self.get()['body']

        assert not re.search(r'time: \d+', body), 'disable_functions space time'
        assert not re.search(
            r'exec: \/\w+', body
        ), 'disable_functions space exec'

    def test_php_application_disable_functions_user(self):
        """disable_functions given in the 'user' section disables exec."""
        self.load('time_exec')

        self.before_disable_functions()

        assert 'success' in self.conf(
            {"user": {"disable_functions": "exec"}},
            'applications/time_exec/options',
        )

        body = self.get()['body']

        assert re.search(r'time: \d+', body), 'disable_functions user time'
        assert not re.search(
            r'exec: \/\w+', body
        ), 'disable_functions user exec'

    def test_php_application_disable_functions_nonexistent(self):
        """Disabling an unknown function name changes nothing."""
        self.load('time_exec')

        self.before_disable_functions()

        assert 'success' in self.conf(
            {"admin": {"disable_functions": "blah"}},
            'applications/time_exec/options',
        )

        body = self.get()['body']

        assert re.search(
            r'time: \d+', body
        ), 'disable_functions nonexistent time'
        assert re.search(
            r'exec: \/\w+', body
        ), 'disable_functions nonexistent exec'

    def test_php_application_disable_classes(self):
        """Disabling DateTime via 'admin' removes the expected output."""
        self.load('date_time')

        assert re.search(
            r'012345', self.get()['body']
        ), 'disable_classes before'

        assert 'success' in self.conf(
            {"admin": {"disable_classes": "DateTime"}},
            'applications/date_time/options',
        )

        assert not re.search(
            r'012345', self.get()['body']
        ), 'disable_classes after'

    def test_php_application_disable_classes_user(self):
        """Disabling DateTime via 'user' removes the expected output."""
        self.load('date_time')

        assert re.search(
            r'012345', self.get()['body']
        ), 'disable_classes before'

        assert 'success' in self.conf(
            {"user": {"disable_classes": "DateTime"}},
            'applications/date_time/options',
        )

        assert not re.search(
            r'012345', self.get()['body']
        ), 'disable_classes user after'

    # ------------------------------------------------------------------
    # Logging
    # ------------------------------------------------------------------

    def test_php_application_error_log(self, findall, wait_for_record):
        """Two requests produce two log records with different timestamps."""
        self.load('error_log')

        assert self.get()['status'] == 200, 'status'

        # Separate the two requests so their log timestamps differ.
        time.sleep(1)

        assert self.get()['status'] == 200, 'status 2'

        pattern = r'\d{4}\/\d\d\/\d\d\s\d\d:.+\[notice\].+Error in application'

        assert wait_for_record(pattern) is not None, 'errors print'

        errs = findall(pattern)

        assert len(errs) == 2, 'error_log count'

        # Everything before the first '[' is compared as the date prefix.
        date = errs[0].split('[')[0]
        date2 = errs[1].split('[')[0]
        assert date != date2, 'date diff'

    # ------------------------------------------------------------------
    # Explicit application configuration (script / index / root)
    # ------------------------------------------------------------------

    def test_php_application_script(self):
        """An application with an explicit 'script' serves that script."""
        assert 'success' in self.conf(
            {
                "listeners": {"*:7080": {"pass": "applications/script"}},
                "applications": {
                    "script": {
                        "type": self.get_application_type(),
                        "processes": {"spare": 0},
                        "root": f"{option.test_dir}/php/script",
                        "script": "phpinfo.php",
                    }
                },
            }
        ), 'configure script'

        resp = self.get()

        assert resp['status'] == 200, 'status'
        assert resp['body'] != '', 'body not empty'

    def test_php_application_index_default(self):
        """Without 'script'/'index' the root still serves a non-empty page."""
        assert 'success' in self.conf(
            {
                "listeners": {"*:7080": {"pass": "applications/phpinfo"}},
                "applications": {
                    "phpinfo": {
                        "type": self.get_application_type(),
                        "processes": {"spare": 0},
                        "root": f"{option.test_dir}/php/phpinfo",
                    }
                },
            }
        ), 'configure index default'

        resp = self.get()

        assert resp['status'] == 200, 'status'
        assert resp['body'] != '', 'body not empty'

    def test_php_application_trailing_slash(self, temp_dir):
        """Directory URIs without a trailing slash are redirected with 301."""
        new_root = f'{temp_dir}/php-root'
        os.makedirs(f'{new_root}/path')

        Path(f'{new_root}/path/index.php').write_text('')

        addr = f'{temp_dir}/sock'

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {"pass": "applications/php-path"},
                    f'unix:{addr}': {"pass": "applications/php-path"},
                },
                "applications": {
                    "php-path": {
                        "type": self.get_application_type(),
                        "processes": {"spare": 0},
                        "root": new_root,
                    }
                },
            }
        ), 'configure trailing slash'

        assert self.get(url='/path/')['status'] == 200, 'uri with trailing /'

        resp = self.get(url='/path?q=a')
        assert resp['status'] == 301, 'uri without trailing /'
        assert (
            resp['headers']['Location'] == 'http://localhost:7080/path/?q=a'
        ), 'Location with query string'

        # Same check over the Unix domain socket listener with a custom Host.
        resp = self.get(
            sock_type='unix',
            addr=addr,
            url='/path',
            headers={'Host': 'foo', 'Connection': 'close'},
        )
        assert resp['status'] == 301, 'uri without trailing /'
        assert (
            resp['headers']['Location'] == 'http://foo/path/'
        ), 'Location with custom Host over UDS'

    def test_php_application_forbidden(self, temp_dir):
        """A directory with mode 000 under the root answers 403."""
        new_root = f'{temp_dir}/php-root/path'
        os.makedirs(new_root)
        os.chmod(new_root, 0o000)

        assert 'success' in self.conf(
            {
                "listeners": {"*:7080": {"pass": "applications/php-path"}},
                "applications": {
                    "php-path": {
                        "type": self.get_application_type(),
                        "processes": {"spare": 0},
                        "root": f'{temp_dir}/php-root',
                    }
                },
            }
        ), 'forbidden directory'

        assert self.get(url='/path/')['status'] == 403, 'access forbidden'

    def test_php_application_extension_check(self, temp_dir):
        """A file with a non-PHP extension is not served as a script."""
        self.load('phpinfo')

        assert self.get(url='/index.wrong')['status'] != 200, 'status'

        new_root = f'{temp_dir}/php'
        os.mkdir(new_root)
        shutil.copy(f'{option.test_dir}/php/phpinfo/index.wrong', new_root)

        assert 'success' in self.conf(
            {
                "listeners": {"*:7080": {"pass": "applications/phpinfo"}},
                "applications": {
                    "phpinfo": {
                        "type": self.get_application_type(),
                        "processes": {"spare": 0},
                        "root": new_root,
                        "working_directory": new_root,
                    }
                },
            }
        ), 'configure new root'

        resp = self.get()
        # Status and body are concatenated: the result is exactly '200' when
        # the status is 200 and the body is empty, which must not happen.
        assert f'{resp["status"]}{resp["body"]}' != '200', 'status new root'

    # ------------------------------------------------------------------
    # Current working directory
    # ------------------------------------------------------------------

    def run_php_application_cwd_root_tests(self):
        """Shared cwd checks for the already loaded 'cwd' application."""
        assert 'success' in self.conf_delete(
            'applications/cwd/working_directory'
        )

        script_cwd = f'{option.test_dir}/php/cwd'

        resp = self.get()
        assert resp['status'] == 200, 'status ok'
        assert resp['body'] == script_cwd, 'default cwd'

        assert 'success' in self.conf(
            f'"{option.test_dir}"',
            'applications/cwd/working_directory',
        )

        # The script directory is still reported with working_directory set.
        resp = self.get()
        assert resp['status'] == 200, 'status ok'
        assert resp['body'] == script_cwd, 'wdir cwd'

        resp = self.get(url='/?chdir=/')
        assert resp['status'] == 200, 'status ok'
        assert resp['body'] == '/', 'cwd after chdir'

        # cwd must be restored
        resp = self.get()
        assert resp['status'] == 200, 'status ok'
        assert resp['body'] == script_cwd, 'cwd restored'

        resp = self.get(url='/subdir/')
        assert resp['body'] == f'{script_cwd}/subdir', 'cwd subdir'

    def test_php_application_cwd_root(self):
        """cwd checks with the default OPcache configuration."""
        self.load('cwd')
        self.run_php_application_cwd_root_tests()

    def test_php_application_cwd_opcache_disabled(self):
        """cwd checks with OPcache set to '0'."""
        self.load('cwd')
        self.set_opcache('cwd', '0')
        self.run_php_application_cwd_root_tests()

    def test_php_application_cwd_opcache_enabled(self):
        """cwd checks with OPcache set to '1'."""
        self.load('cwd')
        self.set_opcache('cwd', '1')
        self.run_php_application_cwd_root_tests()

    def run_php_application_cwd_script_tests(self):
        """Shared cwd checks with an explicit 'script' configured."""
        # NOTE(review): every caller in this file has already loaded 'cwd'
        # (and some have set OPcache options) before calling this helper. If
        # load() replaces the application configuration, this second load
        # discards those options -- confirm whether it is intentional.
        self.load('cwd')

        script_cwd = f'{option.test_dir}/php/cwd'

        assert 'success' in self.conf_delete(
            'applications/cwd/working_directory'
        )

        assert 'success' in self.conf('"index.php"', 'applications/cwd/script')

        assert self.get()['body'] == script_cwd, 'default cwd'

        assert self.get(url='/?chdir=/')['body'] == '/', 'cwd after chdir'

        # cwd must be restored
        assert self.get()['body'] == script_cwd, 'cwd restored'

    def test_php_application_cwd_script(self):
        """Script-mode cwd checks with the default OPcache configuration."""
        self.load('cwd')
        self.run_php_application_cwd_script_tests()

    def test_php_application_cwd_script_opcache_disabled(self):
        """Script-mode cwd checks after setting OPcache to '0'."""
        self.load('cwd')
        self.set_opcache('cwd', '0')
        self.run_php_application_cwd_script_tests()

    def test_php_application_cwd_script_opcache_enabled(self):
        """Script-mode cwd checks after setting OPcache to '1'."""
        self.load('cwd')
        self.set_opcache('cwd', '1')
        self.run_php_application_cwd_script_tests()

    def test_php_application_path_relative(self):
        """Relative paths resolve again after a request that called chdir."""
        self.load('open')

        assert self.get()['body'] == 'test', 'relative path'

        assert (
            self.get(url='/?chdir=/')['body'] != 'test'
        ), 'relative path w/ chdir'

        assert self.get()['body'] == 'test', 'relative path 2'

    # ------------------------------------------------------------------
    # OPcache sharing and preload
    # ------------------------------------------------------------------

    def test_php_application_shared_opcache(self):
        """A new process (different pid) sees the script as already cached."""
        # limits.requests = 1 is passed so the second request is expected to
        # come from a different process; the X-Pid assertion verifies that.
        self.load('opcache', limits={'requests': 1})

        r = self.check_opcache()
        pid = r['headers']['X-Pid']
        assert r['headers']['X-Cached'] == '0', 'not cached'

        r = self.get()

        assert r['headers']['X-Pid'] != pid, 'new instance'
        assert r['headers']['X-Cached'] == '1', 'cached'

    def test_php_application_opcache_preload_chdir(self):
        """Caching works with the 'chdir.php' preload script."""
        self.load('opcache')

        self.check_opcache()

        self.set_preload('chdir.php')

        assert self.get()['headers']['X-Cached'] == '0', 'not cached'
        assert self.get()['headers']['X-Cached'] == '1', 'cached'

    def test_php_application_opcache_preload_ffr(self):
        """Caching works with the 'fastcgi_finish_request.php' preload."""
        self.load('opcache')

        self.check_opcache()

        self.set_preload('fastcgi_finish_request.php')

        assert self.get()['headers']['X-Cached'] == '0', 'not cached'
        assert self.get()['headers']['X-Cached'] == '1', 'cached'