2019-04-28 15:33:53 +02:00
|
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
|
|
|
|
from contextlib import contextmanager
|
|
|
|
import logging
|
|
|
|
|
|
|
|
import mock
|
|
|
|
import pytest
|
|
|
|
|
2019-05-04 17:59:21 +02:00
|
|
|
from phi.async_ldap.client import (
|
2019-04-28 15:33:53 +02:00
|
|
|
parse_host,
|
|
|
|
checked_port,
|
|
|
|
compose_dn_username,
|
|
|
|
AsyncClient,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
BASE_DN = "dc=unit,dc=macaomilano,dc=org"
|
|
|
|
|
|
|
|
|
|
|
|
@contextmanager
|
|
|
|
def does_not_raise():
|
|
|
|
yield
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"test_url, exp_proto, exp_addr, exp_port",
|
|
|
|
[
|
|
|
|
("1.3.1.2", "ldap", "1.3.1.2", 389),
|
|
|
|
("ldap://localhost:1312", "ldap", "localhost", 1312),
|
|
|
|
("localhost:1312", "ldap", "localhost", 1312),
|
|
|
|
("localhost", "ldap", "localhost", 389),
|
|
|
|
("ldap://localhost", "ldap", "localhost", 389),
|
|
|
|
("ldaps://localhost", "ldaps", "localhost", 636),
|
|
|
|
("ldaps://localhost:1312", "ldaps", "localhost", 1312),
|
|
|
|
],
|
|
|
|
)
|
|
|
|
def test_parse_host(test_url, exp_proto, exp_addr, exp_port):
|
|
|
|
proto, addr, port = parse_host(test_url)
|
|
|
|
|
|
|
|
assert proto == exp_proto
|
|
|
|
assert addr == exp_addr
|
|
|
|
assert port == exp_port
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"manual, auto, exp_port", [(None, 389, 389), (1312, 389, 1312), (1312, 1312, 1312)]
|
|
|
|
)
|
|
|
|
def test_checked_port(manual, auto, exp_port, caplog):
|
|
|
|
port = checked_port(manual, auto)
|
|
|
|
if manual and manual != auto:
|
|
|
|
with caplog.at_level(logging.WARNING):
|
|
|
|
"The former prevails" in caplog.text
|
|
|
|
|
|
|
|
assert port == exp_port
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"username, base_dn, ou, attribute_id, exp_dn",
|
|
|
|
[
|
|
|
|
(
|
|
|
|
f"uid=conte_mascetti,{BASE_DN}",
|
|
|
|
BASE_DN,
|
|
|
|
None,
|
|
|
|
"uid",
|
|
|
|
f"uid=conte_mascetti,{BASE_DN}",
|
|
|
|
),
|
|
|
|
("root", BASE_DN, None, "cn", f"cn=root,{BASE_DN}"),
|
|
|
|
("necchi", BASE_DN, "Hackers", "uid", f"uid=necchi,ou=Hackers,{BASE_DN}"),
|
|
|
|
("perozzi", BASE_DN, "Phrackers", "cn", f"cn=perozzi,ou=Phrackers,{BASE_DN}"),
|
|
|
|
],
|
|
|
|
)
|
|
|
|
def test_compose_dn_username(username, base_dn, ou, attribute_id, exp_dn):
|
|
|
|
dn = compose_dn_username(username, base_dn, ou, attribute_id)
|
|
|
|
|
|
|
|
assert dn == exp_dn
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
|
|
"url, encryption, validate, ca_cert, expectation",
|
|
|
|
[
|
|
|
|
("localhost", None, False, None, does_not_raise()),
|
|
|
|
("localhost", True, False, None, does_not_raise()),
|
|
|
|
("localhost", False, True, None, does_not_raise()),
|
|
|
|
("localhost", True, True, "path/to/cert.pem", does_not_raise()),
|
|
|
|
("ldaps://localhost", False, False, None, pytest.raises(ValueError)),
|
|
|
|
],
|
|
|
|
)
|
|
|
|
def test_AsyncClient_init(url, encryption, validate, ca_cert, expectation):
|
|
|
|
with expectation as exp:
|
|
|
|
cl = AsyncClient(
|
|
|
|
host=url,
|
|
|
|
port=389,
|
|
|
|
encryption=encryption,
|
|
|
|
ciphers=None,
|
|
|
|
validate=validate,
|
|
|
|
ca_cert=ca_cert,
|
|
|
|
username="conte_mascetti",
|
|
|
|
password="pass",
|
|
|
|
base_dn=BASE_DN,
|
|
|
|
ou="Hackers",
|
|
|
|
)
|
|
|
|
|
|
|
|
if exp is not None:
|
|
|
|
assert "Incompatible provided protocol" in str(exp.value)
|
|
|
|
return
|
|
|
|
|
|
|
|
assert cl.base_dn == BASE_DN
|
|
|
|
assert url in cl.full_uri
|
|
|
|
assert "389" in cl.full_uri
|
|
|
|
assert cl._tls if encryption else not cl._tls
|
|
|
|
if validate:
|
|
|
|
assert cl.cert_policy == -1
|
|
|
|
else:
|
|
|
|
assert cl.cert_policy == 0
|
|
|
|
if ca_cert:
|
|
|
|
assert cl.ca_cert == ca_cert
|
|
|
|
else:
|
|
|
|
assert cl.ca_cert == ""
|