summaryrefslogblamecommitdiffhomepage
path: root/test/unit/main.py
blob: d59409955e7572d6662b9c82af0eea956e04497e (plain) (tree)
1
2
3
4
5
6
7
8
9
             

         
             
             

                 
               
           

                                   






                                  
 
                 
                

                                              






                           
                                                             








                                                        
                                                        

                                                














                                                                     
                                                                              



                                           
                                                                       




                                                         
                                          

                      
                                                                            


                       

                                        









                                                   
 
                           

                   
                   
                                                 
                                         



                                          
                                                             
 
                                 

                                                                    
                                 
 
                                          
 
                                                           



                                       
                                            




                                                                                 


                           
 

                                  
                                                                  
                                                        

                                        

                            
                              
                   
 

                                   
                                              

                                                                         
                                   


                         

                                        
             
                                          
 
                                                              
 
                   



                             
 
                                             
 
                                                      
 

                                    
                             
 
                    


                                      





                                         
                                                                               
                   
                        
                                                 
 
                                         


                                           
                                                   







                                           
                    
                                       
                                  



                                        



                                         
import atexit
import os
import re
import shutil
import signal
import stat
import subprocess
import tempfile
import time
from multiprocessing import Process

import pytest
from conftest import _check_alerts
from conftest import _print_log
from conftest import option
from conftest import public_dir
from conftest import waitforfiles


