import os
import re
import sys
import json
import time
import shutil
import socket
import tempfile
import unittest
from subprocess import call
from multiprocessing import Process
class TestUnit(unittest.TestCase):
def setUp(self):
self._run()
def tearDown(self):
self._stop()
if '--log' in sys.argv:
with open(self.testdir + '/unit.log', 'r') as f:
print(f.read())
if '--leave' not in sys.argv:
shutil.rmtree(self.testdir)
def check_modules(self, *modules):
self._run()
for i in range(50):
with open(self.testdir + '/unit.log', 'r') as f:
log = f.read()
m = re.search('controller started', log, re.M | re.S)
if m is None:
time.sleep(0.1)
else:
break
if m is None:
exit("Unit is writing log too long")
ret = ''
for module in modules:
m = re.search('module: ' + module, log, re.M | re.S)
if m is None:
ret = module
self._stop()
shutil.rmtree(self.testdir)
return ret
def _run(self):
self.testdir = tempfile.mkdtemp(prefix='unit-test-')
os.mkdir(self.testdir + '/state')
print()
def _run_unit():
pardir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir))
call([pardir + '/build/unitd',
'--no-daemon',
'--modules', pardir + '/build',
'--state', self.testdir + '/state',
'--pid', self.testdir + '/unit.pid',
'--log', self.testdir + '/unit.log',
'--control', 'unix:' + self.testdir + '/control.unit.sock'])
self._p = Process(target=_run_unit)
self._p.start()
if not self._waitforfiles(self.testdir + '/unit.pid',
self.testdir + '/unit.log', self.testdir + '/control.unit.sock'):
exit("Could not start unit")
def _stop(self):
with open(self.testdir + '/unit.pid', 'r') as f:
pid = f.read().rstrip()
call(['kill', pid])
for i in range(50):
if not os.path.exists(self.testdir + '/unit.pid'):
break
time.sleep(0.1)
if os.path.exists(self.testdir + '/unit.pid'):
exit("Could not terminate unit")
self._p.join(timeout=1)
self._terminate_process(self._p)
def _terminate_process(self, process):
if process.is_alive():
process.terminate()
process.join(timeout=5)
if process.is_alive():
exit("Could not terminate process " + process.pid)
if process.exitcode:
exit("Child process terminated with code " + str(process.exitcode))
def _waitforfiles(self, *files):
for i in range(50):
wait = False
ret = 0
for f in files:
if not os.path.exists(f):
wait = True
break
if wait:
time.sleep(0.1)
else:
ret = 1
break
return ret
class TestUnitControl(TestUnit):
# TODO socket reuse
# TODO http client
def http(self, req):
with self._control_sock() as sock:
sock.sendall(req)
if '--verbose' in sys.argv:
print('>>>', req, sep='\n')
resp = self._recvall(sock)
if '--verbose' in sys.argv:
print('<<<', resp, sep='\n')
return resp
def get(self, path='/'):
resp = self.http(('GET ' + path
+ ' HTTP/1.1\r\nHost: localhost\r\n\r\n').encode())
return self._body_json(resp)
def delete(self, path='/'):
resp = self.http(('DELETE ' + path
+ ' HTTP/1.1\r\nHost: localhost\r\n\r\n').encode())
return self._body_json(resp)
def put(self, path='/', data=''):
if isinstance(data, str):
data = data.encode()
resp = self.http(('PUT ' + path + ' HTTP/1.1\nHost: localhost\n'
+ 'Content-Length: ' + str(len(data))
+ '\r\n\r\n').encode() + data)
return self._body_json(resp)
def _control_sock(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(self.testdir + '/control.unit.sock')
return sock
def _recvall(self, sock, buff_size=4096):
data = ''
while True:
part = sock.recv(buff_size).decode()
data += part
if len(part) < buff_size:
break
return data
def _body_json(self, resp):
m = re.search('.*?\x0d\x0a?\x0d\x0a?(.*)', resp, re.M | re.S)
return json.loads(m.group(1))