362 lines
8.0 KiB
Python
362 lines
8.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import asyncio
|
|
|
|
from async_generator import asynccontextmanager
|
|
import pytest
|
|
|
|
from phi.async_ldap.model import (
|
|
Hackers,
|
|
User,
|
|
Robots,
|
|
Service,
|
|
Group,
|
|
Congregations,
|
|
)
|
|
from phi.async_ldap.mixins import build_heritage
|
|
from phi.async_ldap.client import AsyncClient
|
|
import phi.exceptions as e
|
|
|
|
# Base DN shared by the client below and by the expected DNs in the tests.
BASE_DN = "dc=unit,dc=macaomilano,dc=org"

# Single client instance shared by every helper and test in this module.
cl = AsyncClient(
    "ldap://localhost",
    port=389,
    encryption=True,
    # validate=True,  # NOTE(review): left disabled in the original — confirm intent
    ca_cert="../openldap/cert.pem",
    username="root",
    password="root",
    base_dn=BASE_DN,
    attribute_id="cn",
)
|
|
|
|
|
|
@asynccontextmanager
async def clean_db():
    """Run the wrapped block against emptied Hackers/Robots/Congregations.

    Each container is deleted with ``delete_cascade`` enabled before the
    block runs and again afterwards. The trailing cleanup sits in a
    ``finally`` clause so that it also runs when the wrapped block raises
    (for instance on a failing assertion); otherwise a failed test would
    leave its entries behind in the directory.
    """
    containers = (Hackers(cl), Robots(cl), Congregations(cl))
    for container in containers:
        container.delete_cascade = True

    async def _wipe():
        # Same order as the containers were created: Hackers, Robots,
        # Congregations.
        for container in containers:
            await container.delete()

    await _wipe()
    try:
        yield
    finally:
        await _wipe()
|
|
|
|
|
|
async def init_achilles():
    """Create the ``achilles`` user, save it and return it."""
    hero = User(cl, "achilles")
    # Pairs are applied in order, one item assignment each.
    attributes = (
        ("cn", "Achilles"),
        ("sn", "achilles"),
        ("mail", "achilles@phthia.gr"),
        ("userPassword", "Patroclus123"),
    )
    for name, value in attributes:
        hero[name] = value

    await hero.save()
    return hero
|
|
|
|
|
|
async def init_patroclus():
    """Create the ``patroclus`` user, save it and return it."""
    companion = User(cl, "patroclus")
    # Pairs are applied in order, one item assignment each.
    attributes = (
        ("cn", "Patroclus"),
        ("sn", "patroclus"),
        ("mail", "patroclus@phthia.gr"),
        ("userPassword", "WannabeAnHero"),
    )
    for name, value in attributes:
        companion[name] = value

    await companion.save()
    return companion
|
|
|
|
|
|
async def init_athena():
    """Create the ``athena`` service, save it and return it."""
    service = Service(cl, "athena")
    # Non-ASCII password on purpose: kept exactly as in the original.
    service["userPassword"] = "ἁ θεονόα"

    await service.save()
    return service