class TestUnit:
    """Base class for tests that need a private, running ``unitd`` daemon.

    Every test method gets a fresh daemon started from ``<current_dir>/build``
    in its own temporary directory (``setup_method``) and has it stopped and
    its log checked afterwards (``teardown_method``).

    ``setup_class`` reads ``cls.prerequisites``, which is not defined here and
    has to be provided by the concrete test class.
    """

    # Class-level default so stop() is safe to call (for instance from the
    # atexit hook) on an object whose daemon was never launched.
    _started = False

    @classmethod
    def setup_class(cls, complete_check=True):
        """Probe a throwaway daemon and verify the class prerequisites.

        A temporary instance is started and its log is polled (50 tries,
        0.1 s apart) for the 'controller started' message.

        With ``complete_check=True`` the instance is then stopped, the log
        text read during polling is passed to ``_check_alerts``, the
        temporary directory is removed, and the class is skipped via
        ``pytest.skip`` when a required module or feature is missing from
        ``option.available``.

        With ``complete_check=False`` nothing is finished yet: the running
        instance is returned with a ``complete`` attribute holding the
        callable that performs the steps above.

        Raises:
            SystemExit: the start-up message did not show up in time.
        """
        cls.available = option.available
        unit = TestUnit()

        unit._run()

        # read unit.log

        unit_log = unit.temp_dir + '/unit.log'
        m = None

        for _ in range(50):
            # Same decoding as in teardown_method: stray non-UTF-8 bytes in
            # the log must not abort the start-up check.
            with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
                log = f.read()

            m = re.search('controller started', log)
            if m is not None:
                break

            time.sleep(0.1)

        if m is None:
            _print_log(path=unit_log)
            raise SystemExit("Unit is writing log too long")

        def check(available, prerequisites):
            """Skip when a required module or feature is not available."""

            # check modules

            if 'modules' in prerequisites:
                missed = [
                    module
                    for module in prerequisites['modules']
                    if module not in available['modules']
                ]

                if missed:
                    pytest.skip(
                        'Unit has no ' + ', '.join(missed) + ' module(s)'
                    )

            # check features

            if 'features' in prerequisites:
                missed = [
                    feature
                    for feature in prerequisites['features']
                    if feature not in available['features']
                ]

                if missed:
                    pytest.skip(
                        ', '.join(missed) + ' feature(s) not supported'
                    )

        def destroy():
            """Stop the probe instance and clean up after it."""
            unit.stop()
            # 'log' is the text captured by the polling loop above, i.e. the
            # log as it was at start-up time.
            _check_alerts(log)
            shutil.rmtree(unit.temp_dir)

        def complete():
            destroy()
            check(cls.available, cls.prerequisites)

        if complete_check:
            complete()
        else:
            unit.complete = complete
            return unit

    def setup_method(self):
        """Start a fresh daemon for the test method."""
        self._run()

    def _run(self):
        """Launch ``unitd`` in a new temporary directory.

        Sets ``self.unitd``, ``self.temp_dir`` and ``self._p`` (the ``Popen``
        handle), registers ``self.stop`` with ``atexit`` and waits for the
        control socket to appear.

        Raises:
            SystemExit: the ``unitd`` binary is missing or the control socket
                did not appear.
        """
        build_dir = option.current_dir + '/build'
        self.unitd = build_dir + '/unitd'

        if not os.path.isfile(self.unitd):
            raise SystemExit("Could not find unit")

        self.temp_dir = tempfile.mkdtemp(prefix='unit-test-')

        public_dir(self.temp_dir)

        # public_dir() is only needed when the build directory does not
        # already carry mode 0777.
        if stat.S_IMODE(os.stat(build_dir).st_mode) != 0o777:
            public_dir(build_dir)

        os.mkdir(self.temp_dir + '/state')

        unit_log = self.temp_dir + '/unit.log'

        with open(unit_log, 'w') as log:
            self._p = subprocess.Popen(
                [
                    self.unitd,
                    '--no-daemon',
                    '--modules',  build_dir,
                    '--state',    self.temp_dir + '/state',
                    '--pid',      self.temp_dir + '/unit.pid',
                    '--log',      unit_log,
                    '--control',  'unix:' + self.temp_dir + '/control.unit.sock',
                    '--tmp',      self.temp_dir,
                ],
                stderr=log,
            )

        # The process exists from here on, so mark it as started *before*
        # registering the atexit hook: if start-up fails below, stop() must
        # still be able to shut the daemon down instead of bailing out.
        self._started = True

        atexit.register(self.stop)

        if not waitforfiles(self.temp_dir + '/control.unit.sock'):
            _print_log(path=unit_log)
            raise SystemExit("Could not start unit")

    def teardown_method(self):
        """Stop the daemon, check its log and dispose of the temp directory.

        The temporary directory is kept (and reported through ``_print_log``)
        when ``option.save_log`` is set. Finally asserts that neither the
        daemon nor the helper processes reported a stop error.
        """
        self.stop()

        # check unit.log for alerts

        unit_log = self.temp_dir + '/unit.log'

        with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
            _check_alerts(f.read())

        # remove unit.log

        if not option.save_log:
            shutil.rmtree(self.temp_dir)
        else:
            # Pass the log file itself, like every other _print_log() call
            # in this class does.
            _print_log(path=unit_log)

        assert self.stop_errors == [None, None], 'stop errors'

    def stop(self):
        """Stop the daemon and all helper processes; no-op if not started.

        Stores the outcome in ``self.stop_errors`` as
        ``[daemon_error, processes_error]`` where ``None`` means success.
        """
        if not self._started:
            return

        self.stop_errors = []

        self.stop_errors.append(self._stop())

        self.stop_errors.append(self.stop_processes())

        atexit.unregister(self.stop)

        self._started = False

    def _stop(self):
        """Send SIGQUIT to the daemon and wait up to 15 seconds for it.

        Returns:
            ``None`` if the daemon had already exited or exits with code 0,
            otherwise a string describing the problem.
        """
        if self._p.poll() is not None:
            return None

        with self._p as p:
            p.send_signal(signal.SIGQUIT)

            try:
                retcode = p.wait(15)
            except subprocess.TimeoutExpired:
                p.kill()
                return 'Could not terminate unit'
            except BaseException:
                # e.g. KeyboardInterrupt: do not leave the daemon behind
                # (Popen.__exit__ waits for it), but do not hide the
                # interruption either.
                p.kill()
                raise

            if retcode:
                return 'Child process terminated with code ' + str(retcode)

        return None

    def run_process(self, target, *args):
        """Run ``target(*args)`` in a child process tracked for clean-up."""
        if not hasattr(self, '_processes'):
            self._processes = []

        process = Process(target=target, args=args)
        process.start()

        self._processes.append(process)

    def stop_processes(self):
        """Terminate every still-running process started by ``run_process``.

        Returns:
            ``None`` on success (or when no process was ever started), or an
            error string if a process is still alive 15 seconds after being
            terminated.
        """
        if not hasattr(self, '_processes'):
            return None

        fail = False
        for process in self._processes:
            if not process.is_alive():
                continue

            process.terminate()
            process.join(timeout=15)

            if process.is_alive():
                fail = True

        if fail:
            return 'Fail to stop process'

        return None