# -*- coding: utf-8 -*-
"""Integration tests for the ``phi.async_ldap`` model classes.

Every test is marked ``integration_test`` and talks to the LDAP server
configured in the module-level ``cl`` client (``ldap://localhost``).
"""
import asyncio

from async_generator import asynccontextmanager
import pytest

from phi.async_ldap.model import (
    Hackers,
    User,
    Robots,
    Service,
    Group,
    Roles,
)
from phi.async_ldap.mixins import build_heritage
from phi.async_ldap.client import AsyncClient
import phi.exceptions as e

BASE_DN = "dc=unit,dc=macaomilano,dc=org"

# Shared client used by every helper and test in this module.
cl = AsyncClient(
    "ldap://localhost",
    port=389,
    encryption=True,
    # validate=True,
    ca_cert="../openldap/cert.pem",
    username="root",
    password="root",
    base_dn=BASE_DN,
    attribute_id="cn",
)


@asynccontextmanager
async def clean_db():
    """Run the enclosed test body between two cleanups of the directory.

    ``delete()`` is called on the ``Hackers``, ``Robots`` and ``Roles``
    containers (with ``delete_cascade`` enabled) before the body runs and
    again afterwards. The second cleanup sits in a ``finally`` clause so
    that it also runs when the body raises, e.g. on a failed assertion.
    """
    h = Hackers(cl)
    r = Robots(cl)
    c = Roles(cl)

    h.delete_cascade = True
    r.delete_cascade = True
    c.delete_cascade = True

    await h.delete()
    await r.delete()
    await c.delete()

    try:
        yield
    finally:
        # Without the finally, a failing test would skip this cleanup
        # and leave its entries behind in the directory.
        await h.delete()
        await r.delete()
        await c.delete()


async def init_achilles():
    """Create, save and return the ``achilles`` user."""
    u = User(cl, "achilles")
    u["cn"] = "Achilles"
    u["sn"] = "achilles"
    u["mail"] = "achilles@phthia.gr"
    u["userPassword"] = "Patroclus123"

    await u.save()

    return u


async def init_patroclus():
    """Create, save and return the ``patroclus`` user."""
    u = User(cl, "patroclus")
    u["cn"] = "Patroclus"
    u["sn"] = "patroclus"
    u["mail"] = "patroclus@phthia.gr"
    u["userPassword"] = "WannabeAnHero"

    await u.save()

    return u


async def init_athena():
    """Create, save and return the ``athena`` service."""
    s = Service(cl, "athena")
    s["userPassword"] = "ἁ θεονόα"

    await s.save()

    return s


