Add some integration tests
This commit is contained in:
parent
e202d54a7d
commit
6bd1beba9e
131
integration_tests/test_async_ldap_new_model.py
Normal file
131
integration_tests/test_async_ldap_new_model.py
Normal file
|
@ -0,0 +1,131 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from async_generator import asynccontextmanager
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from phi.async_ldap.new_model import (
|
||||||
|
Hackers,
|
||||||
|
User,
|
||||||
|
Robots,
|
||||||
|
Service,
|
||||||
|
Group,
|
||||||
|
Congregations,
|
||||||
|
)
|
||||||
|
from phi.async_ldap.mixins import build_heritage
|
||||||
|
from phi.async_ldap.client import AsyncClient
|
||||||
|
import phi.exceptions as e
|
||||||
|
|
||||||
|
BASE_DN = "dc=unit,dc=macaomilano,dc=org"
|
||||||
|
|
||||||
|
cl = AsyncClient(
|
||||||
|
"ldap://localhost",
|
||||||
|
port=389,
|
||||||
|
encryption=True,
|
||||||
|
# validate=True,
|
||||||
|
ca_cert="../openldap/cert.pem",
|
||||||
|
username="root",
|
||||||
|
password="root",
|
||||||
|
base_dn=BASE_DN,
|
||||||
|
attribute_id="cn",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def clean_db():
|
||||||
|
h = Hackers(cl)
|
||||||
|
r = Robots(cl)
|
||||||
|
c = Congregations(cl)
|
||||||
|
h.delete_cascade = True
|
||||||
|
r.delete_cascade = True
|
||||||
|
c.delete_cascade = True
|
||||||
|
await h.delete()
|
||||||
|
await r.delete()
|
||||||
|
await c.delete()
|
||||||
|
yield
|
||||||
|
await h.delete()
|
||||||
|
await r.delete()
|
||||||
|
await c.delete()
|
||||||
|
|
||||||
|
|
||||||
|
async def init_achilles():
|
||||||
|
u = User(cl, "achilles")
|
||||||
|
u["cn"] = "Achilles"
|
||||||
|
u["sn"] = "achilles"
|
||||||
|
u["mail"] = "achilles@phthia.gr"
|
||||||
|
u["userPassword"] = "Patroclus123"
|
||||||
|
|
||||||
|
await u.save()
|
||||||
|
|
||||||
|
return u
|
||||||
|
|
||||||
|
|
||||||
|
async def init_group(group_name, members):
|
||||||
|
g = Group(cl, group_name, member=members)
|
||||||
|
|
||||||
|
await g.save()
|
||||||
|
|
||||||
|
return g
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.integration_test
|
||||||
|
async def test_User_init():
|
||||||
|
async with clean_db():
|
||||||
|
u = await init_achilles()
|
||||||
|
|
||||||
|
h = Hackers(cl)
|
||||||
|
|
||||||
|
res = await h.search("achilles")
|
||||||
|
|
||||||
|
assert u == res
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.integration_test
|
||||||
|
async def test_User_modify():
|
||||||
|
async with clean_db():
|
||||||
|
u = await init_achilles()
|
||||||
|
NEW_EMAIL = "a@myrmidons.mil"
|
||||||
|
u["mail"] = NEW_EMAIL
|
||||||
|
await u.modify()
|
||||||
|
|
||||||
|
h = Hackers(cl)
|
||||||
|
|
||||||
|
res = await h.search("achilles")
|
||||||
|
await u.sync()
|
||||||
|
|
||||||
|
assert u["mail"] == res["mail"] == NEW_EMAIL
|
||||||
|
for attr in u.ldap_attributes:
|
||||||
|
assert u[attr] == res[attr]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.integration_test
|
||||||
|
async def test_User_delete():
|
||||||
|
async with clean_db():
|
||||||
|
u = await init_achilles()
|
||||||
|
await u.delete()
|
||||||
|
|
||||||
|
h = Hackers(cl)
|
||||||
|
|
||||||
|
with pytest.raises(e.PhiEntryDoesNotExist) as ex:
|
||||||
|
await h.search("achilles")
|
||||||
|
|
||||||
|
assert u.dn in str(ex.value)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.integration_test
|
||||||
|
async def test_Group_init():
|
||||||
|
async with clean_db():
|
||||||
|
u = await init_achilles()
|
||||||
|
g = await init_group("achaeans", [u])
|
||||||
|
|
||||||
|
c = Congregations(cl)
|
||||||
|
|
||||||
|
res = await c.search("achaeans")
|
||||||
|
|
||||||
|
assert g == res
|
||||||
|
assert [u] == [a for a in g.get_members()]
|
Loading…
Reference in New Issue
Block a user