1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
import socket
import time
import pytest
pytest.importorskip('OpenSSL.SSL')
from OpenSSL.SSL import (
TLSv1_2_METHOD,
SESS_CACHE_CLIENT,
OP_NO_TICKET,
Context,
Connection,
_lib,
)
from unit.applications.tls import TestApplicationTLS
class TestTLSSession(TestApplicationTLS):
prerequisites = {'modules': {'openssl': 'any'}}
@pytest.fixture(autouse=True)
def setup_method_fixture(self):
self.certificate()
assert 'success' in self.conf(
{
"listeners": {
"*:7080": {
"pass": "routes",
"tls": {"certificate": "default", "session": {}},
}
},
"routes": [{"action": {"return": 200}}],
"applications": {},
}
), 'load application configuration'
def add_session(self, cache_size=None, timeout=None):
session = {}
if cache_size is not None:
session['cache_size'] = cache_size
if timeout is not None:
session['timeout'] = timeout
return self.conf(session, 'listeners/*:7080/tls/session')
def connect(self, ctx=None, session=None):
sock = socket.create_connection(('127.0.0.1', 7080))
if ctx is None:
ctx = Context(TLSv1_2_METHOD)
ctx.set_session_cache_mode(SESS_CACHE_CLIENT)
ctx.set_options(OP_NO_TICKET)
client = Connection(ctx, sock)
client.set_connect_state()
if session is not None:
client.set_session(session)
client.do_handshake()
client.shutdown()
return (
client,
client.get_session(),
ctx,
_lib.SSL_session_reused(client._ssl),
)
def test_tls_session(self):
_, sess, ctx, reused = self.connect()
assert not reused, 'new connection'
_, _, _, reused = self.connect(ctx, sess)
assert not reused, 'no cache'
assert 'success' in self.add_session(cache_size=2)
_, sess, ctx, reused = self.connect()
assert not reused, 'new connection cache'
_, _, _, reused = self.connect(ctx, sess)
assert reused, 'cache'
_, _, _, reused = self.connect(ctx, sess)
assert reused, 'cache 2'
# check that at least one session of four is not reused
clients = [self.connect() for _ in range(4)]
assert True not in [c[-1] for c in clients], 'cache small all new'
clients_again = [self.connect(c[2], c[1]) for c in clients]
assert False in [c[-1] for c in clients_again], 'cache small no reuse'
# all four sessions are reused
assert 'success' in self.add_session(cache_size=8)
clients = [self.connect() for _ in range(4)]
assert True not in [c[-1] for c in clients], 'cache big all new'
clients_again = [self.connect(c[2], c[1]) for c in clients]
assert False not in [c[-1] for c in clients_again], 'cache big reuse'
def test_tls_session_timeout(self):
assert 'success' in self.add_session(cache_size=5, timeout=1)
_, sess, ctx, reused = self.connect()
assert not reused, 'new connection'
_, _, _, reused = self.connect(ctx, sess)
assert reused, 'no timeout'
time.sleep(3)
_, _, _, reused = self.connect(ctx, sess)
assert not reused, 'timeout'
def test_tls_session_invalid(self):
assert 'error' in self.add_session(cache_size=-1)
assert 'error' in self.add_session(cache_size={})
assert 'error' in self.add_session(timeout=-1)
assert 'error' in self.add_session(timeout={})
|