async def init_group(group_name, members):
    """Create, save and return a group named *group_name* with *members*."""
    g = Group(cl, group_name, member=members)

    await g.save()

    return g


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_init():
    """A saved user is found by searching the Hackers container."""
    async with clean_db():
        u = await init_achilles()

        h = Hackers(cl)
        res = await h.search("achilles")

        assert u == res


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_exists():
    """``exists()`` is true for a saved user and false for an unsaved one."""
    async with clean_db():
        u1 = await init_achilles()
        u2 = User(cl, "enea")

        assert await u1.exists()
        assert not await u2.exists()


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_double_save_raises():
    """Saving an already-saved user raises ``PhiEntryExists``."""
    async with clean_db():
        u = await init_achilles()

        # Read all the data from the db
        await u.sync()

        with pytest.raises(e.PhiEntryExists) as ex:
            await u.save()

        # Checked after the ``with`` block: code placed after the raising
        # call inside the block would never execute.
        assert u.dn in str(ex.value)


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_describe():
    """``describe()`` returns the expected attributes for a saved user."""
    async with clean_db():
        u = await init_achilles()

        res = await u.describe()

        assert res == {
            "uid": "achilles",
            "cn": "Achilles",
            "sn": "achilles",
            "dn": f"uid=achilles,ou=Hackers,{BASE_DN}",
            "mail": "achilles@phthia.gr",
        }


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_modify():
    """A modified attribute is visible through both search and sync."""
    async with clean_db():
        u = await init_achilles()

        NEW_EMAIL = "a@myrmidons.mil"
        u["mail"] = NEW_EMAIL
        await u.modify()

        h = Hackers(cl)
        res = await h.search("achilles")
        await u.sync()

        assert u["mail"] == res["mail"] == NEW_EMAIL

        for attr in u.ldap_attributes:
            assert u[attr] == res[attr]


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_modify_raises():
    """Modifying a not-yet-existing user raises."""
    async with clean_db():
        u = User(cl, "enea")

        with pytest.raises(e.PhiEntryDoesNotExist) as ex:
            await u.modify()

        assert u.dn in str(ex.value)


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_delete():
    """Searching for a deleted user raises ``PhiEntryDoesNotExist``."""
    async with clean_db():
        u = await init_achilles()
        await u.delete()

        h = Hackers(cl)

        with pytest.raises(e.PhiEntryDoesNotExist) as ex:
            await h.search("achilles")

        assert u.dn in str(ex.value)


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Service_init():
    """A saved service is found by searching the Robots container."""
    async with clean_db():
        s = await init_athena()

        r = Robots(cl)
        res = await r.search("athena")

        assert s == res


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Service_describe():
    """``describe()`` returns the expected attributes for a saved service."""
    async with clean_db():
        s = await init_athena()

        res = await s.describe()

        assert res == {
            "ou": "Robots",
            "uid": "athena",
            "dn": f"uid=athena,ou=Robots,{BASE_DN}",
        }


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Group_init():
    """A saved group is found via Roles and lists its single member."""
    async with clean_db():
        u = await init_achilles()
        g = await init_group("achaeans", [u])

        c = Roles(cl)
        res = await c.search("achaeans")

        assert g == res
        assert [u] == [member async for member in g.get_members()]


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Group_describe():
    """``describe()`` returns the expected attributes for a saved group."""
    async with clean_db():
        u1 = await init_achilles()
        u2 = await init_patroclus()
        g = await init_group("achaeans", [u1, u2])

        res = await g.describe()

        assert res == {
            "ou": "Roles",
            "cn": "achaeans",
            "dn": f"cn=achaeans,ou=Roles,{BASE_DN}",
            "member": [u1.dn, u2.dn],
        }


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Group_add_member():
    """Adding a member to one group does not affect another group."""
    async with clean_db():
        u = await init_achilles()
        a = await init_athena()
        g1 = await init_group("achaeans", [u])
        g2 = await init_group("gods", [u])

        await g2.add_member(a)

        m1 = [m async for m in g1.get_members()]
        m2 = [m async for m in g2.get_members()]

        assert u in m1
        assert u in m2
        assert a not in m1
        assert a in m2


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Group_remove_member():
    """Removing a member leaves the other members of the group in place."""
    async with clean_db():
        u = await init_achilles()
        a = await init_athena()
        g = await init_group("achaeans", [u, a])

        # The loop variable is deliberately not called ``a`` so that it
        # cannot be confused with the ``athena`` service above.
        m = [member async for member in g.get_members()]

        assert u in m
        assert a in m

        await g.remove_member(a)

        assert [u] == [el async for el in g.get_members()]


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_groups():
    """``groups()`` returns exactly the groups an entry is a member of."""
    async with clean_db():
        u = await init_achilles()
        a = await init_athena()
        g1 = await init_group("achaeans", [u])
        g2 = await init_group("gods", [u, a])

        res1 = await u.groups()
        res2 = await a.groups()

        assert g1 in res1
        assert g2 in res1
        assert g1 not in res2
        assert g2 in res2


@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_OU_delete_raises():
    """Deleting a populated container without cascade raises."""
    async with clean_db():
        u1 = await init_achilles()
        u2 = await init_patroclus()
        a = await init_athena()
        # The groups only need to exist; their handles are not used here.
        await init_group("achaeans", [u1, u2])
        await init_group("gods", [a, u1])

        h = Hackers(cl)
        _saved_val = h.delete_cascade
        h.delete_cascade = False

        assert not h.delete_cascade

        with pytest.raises(e.PhiCannotExecute) as ex:
            await h.delete()

        assert "delete_cascade is not set" in str(ex.value)

        h.delete_cascade = _saved_val
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_OU_delete_cascade():
    """Check what a cascading delete of the Hackers container leaves behind.

    After the delete, both users are gone, the container iterates as
    empty, the group that held only users no longer exists, and the mixed
    group keeps its service member while losing the deleted user.
    """
    async with clean_db():
        achilles = await init_achilles()
        patroclus = await init_patroclus()
        athena = await init_athena()
        achaeans = await init_group("achaeans", [achilles, patroclus])
        gods = await init_group("gods", [athena, achilles])

        hackers = Hackers(cl)
        previous_cascade = hackers.delete_cascade
        hackers.delete_cascade = True

        assert hackers.delete_cascade

        await hackers.delete()

        # Collect both views up front; the assertions below inspect them.
        gods_members = [member async for member in gods.get_members()]
        remaining_hackers = [entry async for entry in hackers]

        assert not await achilles.exists()
        assert not await patroclus.exists()
        assert remaining_hackers == []
        assert not await achaeans.exists()
        assert achilles not in gods_members
        assert athena in gods_members

        hackers.delete_cascade = previous_cascade