import socket
import pytest
pytest.importorskip('OpenSSL.SSL')
from OpenSSL.SSL import (
TLSv1_2_METHOD,
Context,
Connection,
_lib,
)
from unit.applications.tls import ApplicationTLS
# Module-level requirements declaration; presumably read by the test harness
# to skip this module when no OpenSSL module is available -- TODO confirm.
prerequisites = {'modules': {'openssl': 'any'}}
# Shared control/TLS helper used by the fixture and every test in this module.
client = ApplicationTLS()
# Ticket key strings passed as the "tickets" option in the tests below.
# TICKET and TICKET2 are 64 characters each; they look like base64
# (which would decode to 48 bytes) -- NOTE(review): confirm the encoding.
TICKET = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE'
TICKET2 = '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt'
# Longer key: 108 characters including one '=' pad, i.e. 80 bytes if base64.
# The backslash continues the string literal, so the second line must start
# at column 0 to avoid embedding whitespace in the key.
TICKET80 = '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR\
49TZXi/Y4/8RSIO7QPsU51/HLR1gWIMhVM2m9yh93Bw='
@pytest.fixture(autouse=True)
def setup_method_fixture():
    """Apply the baseline TLS configuration before every test.

    Calls ``client.certificate()`` (presumably installs the "default"
    certificate referenced below -- confirm against ApplicationTLS) and then
    loads a configuration with three listeners, ``*:7080``, ``*:7081`` and
    ``*:7082``, that share the same settings: session cache disabled
    (``cache_size`` 0) and session tickets enabled.
    """
    client.certificate()

    # One settings object reused by all three listeners.
    tls_listener = {
        "pass": "routes",
        "tls": {
            "certificate": "default",
            "session": {"cache_size": 0, "tickets": True},
        },
    }
    listeners = {
        "*:7080": tls_listener,
        "*:7081": tls_listener,
        "*:7082": tls_listener,
    }
    config = {
        "listeners": listeners,
        "routes": [{"action": {"return": 200}}],
        "applications": {},
    }

    response = client.conf(config)
    assert 'success' in response, 'load application configuration'
def connect(ctx=None, session=None, port=7080):
    """Perform one TLS 1.2 handshake against a local listener.

    Args:
        ctx: pyOpenSSL ``Context`` to use; a fresh ``TLSv1_2_METHOD`` context
            is created when ``None``.
        session: optional session object to offer for resumption via
            ``Connection.set_session``.
        port: TCP port on 127.0.0.1 to connect to.

    Returns:
        A 3-tuple ``(session, ctx, reused)``: the session obtained from the
        connection after the handshake, the context that was used, and the
        raw result of ``SSL_session_reused`` for this connection.
    """
    # The context manager closes the TCP socket on every exit path (including
    # a failed handshake), so repeated calls do not leak file descriptors.
    with socket.create_connection(('127.0.0.1', port)) as sock:
        if ctx is None:
            ctx = Context(TLSv1_2_METHOD)

        conn = Connection(ctx, sock)
        conn.set_connect_state()

        if session is not None:
            conn.set_session(session)

        conn.do_handshake()
        conn.shutdown()

        # The tuple is fully evaluated before the socket is closed, so the
        # session and the reuse flag are read while the connection object is
        # still backed by an open socket.
        return (
            conn.get_session(),
            ctx,
            _lib.SSL_session_reused(conn._ssl),
        )
def has_ticket(sess):
    """Return the result of ``SSL_SESSION_has_ticket`` for *sess*.

    *sess* is a session object as returned by ``connect()``; its private
    ``_session`` handle is passed straight to the OpenSSL binding. The result
    is used only for its truthiness by the tests.
    """
    raw_session = sess._session
    return _lib.SSL_SESSION_has_ticket(raw_session)
def set_tickets(tickets=True, port=7080):
    """Replace the TLS session settings of the listener on *port*.

    Writes ``{"cache_size": 0, "tickets": tickets}`` to the listener's
    ``tls/session`` configuration path and asserts that the controller
    reports success.
    """
    session_path = f'listeners/*:{port}/tls/session'
    session_conf = {"cache_size": 0, "tickets": tickets}

    response = client.conf(session_conf, session_path)
    assert 'success' in response
@pytest.mark.skipif(
    not hasattr(_lib, 'SSL_SESSION_has_ticket'),
    reason='ticket check is not supported',
)
def test_tls_ticket():
    """Check ticket issuance for ``tickets`` true, false and unset."""
    # Fixture default: tickets enabled. A fresh handshake must yield a ticket
    # and cannot be a resumption.
    first_sess, tls_ctx, was_reused = connect()
    assert has_ticket(first_sess), 'tickets True'
    assert not was_reused, 'tickets True not reused'

    # Offering that session back on the same listener must resume it.
    second_sess, _, was_reused = connect(tls_ctx, first_sess)
    assert has_ticket(second_sess), 'tickets True reconnect'
    assert was_reused, 'tickets True reused'

    # Tickets explicitly disabled: no ticket is expected.
    set_tickets(tickets=False)

    no_ticket_sess, _, _ = connect()
    assert not has_ticket(no_ticket_sess), 'tickets False'

    # Removing the option entirely is expected to behave like False.
    delete_result = client.conf_delete('listeners/*:7080/tls/session/tickets')
    assert 'success' in delete_result, 'tickets default configure'

    default_sess, _, _ = connect()
    assert not has_ticket(default_sess), 'tickets default (false)'