|
|
|
|
|
|
async def init_group(group_name, members):
    """Create a group with the given members, save it and return it.

    ``group_name`` is passed to ``Group`` as the entry name and ``members``
    as its ``member`` keyword argument.
    """
    group = Group(cl, group_name, member=members)

    await group.save()
    return group
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_init():
    """A saved user equals the entry returned by a Hackers search."""
    async with clean_db():
        created = await init_achilles()

        hackers = Hackers(cl)
        found = await hackers.search("achilles")

        assert created == found
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_exists():
    """``exists()`` is true for a saved user and false for an unsaved one."""
    async with clean_db():
        saved = await init_achilles()
        # Only instantiated, never saved.
        unsaved = User(cl, "enea")

        assert await saved.exists()
        assert not await unsaved.exists()
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_double_save_raises():
    """Saving an already-saved user raises ``PhiEntryExists``."""
    async with clean_db():
        user = await init_achilles()
        # Refresh the local object with everything stored in the db.
        await user.sync()

        with pytest.raises(e.PhiEntryExists) as excinfo:
            await user.save()

        # The error message is expected to mention the entry's DN.
        assert user.dn in str(excinfo.value)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_describe():
    """``describe()`` on a saved user returns the expected mapping."""
    expected = {
        "uid": "achilles",
        "cn": "Achilles",
        "sn": "achilles",
        "dn": f"uid=achilles,ou=Hackers,{BASE_DN}",
        "mail": "achilles@phthia.gr",
    }

    async with clean_db():
        user = await init_achilles()

        description = await user.describe()

        assert description == expected
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_modify():
    """A modified attribute is visible both via search and after sync."""
    async with clean_db():
        user = await init_achilles()
        new_email = "a@myrmidons.mil"
        user["mail"] = new_email
        await user.modify()

        hackers = Hackers(cl)
        found = await hackers.search("achilles")
        await user.sync()

        assert user["mail"] == found["mail"] == new_email
        # Every attribute of the synced object must match the searched one.
        for attribute in user.ldap_attributes:
            assert user[attribute] == found[attribute]
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_modify_raises():
    """Calling ``modify()`` on a user that was never saved raises."""
    async with clean_db():
        unsaved = User(cl, "enea")

        with pytest.raises(e.PhiEntryDoesNotExist) as excinfo:
            await unsaved.modify()

        # The error message is expected to mention the entry's DN.
        assert unsaved.dn in str(excinfo.value)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_delete():
    """Searching for a deleted user raises ``PhiEntryDoesNotExist``."""
    async with clean_db():
        user = await init_achilles()
        await user.delete()

        hackers = Hackers(cl)

        with pytest.raises(e.PhiEntryDoesNotExist) as excinfo:
            await hackers.search("achilles")

        # The error message is expected to mention the deleted entry's DN.
        assert user.dn in str(excinfo.value)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Service_init():
    """A saved service equals the entry returned by a Robots search."""
    async with clean_db():
        created = await init_athena()

        robots = Robots(cl)
        found = await robots.search("athena")

        assert created == found
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Service_describe():
    """``describe()`` on a saved service returns the expected mapping."""
    expected = {
        "ou": "Robots",
        "uid": "athena",
        "dn": f"uid=athena,ou=Robots,{BASE_DN}",
    }

    async with clean_db():
        service = await init_athena()

        description = await service.describe()

        assert description == expected
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Group_init():
    """A saved group is found by search and lists its single member."""
    async with clean_db():
        user = await init_achilles()
        group = await init_group("achaeans", [user])

        congregations = Congregations(cl)
        found = await congregations.search("achaeans")

        assert group == found

        members = [member async for member in group.get_members()]
        assert [user] == members
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Group_describe():
    """``describe()`` on a saved group returns the expected mapping."""
    async with clean_db():
        first = await init_achilles()
        second = await init_patroclus()
        group = await init_group("achaeans", [first, second])

        description = await group.describe()

        expected = {
            "ou": "Congregations",
            "cn": "achaeans",
            "dn": f"cn=achaeans,ou=Congregations,{BASE_DN}",
            # Members are compared as a list, in insertion order.
            "member": [first.dn, second.dn],
        }
        assert description == expected
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Group_add_member():
    """Adding a member to one group leaves the other group untouched."""
    async with clean_db():
        user = await init_achilles()
        service = await init_athena()
        achaeans = await init_group("achaeans", [user])
        gods = await init_group("gods", [user])

        await gods.add_member(service)

        achaean_members = [entry async for entry in achaeans.get_members()]
        god_members = [entry async for entry in gods.get_members()]

        assert user in achaean_members
        assert user in god_members
        assert service not in achaean_members
        assert service in god_members
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_Group_remove_member():
    """Removing a member leaves only the remaining one in the group."""
    async with clean_db():
        user = await init_achilles()
        service = await init_athena()
        group = await init_group("achaeans", [user, service])

        before = [entry async for entry in group.get_members()]

        assert user in before
        assert service in before

        await group.remove_member(service)

        after = [entry async for entry in group.get_members()]
        assert [user] == after
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_User_groups():
    """``groups()`` returns exactly the groups an entry is a member of."""
    async with clean_db():
        user = await init_achilles()
        service = await init_athena()
        achaeans = await init_group("achaeans", [user])
        gods = await init_group("gods", [user, service])

        user_groups = await user.groups()
        service_groups = await service.groups()

        # The user belongs to both groups, the service only to "gods".
        assert achaeans in user_groups
        assert gods in user_groups
        assert achaeans not in service_groups
        assert gods in service_groups
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_OU_delete_raises():
    """Deleting Hackers with ``delete_cascade`` off raises ``PhiCannotExecute``."""
    async with clean_db():
        u1 = await init_achilles()
        u2 = await init_patroclus()
        a = await init_athena()
        # The groups only need to exist; their handles are not used here.
        await init_group("achaeans", [u1, u2])
        await init_group("gods", [a, u1])

        h = Hackers(cl)
        _saved_val = h.delete_cascade
        h.delete_cascade = False
        try:
            assert not h.delete_cascade

            with pytest.raises(e.PhiCannotExecute) as ex:
                await h.delete()

            assert "delete_cascade is not set" in str(ex.value)
        finally:
            # Put the flag back even when an assertion above fails.
            h.delete_cascade = _saved_val
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.integration_test
async def test_OU_delete_cascade():
    """Cascade-deleting Hackers removes its users and their memberships.

    After the delete, both users are gone, iterating Hackers yields
    nothing, the group that held only users no longer exists, and the mixed
    group keeps only the service member.
    """
    async with clean_db():
        u1 = await init_achilles()
        u2 = await init_patroclus()
        a = await init_athena()
        g1 = await init_group("achaeans", [u1, u2])
        g2 = await init_group("gods", [a, u1])

        h = Hackers(cl)
        _saved_val = h.delete_cascade
        h.delete_cascade = True
        try:
            assert h.delete_cascade

            await h.delete()
            # Loop variables are named so they do not shadow the module
            # alias ``e`` (phi.exceptions).
            g2_members = [member async for member in g2.get_members()]
            h_members = [entry async for entry in h]

            assert not await u1.exists()
            assert not await u2.exists()
            assert h_members == []
            assert not await g1.exists()
            assert u1 not in g2_members
            assert a in g2_members
        finally:
            # Put the flag back even when an assertion above fails.
            h.delete_cascade = _saved_val
|