# -*- encoding: utf-8 -*- from contextlib import contextmanager import logging import mock import pytest from phi.async_ldap.client import ( parse_host, checked_port, compose_dn_username, AsyncClient, ) BASE_DN = "dc=unit,dc=macaomilano,dc=org" @contextmanager def does_not_raise(): yield @pytest.mark.parametrize( "test_url, exp_proto, exp_addr, exp_port", [ ("1.3.1.2", "ldap", "1.3.1.2", 389), ("ldap://localhost:1312", "ldap", "localhost", 1312), ("localhost:1312", "ldap", "localhost", 1312), ("localhost", "ldap", "localhost", 389), ("ldap://localhost", "ldap", "localhost", 389), ("ldaps://localhost", "ldaps", "localhost", 636), ("ldaps://localhost:1312", "ldaps", "localhost", 1312), ], ) def test_parse_host(test_url, exp_proto, exp_addr, exp_port): proto, addr, port = parse_host(test_url) assert proto == exp_proto assert addr == exp_addr assert port == exp_port @pytest.mark.parametrize( "manual, auto, exp_port", [(None, 389, 389), (1312, 389, 1312), (1312, 1312, 1312)] ) def test_checked_port(manual, auto, exp_port, caplog): port = checked_port(manual, auto) if manual and manual != auto: with caplog.at_level(logging.WARNING): "The former prevails" in caplog.text assert port == exp_port @pytest.mark.parametrize( "username, base_dn, ou, attribute_id, exp_dn", [ ( f"uid=conte_mascetti,{BASE_DN}", BASE_DN, None, "uid", f"uid=conte_mascetti,{BASE_DN}", ), ("root", BASE_DN, None, "cn", f"cn=root,{BASE_DN}"), ("necchi", BASE_DN, "Hackers", "uid", f"uid=necchi,ou=Hackers,{BASE_DN}"), ("perozzi", BASE_DN, "Phrackers", "cn", f"cn=perozzi,ou=Phrackers,{BASE_DN}"), ], ) def test_compose_dn_username(username, base_dn, ou, attribute_id, exp_dn): dn = compose_dn_username(username, base_dn, ou, attribute_id) assert dn == exp_dn @pytest.mark.parametrize( "url, encryption, validate, ca_cert, expectation", [ ("localhost", None, False, None, does_not_raise()), ("localhost", True, False, None, does_not_raise()), ("localhost", False, True, None, does_not_raise()), ("localhost", True, True, "path/to/cert.pem", does_not_raise()), ("ldaps://localhost", False, False, None, pytest.raises(ValueError)), ], ) def test_AsyncClient_init(url, encryption, validate, ca_cert, expectation): with expectation as exp: cl = AsyncClient( host=url, port=389, encryption=encryption, ciphers=None, validate=validate, ca_cert=ca_cert, username="conte_mascetti", password="pass", base_dn=BASE_DN, ou="Hackers", ) if exp is not None: assert "Incompatible provided protocol" in str(exp.value) return assert cl.base_dn == BASE_DN assert url in cl.full_uri assert "389" in cl.full_uri assert cl._tls if encryption else not cl._tls if validate: assert cl.cert_policy == -1 else: assert cl.cert_policy == 0 if ca_cert: assert cl.ca_cert == ca_cert else: assert cl.ca_cert == ""