summaryrefslogblamecommitdiffhomepage
path: root/test/test_upstreams_rr.py
blob: 2f74fbde66df1942ba1b4aa127fc7ff0784c4ba3 (plain) (tree)
1
2
3
4
5
6
7
8

         
 



                                                               
                                                  









                                                            
                                                            


                                                           














                                                     



                                                               
                      
                                       








                                              
 
                            


                                                  
 


                                                          
 
                           

















                                                                     


                                                              
 
                                    
                                 
                              




                                          
                                                          



















                                                                     
                                                            










                                                                   
                                                            






















































                                                                             




























































                                                                            








































                                                                          

                                                            


                                                                   
                                                            




                                                                   








































                                                                         




                            
                                          
                               





                                          









                                                  

                                                  

                                       

                                                                               

























                                                                            
                                                






















                                                                              




                                                                
 




                                                                               





















                                                                             

                                                                   
 

                                                                              









                                                        

                                                             








                                       
                                                             















                                                        

                                                         








                                       
                                                                             














                                                   

                                                                    




















                                                                          
















                                                                             














                                                                            



                          
import os
import re

from unit.applications.lang.python import TestApplicationPython


class TestUpstreamsRR(TestApplicationPython):
    prerequisites = {'modules': {'python': 'any'}}

    def setUp(self):
        super().setUp()

        self.assertIn(
            'success',
            self.conf(
                {
                    "listeners": {
                        "*:7080": {"pass": "upstreams/one"},
                        "*:7090": {"pass": "upstreams/two"},
                        "*:7081": {"pass": "routes/one"},
                        "*:7082": {"pass": "routes/two"},
                        "*:7083": {"pass": "routes/three"},
                    },
                    "upstreams": {
                        "one": {
                            "servers": {
                                "127.0.0.1:7081": {},
                                "127.0.0.1:7082": {},
                            },
                        },
                        "two": {
                            "servers": {
                                "127.0.0.1:7081": {},
                                "127.0.0.1:7082": {},
                            },
                        },
                    },
                    "routes": {
                        "one": [{"action": {"return": 200}}],
                        "two": [{"action": {"return": 201}}],
                        "three": [{"action": {"return": 202}}],
                    },
                    "applications": {},
                },
            ),
            'upstreams initial configuration',
        )

        self.cpu_count = os.cpu_count()

    def get_resps(self, req=100, port=7080):
        resps = [0]

        for _ in range(req):
            status = self.get(port=port)['status']
            if 200 > status or status > 209:
                continue

            ups = status % 10
            if ups > len(resps) - 1:
                resps.extend([0] * (ups - len(resps) + 1))

            resps[ups] += 1

        return resps

    def get_resps_sc(self, req=100, port=7080):
        to_send = b"""GET / HTTP/1.1
Host: localhost

""" * (
            req - 1
        )

        to_send += b"""GET / HTTP/1.1
Host: localhost
Connection: close

"""

        resp = self.http(to_send, raw_resp=True, raw=True, port=port)
        status = re.findall(r'HTTP\/\d\.\d\s(\d\d\d)', resp)
        status = list(filter(lambda x: x[:2] == '20', status))
        ups = list(map(lambda x: int(x[-1]), status))

        resps = [0] * (max(ups) + 1)
        for i in range(len(ups)):
            resps[ups[i]] += 1

        return resps

    def test_upstreams_rr_no_weight(self):
        resps = self.get_resps()
        self.assertEqual(sum(resps), 100, 'no weight sum')
        self.assertLessEqual(
            abs(resps[0] - resps[1]), self.cpu_count, 'no weight'
        )

        self.assertIn(
            'success',
            self.conf_delete('upstreams/one/servers/127.0.0.1:7081'),
            'no weight server remove',
        )

        resps = self.get_resps(req=50)
        self.assertEqual(resps[1], 50, 'no weight 2')

        self.assertIn(
            'success',
            self.conf({}, 'upstreams/one/servers/127.0.0.1:7081'),
            'no weight server revert',
        )

        resps = self.get_resps()
        self.assertEqual(sum(resps), 100, 'no weight 3 sum')
        self.assertLessEqual(
            abs(resps[0] - resps[1]), self.cpu_count, 'no weight 3'
        )

        self.assertIn(
            'success',
            self.conf({}, 'upstreams/one/servers/127.0.0.1:7083'),
            'no weight server new',
        )

        resps = self.get_resps()
        self.assertEqual(sum(resps), 100, 'no weight 4 sum')
        self.assertLessEqual(
            max(resps) - min(resps), self.cpu_count, 'no weight 4'
        )

        resps = self.get_resps_sc(req=30)
        self.assertEqual(resps[0], 10, 'no weight 4 0')
        self.assertEqual(resps[1], 10, 'no weight 4 1')
        self.assertEqual(resps[2], 10, 'no weight 4 2')

    def test_upstreams_rr_weight(self):
        self.assertIn(
            'success',
            self.conf({"weight": 3}, 'upstreams/one/servers/127.0.0.1:7081'),
            'configure weight',
        )

        resps = self.get_resps_sc()
        self.assertEqual(resps[0], 75, 'weight 3 0')
        self.assertEqual(resps[1], 25, 'weight 3 1')

        self.assertIn(
            'success',
            self.conf_delete('upstreams/one/servers/127.0.0.1:7081/weight'),
            'configure weight remove',
        )
        resps = self.get_resps_sc(req=10)
        self.assertEqual(resps[0], 5, 'weight 0 0')
        self.assertEqual(resps[1], 5, 'weight 0 1')

        self.assertIn(
            'success',
            self.conf('1', 'upstreams/one/servers/127.0.0.1:7081/weight'),
            'configure weight 1',
        )

        resps = self.get_resps_sc()
        self.assertEqual(resps[0], 50, 'weight 1 0')
        self.assertEqual(resps[1], 50, 'weight 1 1')

        self.assertIn(
            'success',
            self.conf(
                {
                    "127.0.0.1:7081": {"weight": 3},
                    "127.0.0.1:7083": {"weight": 2},
                },
                'upstreams/one/servers',
            ),
            'configure weight 2',
        )

        resps = self.get_resps_sc()
        self.assertEqual(resps[0], 60, 'weight 2 0')
        self.assertEqual(resps[2], 40, 'weight 2 1')

    def test_upstreams_rr_weight_rational(self):
        def set_weights(w1, w2):
            self.assertIn(
                'success',
                self.conf(
                    {
                        "127.0.0.1:7081": {"weight": w1},
                        "127.0.0.1:7082": {"weight": w2},
                    },
                    'upstreams/one/servers',
                ),
                'configure weights',
            )

        def check_reqs(w1, w2, reqs=10):
            resps = self.get_resps_sc(req=reqs)
            self.assertEqual(resps[0], reqs * w1 / (w1 + w2), 'weight 1')
            self.assertEqual(resps[1], reqs * w2 / (w1 + w2), 'weight 2')

        def check_weights(w1, w2):
            set_weights(w1, w2)
            check_reqs(w1, w2)

        check_weights(0, 1)
        check_weights(0, 999999.0123456)
        check_weights(1, 9)
        check_weights(100000, 900000)
        check_weights(1, .25)
        check_weights(1, 0.25)
        check_weights(0.2, .8)
        check_weights(1, 1.5)
        check_weights(1e-3, 1E-3)
        check_weights(1e-20, 1e-20)
        check_weights(1e4, 1e4)
        check_weights(1000000, 1000000)

        set_weights(0.25, 0.25)
        self.assertIn(
            'success',
            self.conf_delete('upstreams/one/servers/127.0.0.1:7081/weight'),
            'delete weight',
        )
        check_reqs(1, 0.25)

        self.assertIn(
            'success',
            self.conf(
                {
                    "127.0.0.1:7081": {"weight": 0.1},
                    "127.0.0.1:7082": {"weight": 1},
                    "127.0.0.1:7083": {"weight": 0.9},
                },
                'upstreams/one/servers',
            ),
            'configure weights',
        )
        resps = self.get_resps_sc(req=20)
        self.assertEqual(resps[0], 1, 'weight 3 1')
        self.assertEqual(resps[1], 10, 'weight 3 2')
        self.assertEqual(resps[2], 9, 'weight 3 3')

    def test_upstreams_rr_independent(self):
        def sum_resps(*args):
            sum = [0] * len(args[0])
            for arg in args:
                sum = [x + y for x, y in zip(sum, arg)]

            return sum

        resps = self.get_resps_sc(req=30, port=7090)
        self.assertEqual(resps[0], 15, 'dep two before 0')
        self.assertEqual(resps[1], 15, 'dep two before 1')

        resps = self.get_resps_sc(req=30)
        self.assertEqual(resps[0], 15, 'dep one before 0')
        self.assertEqual(resps[1], 15, 'dep one before 1')

        self.assertIn(
            'success',
            self.conf('2', 'upstreams/two/servers/127.0.0.1:7081/weight'),
            'configure dep weight',
        )

        resps = self.get_resps_sc(req=30, port=7090)
        self.assertEqual(resps[0], 20, 'dep two 0')
        self.assertEqual(resps[1], 10, 'dep two 1')

        resps = self.get_resps_sc(req=30)
        self.assertEqual(resps[0], 15, 'dep one 0')
        self.assertEqual(resps[1], 15, 'dep one 1')

        self.assertIn(
            'success',
            self.conf('1', 'upstreams/two/servers/127.0.0.1:7081/weight'),
            'configure dep weight 1',
        )

        r_one, r_two = [0, 0], [0, 0]
        for _ in range(10):
            r_one = sum_resps(r_one, self.get_resps(req=10))
            r_two = sum_resps(r_two, self.get_resps(req=10, port=7090))


        self.assertEqual(sum(r_one), 100, 'dep one mix sum')
        self.assertLessEqual(
            abs(r_one[0] - r_one[1]), self.cpu_count, 'dep one mix'
        )
        self.assertEqual(sum(r_two), 100, 'dep two mix sum')
        self.assertLessEqual(
            abs(r_two[0] - r_two[1]), self.cpu_count, 'dep two mix'
        )

    def test_upstreams_rr_delay(self):
        self.assertIn(
            'success',
            self.conf(
                {
                    "listeners": {
                        "*:7080": {"pass": "upstreams/one"},
                        "*:7081": {"pass": "routes"},
                        "*:7082": {"pass": "routes"},
                    },
                    "upstreams": {
                        "one": {
                            "servers": {
                                "127.0.0.1:7081": {},
                                "127.0.0.1:7082": {},
                            },
                        },
                    },
                    "routes": [
                        {
                            "match": {"destination": "*:7081"},
                            "action": {"pass": "applications/delayed"},
                        },
                        {
                            "match": {"destination": "*:7082"},
                            "action": {"return": 201},
                        },
                    ],
                    "applications": {
                        "delayed": {
                            "type": "python",
                            "processes": {"spare": 0},
                            "path": self.current_dir + "/python/delayed",
                            "working_directory": self.current_dir
                            + "/python/delayed",
                            "module": "wsgi",
                        }
                    },
                },
            ),
            'upstreams initial configuration',
        )

        req = 50

        socks = []
        for i in range(req):
            delay = 1 if i % 5 == 0 else 0
            _, sock = self.get(
                headers={
                    'Host': 'localhost',
                    'Content-Length': '0',
                    'X-Delay': str(delay),
                    'Connection': 'close',
                },
                start=True,
                no_recv=True,
            )
            socks.append(sock)

        resps = [0, 0]
        for i in range(req):
            resp = self.recvall(socks[i]).decode()
            socks[i].close()

            m = re.search('HTTP/1.1 20(\d)', resp)
            self.assertIsNotNone(m, 'status')
            resps[int(m.group(1))] += 1

        self.assertEqual(sum(resps), req, 'delay sum')
        self.assertLessEqual(abs(resps[0] - resps[1]), self.cpu_count, 'delay')

    def test_upstreams_rr_active_req(self):
        conns = 5
        socks = []
        socks2 = []

        for _ in range(conns):
            _, sock = self.get(start=True, no_recv=True)
            socks.append(sock)

            _, sock2 = self.http(
                b"""POST / HTTP/1.1
Host: localhost
Content-Length: 10
Connection: close

""",
                start=True,
                no_recv=True,
                raw=True,
            )
            socks2.append(sock2)

        # Send one more request and read response to make sure that previous
        # requests had enough time to reach server.

        self.assertEqual(self.get()['body'], '')

        self.assertIn(
            'success',
            self.conf(
                {"127.0.0.1:7083": {"weight": 2}}, 'upstreams/one/servers',
            ),
            'active req new server',
        )
        self.assertIn(
            'success',
            self.conf_delete('upstreams/one/servers/127.0.0.1:7083'),
            'active req server remove',
        )
        self.assertIn(
            'success', self.conf_delete('listeners/*:7080'), 'delete listener'
        )
        self.assertIn(
            'success',
            self.conf_delete('upstreams/one'),
            'active req upstream remove',
        )

        for i in range(conns):
            self.assertEqual(
                self.http(b'', sock=socks[i], raw=True)['body'],
                '',
                'active req GET',
            )

            self.assertEqual(
                self.http(b"""0123456789""", sock=socks2[i], raw=True)['body'],
                '',
                'active req POST',
            )

    def test_upstreams_rr_bad_server(self):
        self.assertIn(
            'success',
            self.conf({"weight": 1}, 'upstreams/one/servers/127.0.0.1:7084'),
            'configure bad server',
        )

        resps = self.get_resps_sc(req=30)
        self.assertEqual(resps[0], 10, 'bad server 0')
        self.assertEqual(resps[1], 10, 'bad server 1')
        self.assertEqual(sum(resps), 20, 'bad server sum')

    def test_upstreams_rr_pipeline(self):
        resps = self.get_resps_sc()

        self.assertEqual(resps[0], 50, 'pipeline 0')
        self.assertEqual(resps[1], 50, 'pipeline 1')

    def test_upstreams_rr_post(self):
        resps = [0, 0]
        for _ in range(50):
            resps[self.get()['status'] % 10] += 1
            resps[self.post(body='0123456789')['status'] % 10] += 1

        self.assertEqual(sum(resps), 100, 'post sum')
        self.assertLessEqual(abs(resps[0] - resps[1]), self.cpu_count, 'post')

    def test_upstreams_rr_unix(self):
        addr_0 = self.testdir + '/sock_0'
        addr_1 = self.testdir + '/sock_1'

        self.assertIn(
            'success',
            self.conf(
                {
                    "*:7080": {"pass": "upstreams/one"},
                    "unix:" + addr_0: {"pass": "routes/one"},
                    "unix:" + addr_1: {"pass": "routes/two"},
                },
                'listeners',
            ),
            'configure listeners unix',
        )

        self.assertIn(
            'success',
            self.conf(
                {"unix:" + addr_0: {}, "unix:" + addr_1: {}},
                'upstreams/one/servers',
            ),
            'configure servers unix',
        )

        resps = self.get_resps_sc()

        self.assertEqual(resps[0], 50, 'unix 0')
        self.assertEqual(resps[1], 50, 'unix 1')

    def test_upstreams_rr_ipv6(self):
        self.assertIn(
            'success',
            self.conf(
                {
                    "*:7080": {"pass": "upstreams/one"},
                    "[::1]:7081": {"pass": "routes/one"},
                    "[::1]:7082": {"pass": "routes/two"},
                },
                'listeners',
            ),
            'configure listeners ipv6',
        )

        self.assertIn(
            'success',
            self.conf(
                {"[::1]:7081": {}, "[::1]:7082": {}}, 'upstreams/one/servers'
            ),
            'configure servers ipv6',
        )

        resps = self.get_resps_sc()

        self.assertEqual(resps[0], 50, 'ipv6 0')
        self.assertEqual(resps[1], 50, 'ipv6 1')

    def test_upstreams_rr_servers_empty(self):
        self.assertIn(
            'success',
            self.conf({}, 'upstreams/one/servers'),
            'configure servers empty',
        )
        self.assertEqual(self.get()['status'], 502, 'servers empty')

        self.assertIn(
            'success',
            self.conf(
                {"127.0.0.1:7081": {"weight": 0}}, 'upstreams/one/servers'
            ),
            'configure servers empty one',
        )
        self.assertEqual(self.get()['status'], 502, 'servers empty one')
        self.assertIn(
            'success',
            self.conf(
                {
                    "127.0.0.1:7081": {"weight": 0},
                    "127.0.0.1:7082": {"weight": 0},
                },
                'upstreams/one/servers',
            ),
            'configure servers empty two',
        )
        self.assertEqual(self.get()['status'], 502, 'servers empty two')

    def test_upstreams_rr_invalid(self):
        self.assertIn(
            'error', self.conf({}, 'upstreams'), 'upstreams empty',
        )
        self.assertIn(
            'error', self.conf({}, 'upstreams/one'), 'named upstreams empty',
        )
        self.assertIn(
            'error',
            self.conf({}, 'upstreams/one/servers/127.0.0.1'),
            'invalid address',
        )
        self.assertIn(
            'error',
            self.conf({}, 'upstreams/one/servers/127.0.0.1:7081/blah'),
            'invalid server option',
        )

        def check_weight(w):
            self.assertIn(
                'error',
                self.conf(w, 'upstreams/one/servers/127.0.0.1:7081/weight'),
                'invalid weight option',
            )
        check_weight({})
        check_weight('-1')
        check_weight('1.')
        check_weight('1.1.')
        check_weight('.')
        check_weight('.01234567890123')
        check_weight('1000001')
        check_weight('2e6')


if __name__ == '__main__':
    TestUpstreamsRR.main()