1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
from unit.applications.lang.python import TestApplicationPython
from unit.option import option
class TestUnixAbstract(TestApplicationPython):
prerequisites = {
'modules': {'python': 'any'},
'features': ['unix_abstract'],
}
def test_unix_abstract_source(self):
addr = '\0sock'
def source(source):
assert 'success' in self.conf(
f'"{source}"', 'routes/0/match/source'
)
assert 'success' in self.conf(
{
"listeners": {
"127.0.0.1:7080": {"pass": "routes"},
f"unix:@{addr[1:]}": {"pass": "routes"},
},
"routes": [
{
"match": {"source": "!0.0.0.0/0"},
"action": {"return": 200},
}
],
"applications": {},
}
)
assert (
self.get(sock_type='unix', addr=addr)['status'] == 200
), 'neg ipv4'
source("!::/0")
assert (
self.get(sock_type='unix', addr=addr)['status'] == 200
), 'neg ipv6'
source("unix")
assert self.get()['status'] == 404, 'ipv4'
assert self.get(sock_type='unix', addr=addr)['status'] == 200, 'unix'
def test_unix_abstract_client_ip(self):
def get_xff(xff, sock_type='ipv4'):
address = {
'ipv4': ('127.0.0.1', 7080),
'ipv6': ('::1', 7081),
'unix': ('\0sock', None),
}
(addr, port) = address[sock_type]
return self.get(
sock_type=sock_type,
addr=addr,
port=port,
headers={'Connection': 'close', 'X-Forwarded-For': xff},
)['body']
client_ip_dir = f"{option.test_dir}/python/client_ip"
assert 'success' in self.conf(
{
"listeners": {
"127.0.0.1:7080": {
"client_ip": {
"header": "X-Forwarded-For",
"source": "unix",
},
"pass": "applications/client_ip",
},
"[::1]:7081": {
"client_ip": {
"header": "X-Forwarded-For",
"source": "unix",
},
"pass": "applications/client_ip",
},
"unix:@sock": {
"client_ip": {
"header": "X-Forwarded-For",
"source": "unix",
},
"pass": "applications/client_ip",
},
},
"applications": {
"client_ip": {
"type": self.get_application_type(),
"processes": {"spare": 0},
"path": client_ip_dir,
"working_directory": client_ip_dir,
"module": "wsgi",
}
},
}
)
assert get_xff('1.1.1.1') == '127.0.0.1', 'bad source ipv4'
assert get_xff('1.1.1.1', 'ipv6') == '::1', 'bad source ipv6'
for ip in [
'1.1.1.1',
'::11.22.33.44',
]:
assert get_xff(ip, 'unix') == ip, 'replace'
|