blob: 18b9d53531d35188f1e7b74cb2b6f778d89eeb19 (
plain) (
tree)
|
|
import re
import subprocess
import time
from conftest import skip_alert
from unit.applications.lang.python import TestApplicationPython
class TestRespawn(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}}
PATTERN_ROUTER = 'unit: router'
PATTERN_CONTROLLER = 'unit: controller'
def setup_method(self):
super().setup_method()
self.app_name = "app-" + self.temp_dir.split('/')[-1]
self.load('empty', self.app_name)
assert 'success' in self.conf(
'1', 'applications/' + self.app_name + '/processes'
)
def pid_by_name(self, name):
output = subprocess.check_output(['ps', 'ax']).decode()
m = re.search(r'\s*(\d+).*' + name, output)
return m if m is None else m.group(1)
def kill_pids(self, *pids):
subprocess.call(['kill', '-9'] + list(pids))
def wait_for_process(self, process):
for i in range(50):
found = self.pid_by_name(process)
if found is not None:
break
time.sleep(0.1)
return found
def smoke_test(self):
for _ in range(5):
assert 'success' in self.conf(
'1', 'applications/' + self.app_name + '/processes'
)
assert self.get()['status'] == 200
# Check if the only one router, controller,
# and application processes running.
output = subprocess.check_output(['ps', 'ax']).decode()
assert len(re.findall(self.PATTERN_ROUTER, output)) == 1
assert len(re.findall(self.PATTERN_CONTROLLER, output)) == 1
assert len(re.findall(self.app_name, output)) == 1
def test_respawn_router(self):
pid = self.pid_by_name(self.PATTERN_ROUTER)
self.kill_pids(pid)
skip_alert(r'process %s exited on signal 9' % pid)
assert self.wait_for_process(self.PATTERN_ROUTER) is not None
self.smoke_test()
def test_respawn_controller(self):
pid = self.pid_by_name(self.PATTERN_CONTROLLER)
self.kill_pids(pid)
skip_alert(r'process %s exited on signal 9' % pid)
assert self.wait_for_process(self.PATTERN_CONTROLLER) is not None
assert self.get()['status'] == 200
self.smoke_test()
def test_respawn_application(self):
pid = self.pid_by_name(self.app_name)
self.kill_pids(pid)
skip_alert(r'process %s exited on signal 9' % pid)
assert self.wait_for_process(self.app_name) is not None
self.smoke_test()
|