import argparse
import atexit
import fcntl
import os
import platform
import re
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import time
import unittest
from multiprocessing import Process
class TestUnit(unittest.TestCase):
current_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir)
)
pardir = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
is_su = os.geteuid() == 0
uid = os.geteuid()
gid = os.getegid()
architecture = platform.architecture()[0]
system = platform.system()
maxDiff = None
detailed = False
save_log = False
print_log = False
unsafe = False
def __init__(self, methodName='runTest'):
super().__init__(methodName)
if re.match(r'.*\/run\.py$', sys.argv[0]):
args, rest = TestUnit._parse_args()
TestUnit._set_args(args)
def run(self, result=None):
if not hasattr(self, 'application_type'):
return super().run(result)
# rerun test for each available module version
type = self.application_type
for module in self.prerequisites['modules']:
if module in self.available['modules']:
prereq_version = self.prerequisites['modules'][module]
available_versions = self.available['modules'][module]
if prereq_version == 'all':
for version in available_versions:
self.application_type = type + ' ' + version
self.application_version = version
super().run(result)
elif prereq_version == 'any':
self.application_type = type + ' ' + available_versions[0]
super().run(result)
else:
for version in available_versions:
if version.startswith(prereq_version):
self.application_type = type + ' ' + version
super().run(result)
@classmethod
def main(cls):
args, rest = TestUnit._parse_args()
for i, arg in enumerate(rest):
if arg[:5] == 'test_':
rest[i] = cls.__name__ + '.' + arg
sys.argv = sys.argv[:1] + rest
TestUnit._set_args(args)
unittest.main()
@classmethod
def setUpClass(cls, complete_check=True):
cls.available = {'modules': {}, 'features': {}}
unit = TestUnit()
unit._run()
# read unit.log
for i in range(50):
with open(unit.testdir + '/unit.log', 'r') as f:
log = f.read()
m = re.search('controller started', log)
if m is None:
time.sleep(0.1)
else:
break
if m is None:
unit._print_log()
exit("Unit is writing log too long")
# discover available modules from unit.log
for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M):
if module[0] not in cls.available['modules']:
cls.available['modules'][module[0]] = [module[1]]
else:
cls.available['modules'][module[0]].append(module[1])
def check(available, prerequisites):
missed = []
# check modules
if 'modules' in prerequisites:
available_modules = list(available['modules'].keys())
for module in prerequisites['modules']:
if module in available_modules:
continue
missed.append(module)
if missed:
print('Unit has no ' + ', '.join(missed) + ' module(s)')
raise unittest.SkipTest()
# check features
if 'features' in prerequisites:
available_features = list(available['features'].keys())
for feature in prerequisites['features']:
if feature in available_features:
continue
missed.append(feature)
if missed:
print(', '.join(missed) + ' feature(s) not supported')
raise unittest.SkipTest()
def destroy():
unit.stop()
unit._check_alerts(log)
shutil.rmtree(unit.testdir)
def complete():
destroy()
check(cls.available, cls.prerequisites)
if complete_check:
complete()
else:
unit.complete = complete
return unit
def setUp(self):
self._run()
def _run(self):
build_dir = os.path.join(self.pardir, 'build')
self.unitd = build_dir + '/unitd'
if not os.path.isfile(self.unitd):
exit("Could not find unit")
self.testdir = tempfile.mkdtemp(prefix='unit-test-')
self.public_dir(self.testdir)
if oct(stat.S_IMODE(os.stat(build_dir).st_mode)) != '0o777':
self.public_dir(build_dir)
os.mkdir(self.testdir + '/state')
with open(self.testdir + '/unit.log', 'w') as log:
self._p = subprocess.Popen(
[
self.unitd,
'--no-daemon',
'--modules', self.pardir + '/build',
'--state', self.testdir + '/state',
'--pid', self.testdir + '/unit.pid',
'--log', self.testdir + '/unit.log',
'--control', 'unix:' + self.testdir + '/control.unit.sock',
'--tmp', self.testdir,
],
stderr=log,
)
atexit.register(self.stop)
if not self.waitforfiles(self.testdir + '/control.unit.sock'):
self._print_log()
exit("Could not start unit")
self.skip_alerts = [
r'read signalfd\(4\) failed',
r'sendmsg.+failed',
r'recvmsg.+failed',
]
self.skip_sanitizer = False
def tearDown(self):
stop_errs = self.stop()
# detect errors and failures for current test
def list2reason(exc_list):
if exc_list and exc_list[-1][0] is self:
return exc_list[-1][1]
if hasattr(self, '_outcome'):
result = self.defaultTestResult()
self._feedErrorsToResult(result, self._outcome.errors)
else:
result = getattr(
self, '_outcomeForDoCleanups', self._resultForDoCleanups
)
success = not list2reason(result.errors) and not list2reason(
result.failures
)
# check unit.log for alerts
unit_log = self.testdir + '/unit.log'
with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
self._check_alerts(f.read())
# remove unit.log
if not TestUnit.save_log and success:
shutil.rmtree(self.testdir)
else:
self._print_log()
self.assertListEqual(stop_errs, [None, None], 'stop errors')
def stop(self):
errors = []
errors.append(self._stop())
errors.append(self.stop_processes())
atexit.unregister(self.stop)
return errors
def _stop(self):
if self._p.poll() is not None:
return
with self._p as p:
p.send_signal(signal.SIGQUIT)
try:
retcode = p.wait(15)
if retcode:
return 'Child process terminated with code ' + str(retcode)
except:
p.kill()
return 'Could not terminate unit'
def run_process(self, target, *args):
if not hasattr(self, '_processes'):
self._processes = []
process = Process(target=target, args=args)
process.start()
self._processes.append(process)
def stop_processes(self):
if not hasattr(self, '_processes'):
return
fail = False
for process in self._processes:
if process.is_alive():
process.terminate()
process.join(timeout=15)
if process.is_alive():
fail = True
if fail:
return 'Fail to stop process'
def waitforfiles(self, *files):
for i in range(50):
wait = False
ret = False
for f in files:
if not os.path.exists(f):
wait = True
break
if wait:
time.sleep(0.1)
else:
ret = True
break
return ret
def public_dir(self, path):
os.chmod(path, 0o777)
for root, dirs, files in os.walk(path):
for d in dirs:
os.chmod(os.path.join(root, d), 0o777)
for f in files:
os.chmod(os.path.join(root, f), 0o777)
def _check_alerts(self, log):
found = False
alerts = re.findall('.+\[alert\].+', log)
if alerts:
print('All alerts/sanitizer errors found in log:')
[print(alert) for alert in alerts]
found = True
if self.skip_alerts:
for skip in self.skip_alerts:
alerts = [al for al in alerts if re.search(skip, al) is None]
if alerts:
self._print_log(log)
self.assertFalse(alerts, 'alert(s)')
if not self.skip_sanitizer:
sanitizer_errors = re.findall('.+Sanitizer.+', log)
if sanitizer_errors:
self._print_log(log)
self.assertFalse(sanitizer_errors, 'sanitizer error(s)')
if found:
print('skipped.')
@staticmethod
def _parse_args():
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
'-d',
'--detailed',
dest='detailed',
action='store_true',
help='Detailed output for tests',
)
parser.add_argument(
'-l',
'--log',
dest='save_log',
action='store_true',
help='Save unit.log after the test execution',
)
parser.add_argument(
'-r',
'--reprint_log',
dest='print_log',
action='store_true',
help='Print unit.log to stdout in case of errors',
)
parser.add_argument(
'-u',
'--unsafe',
dest='unsafe',
action='store_true',
help='Run unsafe tests',
)
return parser.parse_known_args()
@staticmethod
def _set_args(args):
TestUnit.detailed = args.detailed
TestUnit.save_log = args.save_log
TestUnit.print_log = args.print_log
TestUnit.unsafe = args.unsafe
# set stdout to non-blocking
if TestUnit.detailed or TestUnit.print_log:
fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0)
def _print_log(self, data=None):
path = self.testdir + '/unit.log'
print('Path to unit.log:\n' + path + '\n')
if TestUnit.print_log:
os.set_blocking(sys.stdout.fileno(), True)
sys.stdout.flush()
if data is None:
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
shutil.copyfileobj(f, sys.stdout)
else:
sys.stdout.write(data)