summaryrefslogtreecommitdiffhomepage
path: root/test/unit/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/unit/main.py')
-rw-r--r--test/unit/main.py292
1 files changed, 37 insertions, 255 deletions
diff --git a/test/unit/main.py b/test/unit/main.py
index 83aa9139..d5940995 100644
--- a/test/unit/main.py
+++ b/test/unit/main.py
@@ -1,90 +1,26 @@
-import argparse
import atexit
-import fcntl
import os
-import platform
import re
import shutil
import signal
import stat
import subprocess
-import sys
import tempfile
import time
-import unittest
from multiprocessing import Process
+import pytest
+from conftest import _check_alerts
+from conftest import _print_log
+from conftest import option
+from conftest import public_dir
+from conftest import waitforfiles
-class TestUnit(unittest.TestCase):
-
- current_dir = os.path.abspath(
- os.path.join(os.path.dirname(__file__), os.pardir)
- )
- pardir = os.path.abspath(
- os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
- )
- is_su = os.geteuid() == 0
- uid = os.geteuid()
- gid = os.getegid()
- architecture = platform.architecture()[0]
- system = platform.system()
- maxDiff = None
-
- detailed = False
- save_log = False
- print_log = False
- unsafe = False
-
- def __init__(self, methodName='runTest'):
- super().__init__(methodName)
-
- if re.match(r'.*\/run\.py$', sys.argv[0]):
- args, rest = TestUnit._parse_args()
-
- TestUnit._set_args(args)
-
- def run(self, result=None):
- if not hasattr(self, 'application_type'):
- return super().run(result)
-
- # rerun test for each available module version
-
- type = self.application_type
- for module in self.prerequisites['modules']:
- if module in self.available['modules']:
- prereq_version = self.prerequisites['modules'][module]
- available_versions = self.available['modules'][module]
-
- if prereq_version == 'all':
- for version in available_versions:
- self.application_type = type + ' ' + version
- super().run(result)
- elif prereq_version == 'any':
- self.application_type = type + ' ' + available_versions[0]
- super().run(result)
- else:
- for version in available_versions:
- if version.startswith(prereq_version):
- self.application_type = type + ' ' + version
- super().run(result)
+class TestUnit():
@classmethod
- def main(cls):
- args, rest = TestUnit._parse_args()
-
- for i, arg in enumerate(rest):
- if arg[:5] == 'test_':
- rest[i] = cls.__name__ + '.' + arg
-
- sys.argv = sys.argv[:1] + rest
-
- TestUnit._set_args(args)
-
- unittest.main()
-
- @classmethod
- def setUpClass(cls, complete_check=True):
- cls.available = {'modules': {}, 'features': {}}
+ def setup_class(cls, complete_check=True):
+ cls.available = option.available
unit = TestUnit()
unit._run()
@@ -92,7 +28,7 @@ class TestUnit(unittest.TestCase):
# read unit.log
for i in range(50):
- with open(unit.testdir + '/unit.log', 'r') as f:
+ with open(unit.temp_dir + '/unit.log', 'r') as f:
log = f.read()
m = re.search('controller started', log)
@@ -102,17 +38,9 @@ class TestUnit(unittest.TestCase):
break
if m is None:
- unit._print_log()
+ _print_log(path=unit.temp_dir + '/unit.log')
exit("Unit is writing log too long")
- # discover available modules from unit.log
-
- for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M):
- if module[0] not in cls.available['modules']:
- cls.available['modules'][module[0]] = [module[1]]
- else:
- cls.available['modules'][module[0]].append(module[1])
-
def check(available, prerequisites):
missed = []
@@ -128,8 +56,7 @@ class TestUnit(unittest.TestCase):
missed.append(module)
if missed:
- print('Unit has no ' + ', '.join(missed) + ' module(s)')
- raise unittest.SkipTest()
+ pytest.skip('Unit has no ' + ', '.join(missed) + ' module(s)')
# check features
@@ -143,13 +70,12 @@ class TestUnit(unittest.TestCase):
missed.append(feature)
if missed:
- print(', '.join(missed) + ' feature(s) not supported')
- raise unittest.SkipTest()
+ pytest.skip(', '.join(missed) + ' feature(s) not supported')
def destroy():
unit.stop()
- unit._check_alerts(log)
- shutil.rmtree(unit.testdir)
+ _check_alerts(log)
+ shutil.rmtree(unit.temp_dir)
def complete():
destroy()
@@ -161,92 +87,66 @@ class TestUnit(unittest.TestCase):
unit.complete = complete
return unit
- def setUp(self):
+ def setup_method(self):
self._run()
def _run(self):
- build_dir = self.pardir + '/build'
+ build_dir = option.current_dir + '/build'
self.unitd = build_dir + '/unitd'
if not os.path.isfile(self.unitd):
exit("Could not find unit")
- self.testdir = tempfile.mkdtemp(prefix='unit-test-')
+ self.temp_dir = tempfile.mkdtemp(prefix='unit-test-')
- self.public_dir(self.testdir)
+ public_dir(self.temp_dir)
if oct(stat.S_IMODE(os.stat(build_dir).st_mode)) != '0o777':
- self.public_dir(build_dir)
+ public_dir(build_dir)
- os.mkdir(self.testdir + '/state')
+ os.mkdir(self.temp_dir + '/state')
- with open(self.testdir + '/unit.log', 'w') as log:
+ with open(self.temp_dir + '/unit.log', 'w') as log:
self._p = subprocess.Popen(
[
self.unitd,
'--no-daemon',
- '--modules', self.pardir + '/build',
- '--state', self.testdir + '/state',
- '--pid', self.testdir + '/unit.pid',
- '--log', self.testdir + '/unit.log',
- '--control', 'unix:' + self.testdir + '/control.unit.sock',
- '--tmp', self.testdir,
+ '--modules', build_dir,
+ '--state', self.temp_dir + '/state',
+ '--pid', self.temp_dir + '/unit.pid',
+ '--log', self.temp_dir + '/unit.log',
+ '--control', 'unix:' + self.temp_dir + '/control.unit.sock',
+ '--tmp', self.temp_dir,
],
stderr=log,
)
atexit.register(self.stop)
- if not self.waitforfiles(self.testdir + '/control.unit.sock'):
- self._print_log()
+ if not waitforfiles(self.temp_dir + '/control.unit.sock'):
+ _print_log(path=self.temp_dir + '/unit.log')
exit("Could not start unit")
self._started = True
- self.skip_alerts = [
- r'read signalfd\(4\) failed',
- r'sendmsg.+failed',
- r'recvmsg.+failed',
- ]
- self.skip_sanitizer = False
-
- def tearDown(self):
+ def teardown_method(self):
self.stop()
- # detect errors and failures for current test
-
- def list2reason(exc_list):
- if exc_list and exc_list[-1][0] is self:
- return exc_list[-1][1]
-
- if hasattr(self, '_outcome'):
- result = self.defaultTestResult()
- self._feedErrorsToResult(result, self._outcome.errors)
- else:
- result = getattr(
- self, '_outcomeForDoCleanups', self._resultForDoCleanups
- )
-
- success = not list2reason(result.errors) and not list2reason(
- result.failures
- )
-
# check unit.log for alerts
- unit_log = self.testdir + '/unit.log'
+ unit_log = self.temp_dir + '/unit.log'
with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
- self._check_alerts(f.read())
+ _check_alerts(f.read())
# remove unit.log
- if not TestUnit.save_log and success:
- shutil.rmtree(self.testdir)
-
+ if not option.save_log:
+ shutil.rmtree(self.temp_dir)
else:
- self._print_log()
+ _print_log(path=self.temp_dir)
- self.assertListEqual(self.stop_errors, [None, None], 'stop errors')
+ assert self.stop_errors == [None, None], 'stop errors'
def stop(self):
if not self._started:
@@ -301,121 +201,3 @@ class TestUnit(unittest.TestCase):
if fail:
return 'Fail to stop process'
-
- def waitforfiles(self, *files):
- for i in range(50):
- wait = False
- ret = False
-
- for f in files:
- if not os.path.exists(f):
- wait = True
- break
-
- if wait:
- time.sleep(0.1)
-
- else:
- ret = True
- break
-
- return ret
-
- def public_dir(self, path):
- os.chmod(path, 0o777)
-
- for root, dirs, files in os.walk(path):
- for d in dirs:
- os.chmod(os.path.join(root, d), 0o777)
- for f in files:
- os.chmod(os.path.join(root, f), 0o777)
-
- def _check_alerts(self, log):
- found = False
-
- alerts = re.findall('.+\[alert\].+', log)
-
- if alerts:
- print('All alerts/sanitizer errors found in log:')
- [print(alert) for alert in alerts]
- found = True
-
- if self.skip_alerts:
- for skip in self.skip_alerts:
- alerts = [al for al in alerts if re.search(skip, al) is None]
-
- if alerts:
- self._print_log(log)
- self.assertFalse(alerts, 'alert(s)')
-
- if not self.skip_sanitizer:
- sanitizer_errors = re.findall('.+Sanitizer.+', log)
-
- if sanitizer_errors:
- self._print_log(log)
- self.assertFalse(sanitizer_errors, 'sanitizer error(s)')
-
- if found:
- print('skipped.')
-
- @staticmethod
- def _parse_args():
- parser = argparse.ArgumentParser(add_help=False)
-
- parser.add_argument(
- '-d',
- '--detailed',
- dest='detailed',
- action='store_true',
- help='Detailed output for tests',
- )
- parser.add_argument(
- '-l',
- '--log',
- dest='save_log',
- action='store_true',
- help='Save unit.log after the test execution',
- )
- parser.add_argument(
- '-r',
- '--reprint_log',
- dest='print_log',
- action='store_true',
- help='Print unit.log to stdout in case of errors',
- )
- parser.add_argument(
- '-u',
- '--unsafe',
- dest='unsafe',
- action='store_true',
- help='Run unsafe tests',
- )
-
- return parser.parse_known_args()
-
- @staticmethod
- def _set_args(args):
- TestUnit.detailed = args.detailed
- TestUnit.save_log = args.save_log
- TestUnit.print_log = args.print_log
- TestUnit.unsafe = args.unsafe
-
- # set stdout to non-blocking
-
- if TestUnit.detailed or TestUnit.print_log:
- fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0)
-
- def _print_log(self, data=None):
- path = self.testdir + '/unit.log'
-
- print('Path to unit.log:\n' + path + '\n')
-
- if TestUnit.print_log:
- os.set_blocking(sys.stdout.fileno(), True)
- sys.stdout.flush()
-
- if data is None:
- with open(path, 'r', encoding='utf-8', errors='ignore') as f:
- shutil.copyfileobj(f, sys.stdout)
- else:
- sys.stdout.write(data)