Added async_model and tests.
This commit is contained in:
parent
c8eb5c2dd4
commit
e67b97b214
141
src/phi/ldap/async_model.py
Normal file
141
src/phi/ldap/async_model.py
Normal file
|
@ -0,0 +1,141 @@
|
|||
import asyncio
|
||||
|
||||
from phi.logging import get_logger
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
def get_class(obj):
|
||||
if type(obj) is type:
|
||||
return obj
|
||||
return obj.__class__
|
||||
|
||||
|
||||
def recall(cls):
|
||||
"""
|
||||
Prints the name of the parent class.
|
||||
"""
|
||||
_cls = get_class(cls)
|
||||
return _cls.__bases__[0]
|
||||
|
||||
|
||||
def call_if_callable(cls, attr):
|
||||
"""
|
||||
Tell between methods and properties. Call if method.
|
||||
"""
|
||||
_attr = getattr(cls, attr)
|
||||
if "__call__" in dir(_attr):
|
||||
return _attr()
|
||||
return _attr
|
||||
|
||||
|
||||
def inheritance(obj, attr, root_cls=object):
|
||||
"""
|
||||
Concatenates the value obtained from invoking the method attr
|
||||
on the class and on any parent class until root_cls.
|
||||
"""
|
||||
res = call_if_callable(obj, attr)
|
||||
base = get_class(obj)
|
||||
while base is not root_cls:
|
||||
if base is not get_class(obj):
|
||||
res += ",{}".format(call_if_callable(base, attr))
|
||||
base = recall(base)
|
||||
return res.strip(",")
|
||||
|
||||
|
||||
class Entry:
|
||||
"""
|
||||
LDAP Entry. Interface to LDAP.
|
||||
"""
|
||||
|
||||
kind = None
|
||||
_name = None
|
||||
|
||||
@classmethod
|
||||
def name(cls):
|
||||
if "_name" in dir(cls) and cls._name is not None:
|
||||
return cls._name
|
||||
return cls.__name__
|
||||
|
||||
@classmethod
|
||||
def qualified_name(cls):
|
||||
return "{}={}".format(cls.kind, cls.name())
|
||||
|
||||
def __init__(self, client):
|
||||
self.client = client
|
||||
self.base_dn = client.base_dn
|
||||
|
||||
@property
|
||||
def dn(self):
|
||||
return "{},{}".format(inheritance(self, "qualified_name", Entry), self.base_dn)
|
||||
|
||||
async def describe(self):
|
||||
async with self.client.connect(is_async=True) as conn:
|
||||
res = await conn.search(self.dn, 2)
|
||||
return res
|
||||
|
||||
|
||||
class Hackers(Entry):
|
||||
"""
|
||||
This class is where Users belong.
|
||||
"""
|
||||
|
||||
kind = "ou"
|
||||
|
||||
|
||||
class Robots(Entry):
|
||||
"""
|
||||
This class is where Services belong.
|
||||
"""
|
||||
|
||||
kind = "ou"
|
||||
_name = "Services"
|
||||
|
||||
|
||||
class User(Hackers):
|
||||
"""
|
||||
This class models a user. User may have attributes
|
||||
and belong to Groups.
|
||||
"""
|
||||
|
||||
kind = "uid"
|
||||
|
||||
def __init__(self, name, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.name = name
|
||||
|
||||
def qualified_name(self):
|
||||
return "{}={}".format(self.kind, self.name)
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
@name.setter
|
||||
def name(self, name):
|
||||
self._name = name
|
||||
|
||||
|
||||
class Service(Robots):
|
||||
"""
|
||||
This class models a user. User may have attributes
|
||||
and belong to Groups.
|
||||
"""
|
||||
|
||||
kind = "uid"
|
||||
|
||||
def __init__(self, name, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.name = name
|
||||
|
||||
def qualified_name(self):
|
||||
return "{}={}".format(self.kind, self.name)
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
@name.setter
|
||||
def name(self, name):
|
||||
self._name = name
|
175
test/test_ldap_async_model.py
Normal file
175
test/test_ldap_async_model.py
Normal file
|
@ -0,0 +1,175 @@
|
|||
from argparse import Namespace
|
||||
import asyncio
|
||||
|
||||
from async_generator import asynccontextmanager
|
||||
import pytest
|
||||
|
||||
from phi.ldap.async_model import (
|
||||
get_class,
|
||||
recall,
|
||||
call_if_callable,
|
||||
inheritance,
|
||||
Entry,
|
||||
Hackers,
|
||||
Robots,
|
||||
Service,
|
||||
User,
|
||||
)
|
||||
|
||||
|
||||
BASE_DN = "dc=test,dc=abbiamoundominio,dc=org"
|
||||
|
||||
|
||||
class MockClient:
|
||||
def __init__(self, base_dn):
|
||||
self._base_dn = base_dn
|
||||
|
||||
@property
|
||||
def base_dn(self):
|
||||
return self._base_dn
|
||||
|
||||
@asynccontextmanager
|
||||
async def connect(self, *args, **kwargs):
|
||||
conn = Namespace()
|
||||
async def _search(*args, **kwargs):
|
||||
await asyncio.sleep(0.01)
|
||||
return "Fake description"
|
||||
conn.search = _search
|
||||
yield conn
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client_fixture():
|
||||
return MockClient(BASE_DN)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def lineage_fixture():
|
||||
class Grand:
|
||||
@classmethod
|
||||
def name(cls):
|
||||
return cls.__name__
|
||||
|
||||
class Ma(Grand):
|
||||
pass
|
||||
|
||||
class Child(Ma):
|
||||
pass
|
||||
|
||||
grand = Grand()
|
||||
ma = Ma()
|
||||
child = Child()
|
||||
|
||||
return Grand, Ma, Child, grand, ma, child
|
||||
|
||||
|
||||
def test_get_class():
|
||||
c = MockClient(BASE_DN)
|
||||
|
||||
assert get_class(c) is MockClient
|
||||
|
||||
|
||||
def test_recall(lineage_fixture):
|
||||
Grand, Ma, Child, grand, ma, child = lineage_fixture
|
||||
|
||||
assert object is recall(grand)
|
||||
assert recall(grand) is recall(Grand)
|
||||
assert Grand is recall(ma)
|
||||
assert recall(ma) is recall(Ma)
|
||||
assert Ma is recall(child)
|
||||
assert recall(child) is recall(Child)
|
||||
|
||||
|
||||
def test_call_if_callable():
|
||||
class Dummy:
|
||||
classattr = "classattr"
|
||||
|
||||
@property
|
||||
def prop(self):
|
||||
return "prop"
|
||||
|
||||
def func(self):
|
||||
return "func"
|
||||
|
||||
@classmethod
|
||||
def clsmth(cls):
|
||||
return "clsmth"
|
||||
|
||||
d = Dummy()
|
||||
|
||||
assert "classattr" == call_if_callable(d, "classattr")
|
||||
assert "prop" == call_if_callable(d, "prop")
|
||||
assert "func" == call_if_callable(d, "func")
|
||||
assert "clsmth" == call_if_callable(d, "clsmth")
|
||||
|
||||
|
||||
def test_inheritance(lineage_fixture):
|
||||
Grand, Ma, Child, _, _, _ = lineage_fixture
|
||||
|
||||
assert inheritance(Grand, "name") == "Grand"
|
||||
assert inheritance(Ma, "name") == "Ma,Grand"
|
||||
assert inheritance(Child, "name") == "Child,Ma,Grand"
|
||||
assert inheritance(Child, "name", Grand) == "Child,Ma"
|
||||
assert inheritance(Child, "name", Ma) == "Child"
|
||||
|
||||
|
||||
def test_Entry(client_fixture):
|
||||
e = Entry(client_fixture)
|
||||
|
||||
assert e.base_dn == BASE_DN
|
||||
assert e.client is not None
|
||||
|
||||
|
||||
def test_Entry_name(client_fixture):
|
||||
e = Entry(client_fixture)
|
||||
|
||||
assert e.name() == "Entry"
|
||||
|
||||
|
||||
def test_Entry_qualified_name(client_fixture):
|
||||
e = Entry(client_fixture)
|
||||
|
||||
assert e.qualified_name() == "None=Entry"
|
||||
|
||||
|
||||
def test_Entry_dn(client_fixture):
|
||||
e = Entry(client_fixture)
|
||||
|
||||
assert e.dn == "None=Entry,{}".format(BASE_DN)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_Entry_describe(client_fixture):
|
||||
e = Entry(client_fixture)
|
||||
|
||||
assert "Fake description" == await e.describe()
|
||||
|
||||
|
||||
def test_Hackers(client_fixture):
|
||||
h = Hackers(client_fixture)
|
||||
|
||||
assert h.kind == "ou"
|
||||
assert h.dn == "ou={},{}".format(h.name(), BASE_DN)
|
||||
|
||||
|
||||
def test_Robots(client_fixture):
|
||||
r = Robots(client_fixture)
|
||||
|
||||
assert r.kind == "ou"
|
||||
assert r.dn == "ou={},{}".format(r.name(), BASE_DN)
|
||||
|
||||
|
||||
def test_User(client_fixture):
|
||||
c = User("conte_mascetti", client_fixture)
|
||||
|
||||
assert c.kind == "uid"
|
||||
assert c.name == "conte_mascetti"
|
||||
assert c.dn == "uid=conte_mascetti,ou=Hackers,{}".format(BASE_DN)
|
||||
|
||||
|
||||
def test_Service(client_fixture):
|
||||
c = Service("phi", client_fixture)
|
||||
|
||||
assert c.kind == "uid"
|
||||
assert c.name == "phi"
|
||||
assert c.dn == "uid=phi,ou=Services,{}".format(BASE_DN)
|
Loading…
Reference in New Issue
Block a user