@pytest.mark.skipif(
    not hasattr(_lib, 'SSL_SESSION_has_ticket'),
    reason='ticket check is not supported',
)
def test_tls_ticket_string():
    """Check resumption when ``tickets`` is a single key string."""
    # --- key TICKET on *:7080 ---
    set_tickets(TICKET)

    issued, tls_ctx, _ = connect()
    assert has_ticket(issued), 'tickets string'

    # Same listener, same key: the session must resume.
    resumed, _, was_reused = connect(tls_ctx, issued)
    assert has_ticket(resumed), 'tickets string reconnect'
    assert was_reused, 'tickets string reused'

    # *:7081 still has the fixture's "tickets": True, so the session issued
    # by *:7080 is expected not to resume there.
    resumed, _, was_reused = connect(tls_ctx, issued, port=7081)
    assert has_ticket(resumed), 'connect True'
    assert not was_reused, 'connect True not reused'

    # *:7081 now has an explicit but different key: still no resumption.
    set_tickets(TICKET2, port=7081)

    resumed, _, was_reused = connect(tls_ctx, issued, port=7081)
    assert has_ticket(resumed), 'wrong ticket'
    assert not was_reused, 'wrong ticket not reused'

    # --- longer key TICKET80 on *:7080 ---
    set_tickets(TICKET80)

    issued, tls_ctx, _ = connect()
    assert has_ticket(issued), 'tickets string 80'

    resumed, _, was_reused = connect(tls_ctx, issued)
    assert has_ticket(resumed), 'tickets string 80 reconnect'
    assert was_reused, 'tickets string 80 reused'

    # *:7081 keeps TICKET2, which differs from TICKET80.
    resumed, _, was_reused = connect(tls_ctx, issued, port=7081)
    assert has_ticket(resumed), 'wrong ticket 80'
    assert not was_reused, 'wrong ticket 80 not reused'
@pytest.mark.skipif(
    not hasattr(_lib, 'SSL_SESSION_has_ticket'),
    reason='ticket check is not supported',
)
def test_tls_ticket_array():
    """Check resumption when ``tickets`` is an array of key strings.

    The assertions expect that a listener with several keys issues tickets
    that only the listener holding the *last* key can resume, while it can
    itself resume tickets issued under any key in its array.
    """
    # An empty array is expected to yield no ticket at all.
    set_tickets([])

    empty_sess, tls_ctx, _ = connect()
    assert not has_ticket(empty_sess), 'tickets array empty'

    # *:7080 holds both keys; *:7081 and *:7082 hold one key each.
    set_tickets([TICKET, TICKET2])
    set_tickets(TICKET, port=7081)
    set_tickets(TICKET2, port=7082)

    # Session issued by the two-key listener.
    issued, tls_ctx, _ = connect()

    _, _, was_reused = connect(tls_ctx, issued, port=7081)
    assert not was_reused, 'not last ticket'

    _, _, was_reused = connect(tls_ctx, issued, port=7082)
    assert was_reused, 'last ticket'

    # Sessions issued by each single-key listener must resume on *:7080.
    issued, tls_ctx, _ = connect(port=7081)
    _, _, was_reused = connect(tls_ctx, issued)
    assert was_reused, 'first ticket'

    issued, tls_ctx, _ = connect(port=7082)
    _, _, was_reused = connect(tls_ctx, issued)
    assert was_reused, 'second ticket'

    # Rotate the array on *:7080 from [TICKET, TICKET2] to [TICKET2, TICKET]
    # by deleting index 0 and appending TICKET.
    delete_result = client.conf_delete(
        'listeners/*:7080/tls/session/tickets/0'
    )
    assert 'success' in delete_result, 'removed first ticket'

    post_result = client.conf_post(
        f'"{TICKET}"', 'listeners/*:7080/tls/session/tickets'
    )
    assert 'success' in post_result, 'add new ticket to the end of array'

    # TICKET is now last, so the expectations for 7081 and 7082 swap.
    issued, tls_ctx, _ = connect()

    _, _, was_reused = connect(tls_ctx, issued, port=7082)
    assert not was_reused, 'not last ticket 2'

    _, _, was_reused = connect(tls_ctx, issued, port=7081)
    assert was_reused, 'last ticket 2'
def test_tls_ticket_invalid():
    """Check that malformed ``tickets`` values are rejected with an error."""
    session_path = 'listeners/*:7080/tls/session'

    def is_rejected(value):
        # True when the controller response contains 'error'.
        return 'error' in client.conf({"tickets": value}, session_path)

    # Submitted in this order; each one must be refused.
    invalid_values = (
        {},  # an object instead of a bool, string or array
        '!?&^' * 16,  # 64 characters, none from the base64 alphabet
        f'{TICKET[:-2]}!{TICKET[3:]}',  # key fragments joined by '!'
        TICKET[:-1],  # one character short
        f'{TICKET}b',  # one character too long
        f'{TICKET}blah',  # four characters too long
        [True, TICKET, TICKET2],  # array containing a non-string
        [TICKET, 'blah', TICKET2],  # array containing a short string
        [TICKET, TICKET2, []],  # array containing a nested array
    )

    for value in invalid_values:
        assert is_rejected(value)